I create a Source of consumer records using Reactive Kafka as follows:
```scala
val settings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
  .withBootstrapServers(bootstrapServers)
  .withGroupId(groupName)
  // what offset to begin with if there's no offset for this group
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  // do we want to automatically commit offsets?
  .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  // auto-commit offsets every 1 second, in the background
  .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
  // reconnect every 1 second, when disconnected
  .withProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000")
  // every session lasts 30 seconds
  .withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
  // send heartbeat every 10 seconds, i.e. 1/3 * session.timeout.ms
  .withProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000")
  // how many records to fetch in each poll()
  .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100")

Consumer.atMostOnceSource(settings, Subscriptions.topics(topic)).map(_.value)
```
I have 1 instance of Kafka running on my local machine. I push values into the topic via the console producer and see them printed out. Then I kill Kafka, and restart it to see if the source reconnects.
This is how my logs proceed:
* Connection with /192.168.0.1 disconnected java.net.ConnectException: Connection refused
* Give up sending metadata request since no node is available
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
* Resuming partition test-events-0
* Error while fetching metadata with correlation id 139 : {test-events=INVALID_REPLICATION_FACTOR}
* Sending metadata request (type=MetadataRequest, topics=test-events) to node 0
* Sending GroupCoordinator request for group mytestgroup to broker 192.168.0.1:9092 (id: 0 rack: null)
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797713078, latencyMs=70, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=166,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup
* Error while fetching metadata with correlation id 169 : {test-events=INVALID_REPLICATION_FACTOR}
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797716169, latencyMs=72, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=196,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup
* 09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group coordinator lookup for group mytestgroup failed: The group coordinator is not available.
* 09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Coordinator discovery failed for group mytestgroup, refreshing metadata
* Initiating API versions fetch from node 2147483647
* Offset commit for group mytestgroup failed: This is not the correct coordinator for this group.
* Marking the coordinator 192.168.43.25:9092 (id: 2147483647 rack: null) dead for group mytestgroup
* The Kafka consumer has closed.
How do I make sure that this Source reconnects and continues processing the logs?
I think you need at least 2 brokers. If one fails, the other can take over the work while you restart the failed one.
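On the client side, one common way to survive broker outages is to wrap the consumer in Akka Streams' `RestartSource.withBackoff`, which recreates the inner source with exponential backoff whenever it fails or completes. A minimal sketch, assuming the `settings` and `topic` values from the question and an implicit materializer in scope:

```scala
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{RestartSource, Sink}
import scala.concurrent.duration._

// Recreate the consumer source with exponential backoff whenever the
// inner stream fails (e.g. the broker goes away and the stage errors out).
val restartingSource = RestartSource.withBackoff(
  minBackoff = 3.seconds,   // delay before the first restart attempt
  maxBackoff = 30.seconds,  // cap on the exponential backoff
  randomFactor = 0.2        // jitter, to avoid synchronized restarts
) { () =>
  Consumer.atMostOnceSource(settings, Subscriptions.topics(topic)).map(_.value)
}

restartingSource.runWith(Sink.foreach(println))
```

Note that with `atMostOnceSource` each record's offset is committed before it is emitted, so records in flight when the stream dies may be skipped on restart; that is the at-most-once trade-off the source already implies.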