问题描述
限时送ChatGPT账号..我正在使用 KafkaMessageListenerContainer 从 kafka 主题中进行消费,我有一个应用程序逻辑来处理每个依赖于其他微服务的记录.我现在在处理每条记录后手动提交偏移量.
I am using the KafkaMessageListenerContainer for consuming from the kafka topic, I have an application logic to process each record which is dependent on other micro services as well. I am now manually committing the offset after each record is processed.
但是如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它直到它成功.为此,我需要对最后一个偏移量进行运行时手动查找.
But if I the application logic fails I need to seek to the failed offset and keep processing it until it's succeeds. For that I need to do a run time manual seek of the last offset.
KafkaMessageListenerContainer 是否可以做到这一点?
Is this possible with the KafkaMessageListenerContainer yet ?
推荐答案
参见 寻求特定偏移.
为了进行搜索,您的侦听器必须实现 ConsumerSeekAware
,它具有以下方法:
In order to seek, your listener must implement
ConsumerSeekAware
which has the following methods:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map
void onIdleContainer(Map
容器启动时调用第一个;在初始化后的某个任意时间进行查找时,应使用此回调.您应该保存对回调的引用;如果您在多个容器(或 ConcurrentMessageListenerContainer
)中使用相同的侦听器,则应将回调存储在 ThreadLocal
或由侦听器 Thread 键控的其他结构中.>
The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer
) you should store the callback in a ThreadLocal
or some other structure keyed by the listener Thread.
这篇关于Spring kafka 消费者,在运行时寻求偏移量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
更多推荐
[db:关键词]
发布评论