问题描述
限时送ChatGPT账号..我使用的是 kafka 0.10.2,现在遇到了 CommitFailedException.喜欢:
I was using kafka 0.10.2 and now faced a CommitFailedException. like:
提交无法完成,因为组已经重新平衡并且将分区分配给另一个成员.这意味着时间对 poll() 的后续调用之间的时间比配置的要长max.poll.interval.ms,这通常意味着轮询循环是花费太多时间处理消息.你可以解决这个问题通过增加会话超时或减少最大大小使用 max.poll.records 在 poll() 中返回的批次.
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
我已将 max.poll.interval.ms 设置为 Integer.MAX_VALUE.所以谁能告诉我为什么即使我已经设置了这个值仍然会发生?
I have set max.poll.interval.ms to Integer.MAX_VALUE. so can anyone tell me why this still happens even I have set the value ?
另一个问题是:我按照说明将 session.timeout.ms 设置为 60000,但它仍然发生.我尝试通过简单的代码重现
Another question is: I do as description to set session.timeout.ms to 60000 and it still happens. I try to reproduce by a simple code
public static void main(String[] args) throws InterruptedException {
Logger logger = Logger.getLogger(KafkaConsumer10.class);
logger.info("XX");
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker:9098");
props.put("group.id", "test");
props.put("enable.automit", "true");
props.put("automit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
props.put("max.poll.interval.ms", "300000");
props.put("session.timeout.ms", "10000");
props.put("max.poll.records", "2");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("t1"));
while (true) {
Thread.sleep(11000);
ConsumerRecords<String, String> records = consumer.poll(100);
//Thread.sleep(11000);
Thread.sleep(11000);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
当我将 session.timeout.ms 设置为 10000 时,我尝试在轮询循环中睡眠超过 10000 毫秒,但它似乎有效并且没有异常.所以我对此感到困惑.如果心跳是由consumer.poll 和consumermit 触发的,则在我的代码中,心跳似乎已超出会话超时.为什么不抛出 CommitFailedException ?
when I set session.timeout.ms to 10000, I try to sleep more than 10000 ms in my poll loop, but it seems work and no Exception out. so I'm confused about this. if heartbeat is triggered by consumer.poll and consumermit, seems heartbeat is out of session timeout in my code. why not throw CommitFailedException ?
推荐答案
session.timeout.ms
设置在消费者上应该小于 group.max.session.timeout.ms
在 Kafka 代理上设置.
session.timeout.ms
set on the consumer should be less than the group.max.session.timeout.ms
set on Kafka broker.
这为我解决了这个问题.
This resolved the issue for me.
归功于 github 链接 提交失败
Credit to github link Commit Failures
这篇关于CommitFailedException 提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
更多推荐
[db:关键词]
发布评论