Kafka重新启动时如何使Kafka Source重新连接

编程入门 行业动态 更新时间:2024-10-25 14:23:26
本文介绍了Kafka重新启动时如何使Kafka Source重新连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我使用Reactive Kafka创建消费者记录的Source,如下所示:

I create a Source of consumer records using Reactive Kafka as follows:

val settings = ConsumerSettings(system, keyDeserializer, valueDeserializer) .withBootstrapServers(bootstrapServers) .withGroupId(groupName) // what offset to begin with if there's no offset for this group .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // do we want to automatically commit offsets? .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") // auto-commit offsets every 1 minute, in the background .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000") // reconnect every 1 second, when disconnected .withProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000") // every session lasts 30 seconds .withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") // send heartbeat every 10 seconds i.e. 1/3 * session.timeout.ms .withProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000") // how many records to fetch in each poll( ) .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100") Consumer.atMostOnceSource(settings, Subscriptions.topics(topic)).map(_.value)

我有1个Kafka实例在我的本地计算机上运行.我通过控制台生产者将值推送到主题中,然后看到它们已打印出来.然后我杀死Kafka,然后重新启动以查看源是否重新连接.

I have 1 instance of Kafka running on my local machine. I push values into the topic via the console producer and see them printed out. Then I kill Kafka, and restart it to see if the source reconnects.

这些是我的日志的处理方式:

These are how my logs proceed:

* Connection with /192.168.0.1 disconnected java.ConnectException: Connection refused * Give up sending metadata request since no node is available * Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds * Resuming partition test-events-0 * Error while fetching metadata with correlation id 139 : {test-events=INVALID_REPLICATION_FACTOR} * Sending metadata request (type=MetadataRequest, topics=test-events) to node 0 * Sending GroupCoordinator request for group mytestgroup to broker 192.168.0.1:9092 (id: 0 rack: null) * Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds * Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797713078, latencyMs=70, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=166,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup * Error while fetching metadata with correlation id 169 : {test-events=INVALID_REPLICATION_FACTOR} * Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797716169, latencyMs=72, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=196,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup 09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group coordinator lookup for group mytestgroup failed: The group coordinator is not available. 09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Coordinator discovery failed for group mytestgroup, refreshing metadata * Initiating API versions fetch from node 2147483647 * Offset commit for group mytestgroup failed: This is not the correct coordinator for this group. * Marking the coordinator 192.168.43.25:9092 (id: 2147483647 rack: null) dead for group mytestgroup * The Kafka consumer has closed.

如何确保此源重新连接并继续处理日志?

How do I make sure that this Source reconnects and continues processing the logs?

推荐答案

我认为您至少需要2个经纪人.如果其中一个失败,则另一个可以完成这项工作,而您可以重新启动另一个.

I think you need to have at least 2 brokers. If one fails the other one can do the job and you could restart the other one.

更多推荐

Kafka重新启动时如何使Kafka Source重新连接

本文发布于:2023-10-28 17:54:14,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1537357.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:启动时   Kafka   Source

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!