是否有适用于Spring Boot Kafka客户端的断路器?

编程入门 行业动态 更新时间:2024-10-27 14:18:36
本文介绍了是否有适用于Spring Boot Kafka客户端的断路器?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

如果Kafka服务器(暂时)关闭,我的Spring Boot应用程序ReactiveKafkaConsumerTemplate一直尝试连接不成功,从而造成不必要的流量和日志文件混乱:

2021-11-10 14:45:30.265 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available. 2021-11-10 14:45:32.792 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected 2021-11-10 14:45:34.845 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available. 2021-11-10 14:45:34.845 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected

是否可以使用断路器(灵感here或here)之类的东西,以便Spring BootKafka客户端在出现故障时(或者更好的是连续几次故障)放慢尝试连接的速度,只有在服务器重新启动后才会恢复到正常速度?

是否已有现成的配置参数或任何其他解决方案?

我知道parameterreconnect.backoff.ms,这是我创建ReactiveKafkaConsumerTemplateBean的方式:

@Bean public ReactiveKafkaConsumerTemplate<String, MyEvent> kafkaConsumer(KafkaProperties properties) { final Map<String, Object> map = new HashMap<>(properties.buildConsumerProperties()); map.put(ConsumerConfig.GROUP_ID_CONFIG, "MyGroup"); map.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 10_000L); final JsonDeserializer<DisplayCurrencyEvent> jsonDeserializer = new JsonDeserializer<>(); jsonDeserializer.addTrustedPackages("com.example.myapplication"); return new ReactiveKafkaConsumerTemplate<>( ReceiverOptions .<String, MyEvent>create(map) .withKeyDeserializer(new ErrorHandlingDeserializer<>(new StringDeserializer())) .withValueDeserializer(new ErrorHandlingDeserializer<>(jsonDeserializer)) .subscription(List.of("MyTopic"))); }

消费者仍尝试每3秒连接一次。

推荐答案

参见kafka.apache/documentation/#consumerconfigs_retry.backoff.ms

尝试重新连接到给定主机之前等待的基本时间。这避免了在紧密环路中重复连接到主机。此退避适用于客户端到代理的所有连接尝试。

和kafka.apache/documentation/#consumerconfigs_reconnect.backoff.max.ms

重新连接到多次连接失败的代理时等待的最长时间(以毫秒为单位)。如果提供,则每次连续连接失败时,每台主机的退避将呈指数级增加,最高可达此最大值。计算退避增加后,增加20%的随机抖动以避免连接风暴。

更多推荐

是否有适用于Spring Boot Kafka客户端的断路器?

本文发布于:2023-11-17 14:11:20,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1610156.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:适用于   断路器   客户端   Boot   Spring

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!