本文介绍了春天·卡夫卡听着regex的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试使用以下代码收听新创建的主题,但不起作用。您能告诉我下面的代码是否正确吗?
public class KafkaMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class); private final ProcessEventModel eventModel; @KafkaListener(topicPattern = "betsyncDataTopic*") public void receive(ConsumerRecord<String, String> consumerRecord) { LOGGER.info("received payload at '{}'", consumerRecord.timestamp()); eventModel.process(consumerRecord.value()); } 推荐答案您的正则表达式无效;它应该是betsyncDataTopic.*。
@KafkaListener(id = "xxx", topicPattern = "kbgh.*") public void listen(String in) { System.out.println(in); }...
partitions assigned: [kbgh290-0]编辑
如果稍后添加与模式匹配的新主题,则在重新平衡之前会有延迟。根据KafkaConsumerjavadoc...
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions. * The pattern matching will be done periodically against topic existing at the time of check. * <p> * As part of group management, the consumer will keep track of the list of consumers that * belong to a particular group and will trigger a rebalance operation if one of the * following events trigger - * <ul> * <li>Number of partitions change for any of the subscribed list of topics * <li>Topic is created or deleted * <li>An existing member of the consumer group dies * <li>A new member is added to an existing consumer group via the join API * </ul>我刚刚运行了一个测试;在12:13:32处添加了一个新的匹配主题;结果:
2018-02-12 12:17:30.394 INFO 88028 --- [ xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [kbgh290-0] 2018-02-12 12:17:30.450 INFO 88028 --- [ xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [kbgh290-0, kbghNew-0]所以需要几分钟。
更多推荐
春天·卡夫卡听着regex
发布评论