如何获取kafka主题的分区的最新偏移量?

编程入门 行业动态 更新时间:2024-10-22 09:35:14
本文介绍了如何获取kafka主题的分区的最新偏移量?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

我正在为 Kafka 使用 Python 高级消费者,并且想知道主题的每个分区的最新偏移量.但是我无法让它工作.

I am using the Python high level consumer for Kafka and want to know the latest offsets for each partition of a topic. However I cannot get it to work.

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()

但我得到的输出是

For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None

我有一个使用 assign 的替代方法,但结果是一样的

I have an alternate approach using assign but the result is the same

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()

从一些文档看来,如果 fetch 尚未发布,我可能会得到这种行为.但我找不到一种方法来强迫它.我做错了什么?

It seems from some of the documentation that I might get this behaviour if a fetch has not been issued. But I cannot find a way to force that. What am I doing wrong?

或者是否有不同/更简单的方法来获取主题的最新偏移量?

Or is there a different/simpler way to get the latest offsets for a topic?

推荐答案

最后,在花了一天的时间和几次错误的启动之后,我终于找到了解决方案并让它发挥作用.把它贴给她,以便其他人可以参考.

Finally after spending a day on this and several false starts, I was able to find a solution and get it working. Posting it her so that others may refer to it.

from kafka import SimpleClient
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafkamon import OffsetRequestPayload

client = SimpleClient(brokers)

partitions = client.topic_partitions[topic]
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]

offsets_responses = client.send_offset_request(offset_requests)

for r in offsets_responses:
    print "partition = %s, offset = %s"%(r.partition, r.offsets[0])

这篇关于如何获取kafka主题的分区的最新偏移量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

更多推荐

[db:关键词]

本文发布于:2023-04-19 13:01:55,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/963090.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:分区   偏移量   主题   最新   kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!