【Kafka】Spring

Spring Kafka injects configuration from properties prefixed with spring.kafka: settings defined in that form are read automatically at startup and applied to the Kafka instances. You can also use custom property names in the configuration file, in which case you must assemble the configuration map yourself and create the corresponding factory and KafkaTemplate beans manually.
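
For reference, a minimal sketch of the auto-configured form, using Spring Boot's spring.kafka namespace (the values here are placeholders mirroring the custom configuration used below):

# Equivalent auto-configured form (spring.kafka namespace)
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=kafka-demo-group
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer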

Add the basic configuration needed to start Kafka:

# kafka producer
kafka.servers=127.0.0.1:9092
kafka.producer.topic=kafka-demo-queue
kafka.producer.sasl.username=demo
kafka.producer.sasl.password=demo

# kafka consumer
kafka.consumer.topic=kafka-demo-queue
kafka.consumer.group.id=kafka-demo-group
kafka.consumer.sasl.username=demo
kafka.consumer.sasl.password=demo

This demo connects to a single Kafka server (cluster), so the same server address is used throughout; and since it produces and consumes its own messages, producer and consumer share one topic. The group id is what the consumer side uses to select its target messages. The SASL username and password are configured on the Kafka server to control each client's permissions.

Next comes the startup configuration for the Kafka producer and consumer. Since most options have defaults, only the required entries are shown here:

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.servers}")
    private String servers;

    @Value("${kafka.producer.sasl.username}")
    private String userName;

    @Value("${kafka.producer.sasl.password}")
    private String password;

    @Bean
    public KafkaProducer<String, String> initKafkaProducer() {
        return new KafkaProducer<>(kafkaProducerConfig());
    }

    private Map<String, Object> kafkaProducerConfig() {
        Map<String, Object> props = new HashMap<>(8);
        // Kafka server addresses
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // Message serialization types
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // SASL authentication settings
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + userName + "\" password=\"" + password + "\";");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        return props;
    }
}
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.servers}")
    private String servers;

    @Value("${kafka.consumer.topic}")
    private String topic;

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.sasl.username}")
    private String userName;

    @Value("${kafka.consumer.sasl.password}")
    private String password;

    @Bean
    public KafkaConsumer<String, String> initKafkaConsumer() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig());
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    private Map<String, Object> kafkaConsumerConfig() {
        Map<String, Object> props = new HashMap<>(8);
        // Kafka server addresses
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // Consumer group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Message deserialization types
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // SASL authentication settings
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + userName + "\" password=\"" + password + "\";");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        return props;
    }
}
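
The sending examples below use a KafkaTemplate rather than the raw KafkaProducer bean defined above. A minimal sketch of the extra beans this implies, reusing the kafkaProducerConfig() map from the producer class (the bean names are illustrative):

@Bean
public ProducerFactory<String, String> producerFactory() {
    // Wraps the same property map used for the raw KafkaProducer above
    return new DefaultKafkaProducerFactory<>(kafkaProducerConfig());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}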

Producer send

// Get the send result asynchronously
template.send("demo-topic", "async message send!").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
    @Override
    public void onFailure(Throwable throwable) {
        // ......
    }

    @Override
    public void onSuccess(SendResult<Object, Object> sendResult) {
        // ......
    }
});

// Get the send result synchronously
ListenableFuture<SendResult<Object, Object>> future = template.send("demo-topic", "sync message send!");
try {
    SendResult<Object, Object> result = future.get();
} catch (Throwable e) {
    e.printStackTrace();
}
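
Note that the ListenableFuture-based callback shown here is the Spring Kafka 2.x API; from Spring Kafka 3.0, KafkaTemplate.send returns a CompletableFuture instead, and the equivalent callback is future.whenComplete((result, ex) -> ...).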

Consumer listeners

  • Consume the string payload directly

@KafkaListener(id = "demo_group_id", topics = "demo_topic")
public void consume(String msg) {
    logger.info("Consume message is: {}", msg);
}

  • Manual ack (requires a container factory in manual ack mode; see the sketch after this list)

@KafkaListener(id = "demo_group_id", topics = "demo_topic")
public void consume(String msg, Acknowledgment ack) {
    logger.info("input value: {}", msg);
    ack.acknowledge();
}

  • Consume the whole record (including partition, offset, etc.)

@KafkaListener(topics = "demo_topic", containerFactory = "demoKafkaListenerContainerFactory")
public void consume(ConsumerRecord<String, String> record) {
    logger.info("Consume record: partition-[{}], offset-[{}]", record.partition(), record.offset());
    logger.info("Consume message is {}: {}.", record.key(), record.value());
}

  • Configure concurrency and access the Consumer instance

@KafkaListener(topics = "demo_topic", containerFactory = "demoKafkaListenerContainerFactory", concurrency = "3")
public void consume(ConsumerRecord<String, String> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer<String, String> consumer) {
    logger.info("Consume record: partition-[{}], offset-[{}]", record.partition(), record.offset());
    logger.info("Consume message is {}: {}.", record.key(), record.value());
    consumer.commitSync();
}
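
The listeners above reference a demoKafkaListenerContainerFactory bean that the article does not show. A minimal sketch of what it could look like, assuming the kafkaConsumerConfig() map from KafkaConsumerConfig and manual ack mode for the Acknowledgment example (the bean name and property map come from this article; the rest is an assumption):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> demoKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaConsumerConfig()));
    // MANUAL_IMMEDIATE hands an Acknowledgment to the listener and commits when ack.acknowledge() is called
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}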

Producer send with reply

ReplyingKafkaTemplate extends KafkaTemplate and adds a sendAndReceive method on top of it, which sends a message and receives the consumer's reply. The method is defined as:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

Its producer configuration adds the following on top of the basic producer template setup:

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("send_and_reply");
        repliesContainer.getContainerProperties().setGroupId("reply_group");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> factory, ConcurrentMessageListenerContainer<String, String> container) {
        return new ReplyingKafkaTemplate<>(factory, container);
    }

    // ...... KafkaTemplate ......
}

With the configuration in place, a simple demo can verify the feature:

@RestController
public class KafkaController {

    @Autowired
    private ReplyingKafkaTemplate<String, String, String> template;

    @GetMapping("/kafka/send_and_receive")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendAndReceive(String input) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", input);
        RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        System.out.println("Reply message: " + consumerRecord.value());
    }

    @KafkaListener(id = "reply_group", topics = "demo-topic")
    @SendTo
    public String listen(String input) {
        logger.info("input value: {}", input);
        return "successful";
    }
}
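
For context: under the hood, ReplyingKafkaTemplate stamps the outgoing record with KafkaHeaders.REPLY_TOPIC and KafkaHeaders.CORRELATION_ID headers, and the @SendTo listener echoes the correlation id in its reply, which is how the template matches the record arriving on the repliesContainer topic to the pending RequestReplyFuture.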

Kafka transactional messages

By default, with the Kafka producer and consumer services started through configuration as above, the KafkaTemplate instance has no transactional messaging capability. Transaction support can be activated by adding a specific configuration entry, but note that once it is activated, all message-sending logic must run inside a transactional method, otherwise a "no transaction in process" exception is thrown.

Spring Kafka's transactions are built on the Kafka client's transactional messaging. The feature can be activated through configuration:

kafka.producer.transaction-id-prefix=kafka_tx
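
Since this article wires the producer by hand rather than through spring.kafka auto-configuration, the prefix has to be applied to the producer factory explicitly. A sketch under that assumption (the property name is the article's own; setTransactionIdPrefix is the Spring Kafka call that turns transactions on):

@Bean
public ProducerFactory<String, String> producerFactory(@Value("${kafka.producer.transaction-id-prefix}") String txPrefix) {
    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaProducerConfig());
    // Once a transaction id prefix is set, every send must run inside a transaction
    factory.setTransactionIdPrefix(txPrefix);
    return factory;
}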

The idea of a transaction is that the flow succeeds only when every step completes; if any step fails midway, everything done so far is rolled back. For Kafka this means the consumer side only receives the messages once all the sends inside the transaction have completed.
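
Strictly speaking, this holds only for consumers configured with read_committed isolation; the Kafka default, read_uncommitted, also delivers messages from transactions that are later aborted. The consumer-side setting is one line:

props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");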

A producer can use transactional messages in two ways:

// Method 1: the template's executeInTransaction callback
@Autowired
private KafkaTemplate<String, Object> template;

public void sendTransactionDemo(String input) {
    template.executeInTransaction(t -> {
        t.send("demo-topic", input);
        if (input == null || "".equals(input)) {
            // Throwing inside the callback rolls back both sends
            throw new RuntimeException("invalid kafka input!");
        }
        return t.send("demo-topic", "Second time sending: " + input);
    });
}

// Method 2: the @Transactional annotation
@Transactional(rollbackFor = RuntimeException.class)
public void sendTransactionDemo(String input) {
    template.send("demo-topic", input);
    if (input == null || "".equals(input)) {
        throw new RuntimeException("invalid kafka input!");
    }
    template.send("demo-topic", "Second time sending: " + input);
}
