当AWS KCL processRecords失败时,如何“标记”应该重新处理记录?(When AWS KCL processRecords is failed, how to “mark” that

编程入门 行业动态 更新时间:2024-10-25 08:26:30
当AWS KCL processRecords失败时,如何“标记”应该重新处理记录?(When AWS KCL processRecords is failed, how to “mark” that the records should be reprocessed?)

我正在使用AWS DynamoStream,他的API基于AWS KCL。

如果我收到了我未能处理的记录,我希望以后可以使用这些记录来重新处理它们。 例如,我正在尝试将它们保存到远程数据库,我有时会遇到网络问题。

我的问题是:

我可以用某种方式使用Checkpointer来表明我没有处理过这些记录吗? 我应该避免执行Checkpointer.checkpoint()吗? 如果我仍然在下一次processRecords调用中使用它会有什么影响吗? 我可能会为此目的使用任何例外吗?

I'm working with AWS DynamoStream which his API is based on the AWS KCL.

In cases I received records which I failed to process and I want those records to be available later to allow reprocessing of them. For instance I'm trying to save them to a remote DB and I experience network issues sometime.

My questions are:

Can I use the Checkpointer in some way to indicate I Didn't handled the records? Should I just avoid executing Checkpointer.checkpoint()? will it have any effect if I still use it in the next call of processRecords? Is there maybe any exception I may use for that purpose?

最满意答案

KCL不提供这种内置的重新驱动机制 - 一旦processRecords返回(无论是抛出异常还是成功返回),它都会将这些记录视为已处理并继续运行,即使内部失败也是如此。

如果您希望稍后重新处理某些记录,则需要捕获这些记录并将其存储在其他位置以便稍后进行重新处理(明显需要注意的是,它们不会从流的其余部分按顺序处理)。

最简单的解决方案是让记录处理器逻辑识别失败的记录(在返回KCL之前)并将它们发送到SQS队列。 然后,记录不会丢失,并且它们可以在您闲暇时(或者使用SQS队列的另一个进程处理,可能使用DLQ机制来处理重复的故障/放弃方案)。

回答您的具体问题:

不。 检查点只是说“我已经走到这一步,不要在检查站前查看事情” 想象检查点就像一个全球状态。 一旦设定,它就包含了之前的所有内容。 您也不需要检查每次调用processRecords - 您可能每隔X秒或每个Y记录等执行一次。 不是在KCL级别 - 您可以在内部使用特殊的异常类型,并在返回Kinesis之前在processRecords的外层捕获它。 或者您可以捕获所有异常 - 这取决于您以及您希望如何使用重新驱动逻辑。

KCL does not provide this sort of built-in redrive mechanism - once processRecords returns (whether it threw an exception or returned successfully), it considers those records as processed and moves on, even if internally it failed.

If you want to reprocess some records at a later point, you need to capture those records and store them somewhere else for reprocessing attempt later (with the obvious caveat that they won't be processed in order from the rest of the stream).

The simplest solution for this is to have your record processor logic identify the failed records (before returning to KCL) and send them to an SQS queue. Then, the records aren't lost, and they're available for processing at your leisure (or by another process consuming the SQS queue, possibly with a DLQ mechanism for handling repeated failures / give-up scenarios).

To answer your specific questions:

Nope. Checkpointing just says "I've got this far, don't look at things before the checkpoint" Think of checkpointing like a global state. Once it's set, it encompasses everything that came before it. You also don't need to checkpoint every call to processRecords - you might do it every X seconds, or every Y records, etc. Not at KCL level - you could use a special exception type internally, and catch that at your outer level of processRecords just before you return to Kinesis. Or you could just catch all exceptions - it's up to you and how specific you want to be with your redrive logic.

更多推荐

本文发布于:2023-07-09 02:24:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1082892.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:标记   processRecords   KCL   AWS   reprocessed

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!