Understanding checkpoints in Event Hubs

Updated: 2024-10-28 02:36:27

Question

I want to ensure that, if my eventhub client crashes (currently a console application), it only picks up events it has not yet taken from the eventhub. One way to achieve this is to exploit offsets. However, to my understanding, this requires the client to store the latest offset. Besides, events do not necessarily seem to reach the foreach loop of the ProcessEventsAsync method in SequenceNumber order.

An alternative is to use checkpoints. I think they are persisted via the server (eventhub) using the provided storage account credentials. Is this correct?

This is some preliminary code I am currently using:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging; // classic EventProcessorHost SDK: IEventProcessor, PartitionContext, CloseReason, EventData

public class SimpleEventProcessor : IEventProcessor
{
    private Stopwatch _checkpointStopWatch;

    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
        // Only checkpoint on a clean shutdown; if the lease was lost, another host owns the partition now.
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
        _checkpointStopWatch = new Stopwatch();
        _checkpointStopWatch.Start();
        return Task.FromResult<object>(null);
    }

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            // do something
        }

        // Call checkpoint every 5 minutes, so that the worker can resume processing from 5 minutes back if it restarts.
        if (_checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
        {
            await context.CheckpointAsync();
            _checkpointStopWatch.Restart();
        }
    }
}

I believe it creates a checkpoint on the server every 5 minutes. How does the server know which client has submitted the checkpoint (via the context)? Also, how can I prevent events from being processed again if the client restarts? Furthermore, there could still be a window of up to 5 minutes in which events are processed again. Given my requirement, should I use a queue/topic instead?

PS:

This seems to be sufficient:

async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    foreach (var eventData in messages)
    {
        // do something
    }
    await context.CheckpointAsync();
}

Recommended answer

Let me lay out some basic terminology before answering:

Event Hubs is a high-throughput, durable event-ingestion pipeline. Simply put, it is a reliable stream of events in the cloud.

The Offset on an EventData (one event in the stream) is literally a cursor into the stream. Having this cursor enables operations such as restarting reading from that cursor (i.e., the offset), either inclusively or exclusively.
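To make the cursor idea concrete, here is a toy illustration that does not use the Event Hubs API. It models one partition as an ordered list of (Offset, SequenceNumber, Body) tuples, and the hypothetical `ReadFrom` helper restarts either at an offset (inclusive) or just after it (exclusive). The numeric offsets are invented. The offset on EventData is a service-assigned value and is best treated as opaque.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// A toy partition: each event has an offset (the cursor), a sequence number and a body.
// The numeric offsets are invented for illustration; real ones are assigned by the service.
var partition = new List<(long Offset, long SequenceNumber, string Body)>
{
    (0, 1, "a"),
    (120, 2, "b"),
    (250, 3, "c"),
    (390, 4, "d"),
};

// Hypothetical helper: restart reading at a cursor, either including the event that
// sits at that offset or starting just after it. Assumes the list is in offset order.
List<string> ReadFrom(long offset, bool inclusive) =>
    partition
        .Where(e => inclusive ? e.Offset >= offset : e.Offset > offset)
        .Select(e => e.Body)
        .ToList();

Console.WriteLine(string.Join(",", ReadFrom(250, inclusive: true)));  // c,d
Console.WriteLine(string.Join(",", ReadFrom(250, inclusive: false))); // d
```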

The EventProcessor library is a framework that the Event Hubs team built on top of the Service Bus SDK to make writing an Event Hub receiver easier. EPH (EventProcessorHost) plays roughly the role for Event Hubs that ZooKeeper plays for Kafka. It makes sure that when the process running an EventProcessor on a specific partition dies or crashes, processing resumes from the last checkpointed offset in another available EventProcessorHost instance.

Checkpoint: as of today, Event Hubs supports only client-side checkpointing. When you call Checkpoint from your client code:

await context.CheckpointAsync();

it translates into a Storage call, made directly from the client, that stores the current offset in the storage account you provided. The Event Hubs service does not talk to Storage for checkpointing.
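Here is a minimal sketch of what "client-side" means. It assumes a plain in-memory dictionary as a stand-in for the storage account. The `CheckpointAsync` and `ReadCheckpoint` functions are hypothetical illustrations, not the EventProcessorHost API. The client itself writes the offset for its (consumer group, partition) to storage and reads it back on restart. No call goes to the Event Hubs service.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for the storage container you hand to the processor host.
// Key: (consumer group, partition id); value: last checkpointed offset.
var storage = new Dictionary<(string ConsumerGroup, string PartitionId), string>();

// Hypothetical client-side checkpoint: the client itself writes the offset to storage;
// nothing is sent to the Event Hubs service.
Task CheckpointAsync(string consumerGroup, string partitionId, string offset)
{
    storage[(consumerGroup, partitionId)] = offset;
    return Task.CompletedTask;
}

// On (re)start the client reads its offset back; "<none>" means no checkpoint yet,
// in which case a real host would fall back to a configured starting position.
string ReadCheckpoint(string consumerGroup, string partitionId) =>
    storage.TryGetValue((consumerGroup, partitionId), out var offset) ? offset : "<none>";

await CheckpointAsync("$Default", "0", "4096");
Console.WriteLine(ReadCheckpoint("$Default", "0")); // 4096
Console.WriteLine(ReadCheckpoint("$Default", "1")); // <none>
```

The offset value "4096" is made up. Because the key includes the consumer group and partition, each reader's progress is tracked independently.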

THE ANSWER

The EventProcessor framework is meant to achieve exactly what you are looking for.

Checkpoints are not persisted via the server (i.e., the Event Hubs service). Checkpointing is purely client-side: the client talks to Azure Storage directly. That is why the EventProcessor library brings in an additional dependency, AzureStorageClient. The checkpoints are written to a container in your storage account, and you can connect to it yourself. That container also holds the ownership information we maintain: which EPH instances (by name) own which partitions of the event hub, and up to which checkpoint each has currently read/processed.
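The ownership bookkeeping can be pictured with the simplified, hypothetical model below. There is one record per partition, holding the owning host's name and the last checkpointed offset. Real hosts use storage leases that lapse when not renewed. Here, that expiry is simulated by clearing the owner. When host-1 dies, host-2 claims its partitions and resumes each one from the stored offset. Host names, offsets and helper names are all illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical per-partition record, roughly what a processor host keeps in the storage
// container: which host owns the partition and the last checkpointed offset.
var leases = new Dictionary<string, (string Owner, string Offset)>
{
    ["0"] = ("host-1", "1024"),
    ["1"] = ("host-1", "2048"),
    ["2"] = ("host-2", "512"),
};

// Simulate host-1 dying: it stops renewing its leases, so its partitions become free.
// The checkpointed offsets stay where they were.
void ExpireLeasesOf(string host)
{
    foreach (var pid in leases.Where(kv => kv.Value.Owner == host).Select(kv => kv.Key).ToList())
        leases[pid] = ("", leases[pid].Offset);
}

// A surviving host claims every free partition and resumes from its last checkpoint.
List<string> TakeOver(string host)
{
    var resumed = new List<string>();
    foreach (var pid in leases.Where(kv => kv.Value.Owner == "").Select(kv => kv.Key).ToList())
    {
        var offset = leases[pid].Offset;
        leases[pid] = (host, offset);
        resumed.Add($"partition {pid} resumed by {host} from offset {offset}");
    }
    return resumed;
}

ExpireLeasesOf("host-1");
foreach (var line in TakeOver("host-2"))
    Console.WriteLine(line);
```

Anything processed after the last checkpoint but before the crash is read again by the new owner. This is why the checkpoint interval bounds the replay window.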

As for the timer-based checkpointing pattern you originally had: if the process goes down, you will reprocess the events from the last 5-minute window. This is a healthy pattern because:

  • The fundamental assumption is that faults are rare events, so you will deal with duplicate events only rarely.
  • You will end up making fewer calls to the Storage service, which you could easily overwhelm by checkpointing frequently. I would go one step further and fire the checkpoint call asynchronously. OnProcessEvents need not fail if the checkpoint fails!
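The timer-based, fire-and-forget checkpointing described above could look like the following self-contained simulation. `FlakyCheckpointAsync` is a hypothetical stand-in for `context.CheckpointAsync()` that fails on every second call. The clock is simulated so the example runs instantly. Both are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Checkpoint at most once per interval, fire-and-forget, so a failing checkpoint never
// fails the processing of a batch. The clock is simulated instead of using a Stopwatch.
var interval = TimeSpan.FromMinutes(5);
var now = DateTime.UnixEpoch;
var lastCheckpoint = now;
var pending = new List<Task>();   // kept only so the demo can wait for them at the end
int checkpointsTried = 0;

// Hypothetical stand-in for context.CheckpointAsync(): every second call fails,
// e.g. because storage is throttling.
Task FlakyCheckpointAsync()
{
    checkpointsTried++;
    return checkpointsTried % 2 == 0
        ? Task.FromException(new InvalidOperationException("storage throttled"))
        : Task.CompletedTask;
}

void ProcessBatch(IEnumerable<string> batch)
{
    foreach (var e in batch) { /* do something with e */ }

    if (now - lastCheckpoint > interval)
    {
        lastCheckpoint = now;
        // Not awaited: the continuation observes a failure and only logs it.
        pending.Add(FlakyCheckpointAsync().ContinueWith(t =>
        {
            if (t.IsFaulted) Console.WriteLine($"checkpoint failed: {t.Exception?.InnerException?.Message}");
        }));
    }
}

// Simulate 30 minutes of batches arriving once a minute.
for (int minute = 0; minute < 30; minute++)
{
    now = DateTime.UnixEpoch.AddMinutes(minute);
    ProcessBatch(new[] { $"event@{minute}" });
}
await Task.WhenAll(pending);
Console.WriteLine($"checkpoints attempted: {checkpointsTried}");
```

With a strict "more than 5 minutes" check, checkpoints are attempted at minutes 6, 12, 18 and 24. The two that fail are only logged, and no batch fails because of them. The trade-off is that an unawaited checkpoint may complete after later batches. A lost checkpoint only widens the replay window, so this is acceptable under the pattern described above.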
If you want absolutely no events to repeat, you will need to build this de-duplication logic into the downstream pipeline:

    • Every time the EventProcessorImpl starts, query the downstream system for the last sequence number it received. Then keep discarding events up to that sequence number.
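Here is a minimal sketch of that de-duplication step, under stated assumptions. A dictionary stands in for the downstream store and records the highest sequence number written per partition. `LastSequenceNumber` and `ProcessEvents` are hypothetical names. The sketch relies on sequence numbers increasing within a partition, which is what makes a single watermark sufficient.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical downstream sink that remembers, per partition, the highest sequence
// number it has accepted (in practice a column in your database, a blob, etc.).
var downstream = new Dictionary<string, long> { ["0"] = 41 };
var written = new List<long>();

// Ask downstream where it left off; -1 means it has never seen this partition.
long LastSequenceNumber(string partitionId) =>
    downstream.TryGetValue(partitionId, out var seq) ? seq : -1;

// Simulates the processor starting up (e.g. in OpenAsync) for partition "0".
long lastSeen = LastSequenceNumber("0");

// Relies on sequence numbers increasing within a partition.
void ProcessEvents(string partitionId, IEnumerable<long> sequenceNumbers)
{
    foreach (var seq in sequenceNumbers)
    {
        if (seq <= lastSeen) continue;          // replayed after a restart: drop it
        written.Add(seq);                       // "do something": hand it downstream
        downstream[partitionId] = lastSeen = seq;
    }
}

// After a crash the host resumes from an older checkpoint, here sequence number 38.
ProcessEvents("0", Enumerable.Range(38, 8).Select(i => (long)i));
Console.WriteLine(string.Join(",", written)); // 42,43,44,45
```

In a real pipeline, the event write and the watermark update should be committed together, for example in one database transaction. Otherwise a crash between the two can still produce a duplicate.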

Here is some more general reading on Event Hubs...

Published: 2023-07-27 23:20:21
Source: https://www.elefans.com/category/jswz/34/1225255.html