CommitFailedException 提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员

编程入门 行业动态 更新时间:2024-10-25 12:18:32
本文介绍了CommitFailedException 提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

我使用的是 kafka 0.10.2,现在遇到了 CommitFailedException.喜欢:

I was using kafka 0.10.2 and now faced a CommitFailedException. like:

提交无法完成,因为组已经重新平衡并且将分区分配给另一个成员.这意味着时间对 poll() 的后续调用之间的时间比配置的要长max.poll.interval.ms,这通常意味着轮询循环是花费太多时间处理消息.你可以解决这个问题通过增加会话超时或减少最大大小使用 max.poll.records 在 poll() 中返回的批次.

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

我已将 max.poll.interval.ms 设置为 Integer.MAX_VALUE.所以谁能告诉我为什么即使我已经设置了这个值仍然会发生?

I have set max.poll.interval.ms to Integer.MAX_VALUE. so can anyone tell me why this still happens even I have set the value ?

另一个问题是:我按照说明将 session.timeout.ms 设置为 60000,但它仍然发生.我尝试通过简单的代码重现

Another question is: I do as description to set session.timeout.ms to 60000 and it still happens. I try to reproduce by a simple code

 public static void main(String[] args) throws InterruptedException {     
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.automit", "true");
        props.put("automit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));
        while (true) {
            Thread.sleep(11000);
            ConsumerRecords<String, String> records = consumer.poll(100);
            //Thread.sleep(11000);
            Thread.sleep(11000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }

当我将 session.timeout.ms 设置为 10000 时,我尝试在轮询循环中睡眠超过 10000 毫秒,但它似乎有效并且没有异常.所以我对此感到困惑.如果心跳是由consumer.poll 和consumermit 触发的,则在我的代码中,心跳似乎已超出会话超时.为什么不抛出 CommitFailedException ?

when I set session.timeout.ms to 10000, I try to sleep more than 10000 ms in my poll loop, but it seems work and no Exception out. so I'm confused about this. if heartbeat is triggered by consumer.poll and consumermit, seems heartbeat is out of session timeout in my code. why not throw CommitFailedException ?

推荐答案

session.timeout.ms 设置在消费者上应该小于 group.max.session.timeout.ms 在 Kafka 代理上设置.

session.timeout.ms set on the consumer should be less than the group.max.session.timeout.ms set on Kafka broker.

这为我解决了这个问题.

This resolved the issue for me.

归功于 github 链接 提交失败

Credit to github link Commit Failures

这篇关于CommitFailedException 提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

更多推荐

[db:关键词]

本文发布于:2023-04-19 13:48:30,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/963050.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:并将   为该   成员   CommitFailedException

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!