Python Kafka consumer missing some messages when polling

Updated: 2024-10-24 08:31:49

Problem description

The code for my Kafka consumer looks like this:

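import logging
from time import sleep

from kafka import KafkaConsumer, TopicPartition

log = logging.getLogger(__name__)
# kafka_config (poll_timeout_ms, poll_max_records, poll_sleep_ms) is my own settings object, not shown


def read_messages_from_kafka():
    topic = 'my-topic'
    # A new consumer is created on every call and never closed
    consumer = KafkaConsumer(
        bootstrap_servers=['my-host1', 'my-host2'],
        client_id='my-client',
        group_id='my-group',
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        api_version=(0, 8, 2)
    )
    # Manual assignment of partitions 0 and 1 (no group rebalancing)
    consumer.assign([TopicPartition(topic, 0), TopicPartition(topic, 1)])

    messages = consumer.poll(timeout_ms=kafka_config.poll_timeout_ms, max_records=kafka_config.poll_max_records)

    # poll() returns {TopicPartition: [ConsumerRecord, ...]}
    for partition in messages.values():
        for message in partition:
            log.info("read {}".format(message))

    if messages:
        consumer.commit()  # synchronous commit of the consumed offsets

    next_offset0, next_offset1 = consumer.position(TopicPartition(topic, 0)), consumer.position(TopicPartition(topic, 1))
    log.info("next offset0={} and offset1={}".format(next_offset0, next_offset1))

while True:
    read_messages_from_kafka()
    sleep(kafka_config.poll_sleep_ms / 1000.0)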

I have realised that this consumer setup does not read all the messages. I am not able to reproduce this reliably, as it is an intermittent issue.

When I compare the last 100 messages read with kafkacat against the output of this consumer, I find that my consumer intermittently misses a few messages at random. What's wrong with my consumer?

kafkacat -C -b my-host1 -X broker.version.fallback=0.8.2.1 -t my-topic -o -100
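One way to make this comparison more systematic is to log the partition and offset of every record the consumer returns (kafka-python's ConsumerRecord exposes both as `message.partition` and `message.offset`) and look for holes in each partition's offset sequence. The helper `find_offset_gaps` below is hypothetical, not part of any Kafka library, and it assumes offsets within a partition are contiguous. That holds for an ordinary topic but not for a compacted one, so a reported gap is a lead to check against kafkacat rather than proof of a lost message.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple


def find_offset_gaps(seen: Iterable[Tuple[int, int]]) -> Dict[int, List[int]]:
    """Return, per partition, the offsets missing between the lowest and
    highest offset observed. `seen` holds (partition, offset) pairs, e.g.
    collected from message.partition and message.offset."""
    by_partition: Dict[int, Set[int]] = defaultdict(set)
    for partition, offset in seen:
        by_partition[partition].add(offset)

    gaps: Dict[int, List[int]] = {}
    for partition, offsets in by_partition.items():
        lo, hi = min(offsets), max(offsets)
        # Assumes contiguous offsets (not true for compacted topics)
        missing = [o for o in range(lo, hi + 1) if o not in offsets]
        if missing:
            gaps[partition] = missing
    return gaps


# Offsets 12 and 13 of partition 1 were never logged by the consumer.
observed = [(0, 10), (0, 11), (0, 12), (1, 10), (1, 11), (1, 14)]
print(find_offset_gaps(observed))  # {1: [12, 13]}
```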

There are just too many ways to consume messages in Python. There should be one, and preferably only one, obvious way to do it.

Recommended answer

Your Kafka client has a problem that causes missing messages. I found a solution here:

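import json

# `consumer` is a single KafkaConsumer created once, before the loop
while True:
    raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
    for topic_partition, messages in raw_messages.items():
        for message in messages:  # iterate every record in the partition batch
            application_message = json.loads(message.value.decode())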
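The answer's loop polls one consumer repeatedly and handles every record in each partition batch. Below is a minimal sketch of that pattern as a hypothetical `consume_batches` helper. It relies only on the shape of kafka-python's `KafkaConsumer.poll()` result (a dict mapping each `TopicPartition` to a list of records) and on `commit()`, so it can be exercised with the stand-in `FakeConsumer` shown here. Committing once per non-empty batch, after all of its records were handled, is a choice made for this sketch, not something the answer prescribes.

```python
import json
from types import SimpleNamespace
from typing import Any, Callable, Mapping, Optional, Protocol, Sequence


class PollingConsumer(Protocol):
    """The two KafkaConsumer methods this sketch relies on."""

    def poll(self, timeout_ms: int = 0,
             max_records: Optional[int] = None) -> Mapping[Any, Sequence[Any]]: ...

    def commit(self) -> None: ...


def consume_batches(consumer: PollingConsumer,
                    handle: Callable[[Any], None],
                    max_batches: int,
                    timeout_ms: int = 1000,
                    max_records: int = 500) -> int:
    """Poll the same consumer up to `max_batches` times, hand every record of
    every partition batch to `handle`, then commit once the batch is done.
    Returns the number of records handled."""
    handled = 0
    for _ in range(max_batches):
        raw_messages = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
        for topic_partition, messages in raw_messages.items():
            for message in messages:  # iterate the records inside each partition batch
                handle(message)
                handled += 1
        if raw_messages:
            consumer.commit()  # commit only after the whole batch was handled
    return handled


class FakeConsumer:
    """Stand-in returning pre-built batches in the same dict shape as poll()."""

    def __init__(self, batches):
        self.batches = list(batches)
        self.commits = 0

    def poll(self, timeout_ms=0, max_records=None):
        return self.batches.pop(0) if self.batches else {}

    def commit(self):
        self.commits += 1


def record(payload: str) -> SimpleNamespace:
    """Fake record exposing a bytes `.value`, like ConsumerRecord."""
    return SimpleNamespace(value=payload.encode())


fake = FakeConsumer([
    {("my-topic", 0): [record('{"id": 1}'), record('{"id": 2}')],
     ("my-topic", 1): [record('{"id": 3}')]},
])
decoded = []
n = consume_batches(fake, lambda m: decoded.append(json.loads(m.value.decode())),
                    max_batches=3)
print(n, decoded, fake.commits)  # 3 [{'id': 1}, {'id': 2}, {'id': 3}] 1
```

With a real broker you would pass a single `KafkaConsumer`, created once outside any loop and configured with `enable_auto_commit=False` as in the question.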

There is also another Kafka client, confluent_kafka, which does not have this problem.
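For comparison, here is a sketch of the question's consumer written against confluent_kafka, which wraps librdkafka (the same library kafkacat is built on). `Consumer`, `TopicPartition`, `poll()`, `commit(message=..., asynchronous=False)` and `close()` are confluent_kafka APIs; the helper names `build_config`, `make_consumer` and `poll_once` are hypothetical. Reusing `broker.version.fallback` from the kafkacat command above is an assumption, and whether this consumer works against brokers as old as 0.8.2 is not confirmed here.

```python
from typing import Any, Callable, Dict, List


def build_config(servers: List[str], group_id: str, client_id: str) -> Dict[str, Any]:
    """librdkafka properties mirroring the question's KafkaConsumer arguments."""
    return {
        "bootstrap.servers": ",".join(servers),
        "group.id": group_id,
        "client.id": client_id,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        # Copied from the kafkacat command (-X ...); assumed to be needed here too.
        "broker.version.fallback": "0.8.2.1",
    }


def make_consumer(topic: str, partitions: List[int], config: Dict[str, Any]):
    """Create ONE long-lived Consumer and assign it the given partitions."""
    from confluent_kafka import Consumer, TopicPartition  # requires confluent-kafka
    consumer = Consumer(config)
    consumer.assign([TopicPartition(topic, p) for p in partitions])
    return consumer


def poll_once(consumer, handle: Callable[[bytes], None], timeout_s: float = 1.0) -> bool:
    """Poll one message, pass its value to `handle`, then commit it synchronously.
    Returns False if the poll timed out or delivered an error."""
    msg = consumer.poll(timeout_s)
    if msg is None:
        return False  # nothing arrived within timeout_s
    if msg.error():
        # Errors are delivered as Message objects; real code should log them.
        return False
    handle(msg.value())
    consumer.commit(message=msg, asynchronous=False)  # commits msg.offset() + 1
    return True


# Usage against a real cluster (not executed here):
# consumer = make_consumer("my-topic", [0, 1],
#                          build_config(["my-host1", "my-host2"], "my-group", "my-client"))
# try:
#     while True:
#         poll_once(consumer, print)
# finally:
#     consumer.close()
```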

Published: 2023-04-19 13:13:13
Link: https://www.elefans.com/category/jswz/34/963112.html