本文介绍了计算存储在 kafka 主题中的消息数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
限时送ChatGPT账号..我使用的是 0.9.0.0 版本的 Kafka,我想在不使用管理脚本 kafka-console-consumer.sh 的情况下计算主题中的消息数.
I'm using 0.9.0.0 version of Kafka and I want to count the number of messages in a topic without using the admin script kafka-console-consumer.sh.
我已经尝试了答案中的所有命令 Java,如何在 apache kafka 中获取主题中的消息数但没有一个产生结果.有人可以帮我吗?
I have tried all the commands in the answer Java, How to get number of messages in a topic in apache kafka but none are yielding the result. Can anyone help me out here?
推荐答案
您可以尝试执行以下命令:
You could try to execute the command below:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1
然后,将每个分区的所有计数相加.
Then, sum up all the counts for each partition.
更新:Java 实现
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
......
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Arrays.asList("your_topic"));
Set<TopicPartition> assignment;
while ((assignment = consumer.assignment()).isEmpty()) {
consumer.poll(Duration.ofMillis(100));
}
final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(assignment);
assert (endOffsets.size() == beginningOffsets.size());
assert (endOffsets.keySet().equals(beginningOffsets.keySet()));
Long totalCount = beginningOffsets.entrySet().stream().mapToLong(entry -> {
TopicPartition tp = entry.getKey();
Long beginningOffset = entry.getValue();
Long endOffset = endOffsets.get(tp);
return endOffset - beginningOffset;
}).sum();
System.out.println(totalCount);
}
这篇关于计算存储在 kafka 主题中的消息数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
更多推荐
[db:关键词]
发布评论