3 Kafka Development in Practice

3.1 Sending and Receiving Messages

The producer side mainly involves two classes: KafkaProducer and ProducerRecord.

KafkaProducer is the class used to send messages; ProducerRecord encapsulates a single Kafka message.

Creating a KafkaProducer requires specifying a few parameters; the core ones and their meanings are summarized in the sketch below.

Other parameters can be found in org.apache.kafka.clients.producer.ProducerConfig and are covered in later sections.
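As a quick reference, the sketch below builds the configuration map used throughout this section with the ProducerConfig constants: the initial broker addresses, the key and value serializers, and the acks level. The class name ProducerConfigDemo is illustrative only; the broker address matches the examples that follow.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerConfigDemo {

    public static Map<String, Object> baseConfigs() {
        Map<String, Object> configs = new HashMap<>();
        // Initial broker addresses used to bootstrap the connection; for a cluster,
        // the remaining brokers are discovered through this initial connection
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.45.128:9092");
        // Serializer that turns the Integer key into bytes
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerSerializer");
        // Serializer that turns the String value into bytes
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // How many acknowledgments the broker must gather before a send counts as successful: 0, 1, or all
        configs.put(ProducerConfig.ACKS_CONFIG, "1");
        return configs;
    }
}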

After the producer sends a message, it needs an acknowledgment from the broker. The acknowledgment can be awaited synchronously or asynchronously: synchronous acknowledgment is less efficient; asynchronous acknowledgment is more efficient but requires registering a callback object.

Producer 1

package com.lagou.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class MyProducer1 {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();
        // Initial broker addresses used to bootstrap the connection to Kafka.
        // For a cluster, the other brokers are discovered through this initial connection.
        configs.put("bootstrap.servers", "172.16.45.128:9092");
        // Key serializer
        configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        // Value serializer
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("acks", "1");

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);

        // ProducerRecord wraps the message to send
        ProducerRecord<Integer, String> record = new ProducerRecord<>(
                "topic_1",   // topic name
                0,           // partition number; the topic has only one partition, so 0
                0,           // integer key
                "message 1"  // string value
        );

        // Send the message and wait synchronously for the broker's acknowledgment
        producer.send(record).get(3_000, TimeUnit.MILLISECONDS);

        // Close the producer
        producer.close();
    }
}

Producer 2

package com.lagou.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class MyProducer2 {
    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put("bootstrap.servers", "172.16.45.128:9092");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);

        ProducerRecord<Integer, String> record = new ProducerRecord<>("topic_1", 0, 1, "hello kafka");

        // Wait for the acknowledgment asynchronously via a callback
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("Topic: " + recordMetadata.topic() + "\n"
                            + "Partition: " + recordMetadata.partition() + "\n"
                            + "Offset: " + recordMetadata.offset() + "\n"
                            + "Serialized key size: " + recordMetadata.serializedKeySize() + "\n"
                            + "Serialized value size: " + recordMetadata.serializedValueSize() + "\n"
                            + "Timestamp: " + recordMetadata.timestamp());
                } else {
                    System.out.println("Exception: " + e.getMessage());
                }
            }
        });

        // Close the producer; pending sends are flushed first
        producer.close();
    }
}

Producer 3

package com.lagou.kafka.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class MyProducer3 {
    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put("bootstrap.servers", "172.16.45.128:9092");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);

        for (int i = 100; i < 200; i++) {
            ProducerRecord<Integer, String> record =
                    new ProducerRecord<>("topic_1", 0, 1, "hello kafka" + i);

            // Wait for each acknowledgment asynchronously via a callback
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("Topic: " + recordMetadata.topic() + "\n"
                                + "Partition: " + recordMetadata.partition() + "\n"
                                + "Offset: " + recordMetadata.offset() + "\n"
                                + "Serialized key size: " + recordMetadata.serializedKeySize() + "\n"
                                + "Serialized value size: " + recordMetadata.serializedValueSize() + "\n"
                                + "Timestamp: " + recordMetadata.timestamp());
                    } else {
                        System.out.println("Exception: " + e.getMessage());
                    }
                }
            });
        }

        // Close the producer; pending sends are flushed first
        producer.close();
    }
}

Message consumption flow:
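In outline: the consumer is created with a group.id and key/value deserializers, subscribes to one or more topics, polls the broker for batches of records, processes them, and has its offsets committed (automatically with the settings used here). Below is a minimal sketch of that loop, assuming the same broker address and topic_1 topic as the producer examples; the class name ConsumeLoopSketch is illustrative, and the poll(Duration) overload assumes a client version of 2.0 or newer. The full example that follows also registers a rebalance listener and prints every record field.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumeLoopSketch {
    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put("bootstrap.servers", "172.16.45.128:9092");   // same broker as above
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("group.id", "consumer.demo");                 // consumer group
        configs.put("enable.auto.commit", "true");                // offsets committed automatically

        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs)) {
            consumer.subscribe(Arrays.asList("topic_1"));         // 1. subscribe
            while (true) {
                // 2. poll: fetch a batch of records (blocks up to 1 second)
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
                // 3. process each record; offsets are auto-committed in the background
                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println(record.topic() + "-" + record.partition()
                            + "@" + record.offset() + ": " + record.value());
                }
            }
        }
    }
}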

Consumer

package com.lagou.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.*;
import java.util.regex.Pattern;

public class MyConsumer1 {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // bootstrap.servers is the initial connection to Kafka.
        // For a cluster, the other brokers are discovered through this initial connection.
        configs.put("bootstrap.servers", "172.16.45.128:9092");
        // Key deserializer
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        // Value deserializer
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("group.id", "consumer.demo");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create the consumer
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);

//        final Pattern pattern = Pattern.compile("topic_\\d");
//        final Pattern pattern = Pattern.compile("topic_[0-9]");
//        consumer.subscribe(pattern);
//        consumer.subscribe(pattern, new ConsumerRebalanceListener() {

        // Subscribe to topics or partitions
        List<String> topics = Arrays.asList("topic_1");
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                partitions.forEach(tp -> System.out.println("Revoked partition: " + tp.partition()));
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                partitions.forEach(tp -> System.out.println("Assigned partition: " + tp.partition()));
            }
        });

        // Poll for messages from the subscribed topics
        ConsumerRecords<Integer, String> records = consumer.poll(3_000);

        // Get the records of the topic_1 topic
        Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");

        // Iterate over the topic_1 records
        topic1Iterable.forEach(record -> {
            System.out.println("========================================");
            System.out.println("Headers: " + Arrays.toString(record.headers().toArray()));
            System.out.println("Key: " + record.key());
            System.out.println("Offset: " + record.offset());
            System.out.println("Partition: " + record.partition());
            System.out.println("Serialized key size: " + record.serializedKeySize());
            System.out.println("Serialized value size: " + record.serializedValueSize());
            System.out.println("Timestamp: " + record.timestamp());
            System.out.println("Timestamp type: " + record.timestampType());
            System.out.println("Topic: " + record.topic());
            System.out.println("Value: " + record.value());
        });
    }
}

3.2 SpringBoot Kafka

1. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lagou.kafka.demo</groupId>
    <artifactId>demo-02-springboot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo-02-springboot-kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2. application.properties

spring.application.name=springboot-kafka-02
server.port=8080

# Kafka configuration
spring.kafka.bootstrap-servers=linux121:9092

# Producer configuration
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Maximum size of a producer batch, in bytes
spring.kafka.producer.batch-size=16384
# Total send buffer available to the producer, here 32 MB (32*1024*1024)
spring.kafka.producer.buffer-memory=33554432

# Consumer configuration
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer02
# If Kafka has no committed offset for this consumer group, reset to the earliest offset
spring.kafka.consumer.auto-offset-reset=earliest
# Whether offsets are committed automatically or manually; auto-commit is used here
spring.kafka.consumer.enable-auto-commit=true
# Interval between automatic offset commits, in milliseconds
spring.kafka.consumer.auto-commit-interval=1000

3. Demo02SpringbootKafkaApplication.java

package com.lagou.kafka.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Demo02SpringbootKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(Demo02SpringbootKafkaApplication.class, args);
    }
}

4. KafkaConfig.java

package com.lagou.kafka.demo.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic topic1() {
        return new NewTopic("nptc-01", 3, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("nptc-02", 5, (short) 1);
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "linux121:9092");
        KafkaAdmin admin = new KafkaAdmin(configs);
        return admin;
    }

    @Bean
    @Autowired
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
        // Override part of the ProducerFactory's original settings
        Map<String, Object> configsOverride = new HashMap<>();
        configsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);
        KafkaTemplate<Integer, String> template =
                new KafkaTemplate<Integer, String>(producerFactory, configsOverride);
        return template;
    }
}

5. KafkaSyncProducerController.java

package com.lagou.kafka.demo.controller;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class KafkaSyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping("send/sync/{message}")
    public String send(@PathVariable String message) {
        final ListenableFuture<SendResult<Integer, String>> future =
                template.send("topic-spring-01", 0, 0, message);
        // Send synchronously by blocking on the future
        try {
            final SendResult<Integer, String> sendResult = future.get();
            final RecordMetadata metadata = sendResult.getRecordMetadata();
            System.out.println("sync send succeeded: "
                    + metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return "success";
    }
}

6. KafkaAsyncProducerController.java

package com.lagou.kafka.demo.controller;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaAsyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping("send/async/{message}")
    public String send(@PathVariable String message) {
        final ListenableFuture<SendResult<Integer, String>> future =
                this.template.send("topic-spring-01", 0, 1, message);
        // Register a callback and wait asynchronously for the broker's result
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("send failed: " + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                final RecordMetadata metadata = result.getRecordMetadata();
                System.out.println("async send succeeded: "
                        + metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
            }
        });
        return "success";
    }
}

7. MyConsumer.java

package com.lagou.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

    @KafkaListener(topics = "topic-spring-01")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        System.out.println("Consumer received: "
                + record.topic() + "\t"
                + record.partition() + "\t"
                + record.offset() + "\t"
                + record.key() + "\t"
                + record.value());
    }
}
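To exercise the whole Spring Boot demo end to end (assuming the broker configured as linux121:9092 is reachable and the application runs locally on the configured port 8080), start Demo02SpringbootKafkaApplication and request http://localhost:8080/send/sync/hello or http://localhost:8080/send/async/hello. The controllers publish the path variable to topic-spring-01, and the @KafkaListener in MyConsumer prints the received topic, partition, offset, key, and value to the console.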

 

 
