(三) Kafka的生产者消费者Api使用以及参数配置详解

编程入门 行业动态 更新时间:2024-10-08 22:48:50

(三) Kafka的<a href=https://www.elefans.com/category/jswz/34/1768323.html style=生产者消费者Api使用以及参数配置详解"/>

(三) Kafka的生产者消费者Api使用以及参数配置详解

  • 生产者代码

public class HelloKafkaProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();/*必要参数*/properties.put("bootstrap.servers", "s227:9092,s228:9092,s229:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put("value.serializer", "org.apache.kafkamon.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);CountDownLatch countDownLatch = new CountDownLatch(10000);for (int i = 0; i < 10000; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("test01", "test_key" + i, "test_value" + i);
//            Future<RecordMetadata> future = producer.send(record);
//            RecordMetadata recordMetadata = future.get();
//            if (null != recordMetadata) {
//                System.out.println(recordMetadata.offset() + "=>" + recordMetadata.partition());
//            }producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception exception) {if (null != exception) {exception.printStackTrace();}if (recordMetadata != null) {System.out.println(recordMetadata.offset() + "=>" + recordMetadata.partition());}countDownLatch.countDown();}});}countDownLatch.await();System.out.println("消息已经发送!");producer.close();}
}
  • 消费者代码

public class HelloKafkaConsumer {public static void main(String[] args) {Properties properties = new Properties();/*必要参数*/properties.put("bootstrap.servers", "s227:9092,s228:9092,s229:9092");properties.put("key.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");properties.put("group.id", "test01_group");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);String topic = "test01";consumer.subscribe(Collections.singleton(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(500);//500毫秒拉取一次数据records.forEach(record -> {System.out.println("分区:" + record.partition());System.out.println("消费的分区偏移量:" + record.offset());System.out.println("key:" + record.key());System.out.println("value:" + record.value());System.out.println("===========================================");});}}
}
  • 多线程模式下生产者代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafkamon.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 〈功能简述〉<br> * 〈〉** @author 张晓文* @create 2019/5/8* @since 1.0.0*/
public class KafkaConcurrentProducer {private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private static CountDownLatch countDownLatch = new CountDownLatch(1000);private static DemoUser makeUser(int id) {DemoUser demoUser = new DemoUser(id);demoUser.setName("xiaowen" + id);return demoUser;}public static void main(String[] args) throws InterruptedException {Properties properties = new Properties();properties.put("bootstrap.servers", "s227:9092,s228:9092,s229:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put("value.serializer", "org.apache.kafkamon.serialization.StringSerializer");//        ProducerConfig.ACKS_CONFIG//thread safe classKafkaProducer<String, String> producer = new KafkaProducer<>(properties);for (int i = 0; i < 1000; i++) {DemoUser demoUser = makeUser(i);ProducerRecord<String, String> record = new ProducerRecord<>("test01",null,System.currentTimeMillis(),demoUser.getId() + "",demoUser.toString());ProducerWorker worker = new ProducerWorker(record, producer);executorService.submit(worker);}countDownLatch.await();executorService.shutdown();}private static class ProducerWorker implements Runnable {private ProducerRecord<String, String> record;private KafkaProducer<String, String> producer;public ProducerWorker(ProducerRecord<String, String> record, KafkaProducer<String, String> producer) {this.record = record;this.producer = producer;}@Overridepublic void run() {String id = Thread.currentThread().getId() + "-" + System.identityHashCode(producer);producer.send(record, (recordMetadata, exception) -> {if (null != exception) {exception.printStackTrace();}if (recordMetadata != null) {System.out.println("偏移量:" + recordMetadata.offset() + "=>分区:" + recordMetadata.partition());}System.out.println(id + ":[数据-" + record + "-]已经发送完成");countDownLatch.countDown();});}}public static class DemoUser {private int id;private String name;@Overridepublic String toString() {return "DemoUser{" +"id=" + id +", name='" + name + '\'' +'}';}public String getName() {return name;}public void setName(String name) {this.name = name;}public DemoUser(int id) {this.id = id;}public int getId() {return id;}public void setId(int id) {this.id = id;}}}

 

  • Kafka生产者参数

1.acks 必须要多少个分区副本收到了消息

   0       1 (默认)       all 可靠性最高,延迟高

 2.buffer.memory 生产者缓冲区大小 (生产太快会导致阻塞或抛出异常)

   32M(默认)

3.max.block.ms  获取kafka元数据(分区,offset)的信息 等待最长时间

   默认60000ms 60秒

4.retries  生产者重试次数  消息发送失败最大尝试次数

  默认为0 不重试 
5.retry.backoff.ms  和4搭配使用

  上一次重发和下次重发间隔时间100ms(默认)

6.batch.size 消息占用内存大小

  16k(默认)

7.linger.ms  指明发送消息batchSize的时间 和6组合起作用 

   默认为0 (来一条发一条)

   6和7搭配使用 哪个先到就发送

8pression.type 

   none(默认),gzip,snappy

9.client.id

   标识符 设置任意字符串,用于做消息追踪 broker查看是从哪个生产者发送来的消息

10.max.in.flight.requests.per.connection

   默认为1

   生产者等到broker给响应才会继续生产 

11.max.request.size

  控制生产者生产消息的最大大小

  1M (默认) 1个请求 一个批次的消息的大小      和server.properties的message.max.bytes最好一样

更多推荐

(三) Kafka的生产者消费者Api使用以及参数配置详解

本文发布于:2024-02-07 00:11:48,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1751749.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:生产者   详解   消费者   参数   Kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!