我有一个带有多个分区的Kafka主题,我想知道Java中是否有一种方法可以获取该主题的最后一条消息.我不在乎我只是想获取最新消息的分区.
I have a Kafka topic with multiple partitions and I wonder if there is a way in Java to fetch the last message for the topic. I don't care for the partitions I just want to get the latest message.
我尝试了 @KafkaListener ,但是只有在主题更新时,它才会获取消息.如果打开应用程序后没有发布任何内容,则不会返回任何内容.
I have tried @KafkaListener but it fetches the message only when the topic is updated. If there is nothing published after the application is opened nothing is returned.
也许听众根本不是解决问题的正确方法?
Maybe the listener is not the right approach to the problem at all?
推荐答案以下代码段对我有用.您可以尝试一下.注释中的解释.
This following snippet worked for me. You may try this. Explanation in the comments.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singletonList(topic)); consumer.poll(Duration.ofSeconds(10)); consumer.assignment().forEach(System.out::println); AtomicLong maxTimestamp = new AtomicLong(); AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>(); // get the last offsets for each partition consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> { System.out.println("offset: "+offset); // seek to the last offset of each partition consumer.seek(topicPartition, (offset==0) ? offset:offset - 1); // poll to get the last record in each partition consumer.poll(Duration.ofSeconds(10)).forEach(record -> { // the latest record in the 'topic' is the one with the highest timestamp if (record.timestamp() > maxTimestamp.get()) { maxTimestamp.set(record.timestamp()); latestRecord.set(record); } }); }); System.out.println(latestRecord.get());更多推荐
有没有办法从Kafka主题获取最新消息?
发布评论