SpringBoot 多组 Kafka 配置

编程入门 行业动态 更新时间:2024-10-28 21:17:57

SpringBoot <a href=https://www.elefans.com/category/jswz/34/1765890.html style=多组 Kafka 配置"/>

SpringBoot 多组 Kafka 配置

SpringBoot 多组 Kafka 配置

单组 Kafka 配置

时隔多日,冒个泡吧。

场景 是 我在日常的开发过程中需要监听 kafka 的消息进行回调处理,但是呢,不同的三方服务他们用了不同的 kafka 集群,那么默认的 Spring 自动读取的 kafka 配置就不行了,它默认只支持一组,那么就需要单独进行多组配置。

先说单组配置的场景,只需要在你的 yml 里增加配置

spring:kafka:bootstrap-servers: 192.168.25.11:9092,192.168.25.22:9092properties:security.protocol: SASL_PLAINTEXTsasl.mechanism: SCRAM-SHA-256sasl.jaas.config: org.apache.kafkamon.security.scram.ScramLoginModule required username="admin" password="admin";topic: your-topic

里面 security 和 sasl 用于鉴权

然后你就可以直接写个 consumer 来接受消息了

@Component
@Slf4j
public class Consumer {@KafkaListener(topics = "${spring.kafka.topic}", groupId = "your-group")public void consumeMsgLog(ConsumerRecord<?, ?> record) {// do everything}}

在上述配置和代码示例中,groupId 是 Kafka 消费者组的标识符,它在 Kafka 架构中起到了关键的角色。让我解释一下 groupId 在 Kafka 架构设计中的作用:

  1. Kafka 消费者组:Kafka 消费者组是一组 Kafka 消费者的逻辑集合,它们共同订阅一个或多个 Kafka 主题。消费者组中的每个消费者可以独立处理主题中的消息,而消费者组协调消息的分配和处理。
  2. 消息分发:Kafka主题中的每个分区中的消息可以被同一个消费者组的一个消费者处理。groupId 用于将消费者组中的消费者分配到分区,以确保消息被均匀地分发。这意味着每个分区的消息只能被消费者组中的一个消费者处理。
  3. Offset 管理groupId 还用于管理消息偏移量(offsets)。每个分区的消息都有一个偏移量,用于跟踪已处理的消息。Kafka维护每个消费者组的每个分区的偏移量,以确保消息不会被重复处理。这使得每个消费者组可以在不同时间点开始处理消息,并且不会丢失已处理的消息。
  4. 水平伸缩groupId 允许消费者组进行水平伸缩。您可以添加或删除消费者,而不会破坏分配的消息负载均衡。Kafka会根据消费者组的大小自动重新分配分区。

也就是说,一个主题中的消息,可以被多个消费者组消费,但是不能被同一个消费者组的多个消费者消费

在Kafka中,消费者组不需要显式地创建。当您的消费者开始订阅特定的主题时,如果指定了相同的 groupId,Kafka 会自动将这些消费者视为同一个消费者组。这意味着,只要您在消费者配置中指定了相同的 groupId,Kafka 就会自动将它们分配到同一个消费者组。

如果指定了不同的 groupId,Kafka 将把它们视为不同的消费者组,并且这些消费者组会独立地消费相同或不同的主题中的消息。

做了一些小小的铺垫,让我们进入正题

多组 Kafka 配置

Spring Kafka 提供了 ConcurrentKafkaListenerContainerFactory 以支持同时监听多个不同的 Kafka 集群或主题。可以为每个不同的 Kafka 集群或主题配置不同的 ConcurrentKafkaListenerContainerFactory 实例,以满足多组消费者需求。

所以就是我们自己定义加载配置而不是使用 Spring Boot 默认的预留配置。

那比如我有两组 Kafka 集群,为了省事,第一组我就用默认的,而另一组单独设置一组,然后进行ConcurrentKafkaListenerContainerFactory 的定制化注入

@Slf4j
@Configuration
public class KafkaConfiguration {@Value("${kafka.sec-kafka.consumer.bootstrap-servers:192.168.25.22:9092}")private String servers;@Value("${spring.kafka.properties.sasl.jaas.config}")private String jaasConfig;@Beanpublic ConsumerFactory<String, String> secKafkaConsumerFactory() {Map<String, Object> consumerProps = new HashMap<>();consumerProps.put("bootstrap.servers", servers);consumerProps.put("group.id", "your-group");consumerProps.put("enable.automit", "true");consumerProps.put("automit.interval.ms", "2000");consumerProps.put("key.deserializer", StringDeserializer.class);consumerProps.put("value.deserializer", StringDeserializer.class);consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 鉴权相关配置consumerProps.put("security.protocol", "SASL_PLAINTEXT");consumerProps.put("sasl.mechanism", "SCRAM-SHA-256");consumerProps.put("sasl.jaas.config", jaasConfig);return new DefaultKafkaConsumerFactory<>(consumerProps);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> secKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(secKafkaConsumerFactory());return factory;}}

解释一下配置

  1. bootstrap.servers:指定了 Kafka 服务器的地址和端口,这是连接到 Kafka 集群的入口点。

  2. group.id:指定了消费者所属的消费者组的标识符。Kafka 使用消费者组来协调消息分发,确保消息被均匀分发给消费者。

  3. enable.automit:指定是否启用自动提交偏移量。如果设置为 “true”,Kafka 消费者会自动定期提交偏移量,以记录已经处理的消息。如果设置为 “false”,您需要手动管理偏移量。

  4. automit.interval.ms:如果启用了自动提交,这个参数指定了自动提交偏移量的时间间隔,以毫秒为单位。

  5. key.deserializervalue.deserializer:指定用于反序列化消息键和值的反序列化器类。在这种情况下,它们都设置为 StringDeserializer.class,表示消息键和值都被视为字符串。

  6. ConsumerConfig.AUTO_OFFSET_RESET_CONFIG:指定了当消费者启动时或者偏移量丢失时如何处理消息的偏移量。“earliest” 表示从最早的可用消息开始处理,“latest” 表示从最新的消息开始处理。

  7. 鉴权相关配置(SASL):这些配置用于设置 Kafka 消费者与 Kafka 集群之间的安全通信和身份验证。这包括 security.protocolsasl.mechanismsasl.jaas.config。它们指定了使用 SASL 加密和身份验证的方式,以及相应的配置信息。jaasConfig 包含了 SASL 配置的详细信息。

这些属性是 Kafka 消费者连接和配置的关键部分,它们确保了消费者可以连接到 Kafka 集群并以安全的方式处理消息

而这个时候你的 Consumer,只需要在注解里多一个配置 containerFactory

@Component
@Slf4j
public class SecConsumer {@KafkaListener(topics = "${kafka.topic}", groupId = "your-group" containerFactory = "secKafkaListenerContainerFactory")public void consumeMsgLog(ConsumerRecord<?, ?> record) {// do everything}}

更多使用方法可以参考官方文档 Spring for Kafka

更多推荐

SpringBoot 多组 Kafka 配置

本文发布于:2023-11-17 01:49:23,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1636670.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:多组   SpringBoot   Kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!