Spring Kafka:轮询新消息,而不是使用onMessage进行通知

编程入门 行业动态 更新时间:2024-10-28 04:28:04
本文介绍了Spring Kafka:轮询新消息,而不是使用onMessage进行通知的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我在我的项目中使用Spring Kafka,因为在基于Spring的项目中使用Kafka消息似乎是很自然的选择.要使用消息,我可以使用MessageListener接口. Spring Kafka在内部负责为每条新消息调用我的onMessage方法.

I am using Spring Kafka in my project as it seemed a natural choice in a Spring based project to consume Kafka messages. To consume messages, I can make use of the MessageListener interface. Spring Kafka internally takes care to invoke my onMessage method for each new message.

但是,在我的设置中,我希望显式轮询新消息并按顺序进行处理(这将需要几秒钟).作为一种解决方法,我可能只是阻塞我的onMessage实现内部,或在内部缓冲消息.但是,这似乎违背了Spring Kafka的核心思想.

However, in my setting I prefer to explicitly poll for new messages and work on them sequentially (which will take a few seconds). As a workaround, I might just block inside my onMessage implementation, or buffer the messages internally. However, this seems to go against the core idea of Spring Kafka.

Kafka的设计使消费者不得不轮询新邮件,这符合我的要求.有没有办法在Spring Kafka中利用这种自然"的工作流程?

Kafka is designed so that consumers have to poll for new messages, which matches my requirements. Is there a way to make use of this "natural" workflow with Spring Kafka?

在这种使用情况下,我应该避免使用Spring Kafka吗?

Should I refrain from using Spring Kafka for this use case?

文档状态:

对于消息处理时间意外变化的用例, 这些选项都不足够.推荐的方式 处理这些情况是将消息处理移至另一个线程, 允许消费者在处理器处理过程中继续调用民意调查 仍在工作.必须采取一定的措施以确保承诺 偏移量不超过实际位置.通常,您必须 禁用自动提交并手动提交已处理的偏移量 仅在线程完成处理后记录(取决于 您需要的投放语义).另请注意,您将需要 暂停分区,以便从轮询中没有收到新记录 直到线程处理完之前返回的内容为止.

For use cases where message processing time varies unpredictably, neither of these options may be sufficient. The recommended way to handle these cases is to move message processing to another thread, which allows the consumer to continue calling poll while the processor is still working. Some care must be taken to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic commits and manually commit processed offsets for records only after the thread has finished handling them (depending on the delivery semantics you need). Note also that you will need to pause the partition so that no new records are received from poll until after thread has finished handling those previously returned.

相关问题: github/spring-projects/spring-kafka/issues/195

推荐答案

必须继续轮询消费者的问题现已解决(在0.10.1.x中,由 KIP-62 ),这样就不会问题不再存在(只要您不超过max.poll.interval.ms),默认情况下是5分钟,但可以增加.

The issue with having to keep polling the consumer has now been resolved (in 0.10.1.x by KIP-62) so that's not an issue any more (as long as you don't exceed the max.poll.interval.ms) which is 5 mins by default but can be increased.

但是,如果要轮询自己,则仍然可以使用spring-kafka(例如,如果使用Boot,则可以获取Spring Boot自动配置的优点),但是可以从DefaultKafkaConsumerFactory和Consumer中获得Consumer. poll()直接.

However, if you want to poll yourself, you can still use spring-kafka (e.g. to get the Spring Boot auto configuration goodness if you are using Boot), but you can get a Consumer from the DefaultKafkaConsumerFactory and poll() it directly.

更多推荐

Spring Kafka:轮询新消息,而不是使用onMessage进行通知

本文发布于:2023-10-24 05:58:33,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1523113.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:而不是   新消息   通知   Spring   Kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!