Connecting Spring Boot to Kafka


This was my first time connecting to Kafka as a producer. I collected a few integration approaches from around the web and put together this fairly simple one.

First, the integration utility class, KafkaUtil.java:

package com.ruoyi.soaworkflow.utils.kafka;

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Kafka configuration with username/password (SASL) authentication.
 */
@Component // registered as a Spring bean so the @Value setters below are invoked
public class KafkaUtil {

    // Broker addresses: use the PLAINTEXT prefix when there is no password,
    // SASL_PLAINTEXT when there is one, e.g.:
    // public static final String servers = "SASL_PLAINTEXT://10.172.16.27:1024,10.172.16.28:1024,10.172.16.29:1024";

    // @Value cannot target static fields directly, so instance setters
    // copy the values from the configuration file into static variables.
    private static String bootstrapServers;

    @Value("${spring.kafka.bpm.bootstrap-servers}")
    public void setBootstrapServers(String bootstrapServers) {
        KafkaUtil.bootstrapServers = bootstrapServers;
    }

    private static Integer retries;

    @Value("${spring.kafka.bpm.producer.retries}")
    public void setRetries(Integer retries) {
        KafkaUtil.retries = retries;
    }

    private static Integer batchSize;

    @Value("${spring.kafka.bpm.producer.batch-size}")
    public void setBatchSize(Integer batchSize) {
        KafkaUtil.batchSize = batchSize;
    }

    private static Integer bufferMemory;

    @Value("${spring.kafka.bpm.producer.buffer-memory}")
    public void setBufferMemory(Integer bufferMemory) {
        KafkaUtil.bufferMemory = bufferMemory;
    }

    private static Integer linger;

    @Value("${spring.kafka.bpm.producer.linger}")
    public void setLinger(Integer linger) {
        KafkaUtil.linger = linger;
    }

    private static String acks;

    @Value("${spring.kafka.bpm.producer.acks}")
    public void setAcks(String acks) {
        KafkaUtil.acks = acks;
    }

    private static String username;

    @Value("${spring.kafka.bpm.producer.username}")
    public void setUsername(String username) {
        KafkaUtil.username = username;
    }

    private static String passwd;

    @Value("${spring.kafka.bpm.producer.passwd}")
    public void setPasswd(String passwd) {
        KafkaUtil.passwd = passwd;
    }

    // Producer configuration for the Kafka cluster
    public static KafkaProducer<String, String> getProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required"
                + " username=\"" + username + "\" password=\"" + passwd + "\";";
        props.setProperty("sasl.jaas.config", jaasConfig);
        return new KafkaProducer<String, String>(props);
    }

    public static KafkaConsumer<String, String> getConsumer(String groupId, String username, String passwd) {
        Properties props = new Properties();
        // Broker list hard-coded here (the consumer demo below runs outside the
        // Spring context, where the injected values above are not set)
        props.put("bootstrap.servers", "SASL_PLAINTEXT://10.172.16.27:1024,10.172.16.28:1024,10.172.16.29:1024");
        props.put("auto.offset.reset", "earliest"); // required if old data must be read
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "100");
        props.put("max.partition.fetch.bytes", "10240"); // bytes fetched per poll (10 KB?, roughly 20 records at a time)
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required"
                + " username=\"" + username + "\" password=\"" + passwd + "\";";
        props.setProperty("sasl.jaas.config", jaasConfig);
        return new KafkaConsumer<String, String>(props);
    }
}
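Because the @Value setters only run when Spring instantiates the bean, KafkaUtil.getProducer() is safe to call only after the application context has started. As a minimal sketch (the class name KafkaStartupCheck is made up for illustration, not part of the original code), a CommandLineRunner can verify the producer configuration at startup:

package com.ruoyi.soaworkflow.utils.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Hypothetical startup check: CommandLineRunner executes after the context is
// initialized, i.e. after KafkaUtil's @Value setters have filled in the
// static configuration.
@Component
public class KafkaStartupCheck implements CommandLineRunner {

    @Override
    public void run(String... args) {
        KafkaProducer<String, String> producer = KafkaUtil.getProducer();
        System.out.println("Kafka producer created successfully: " + producer);
        producer.close(); // this throwaway instance is safe to close here
    }
}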

Next, the configuration file contents (these values could also simply be hard-coded in the utility class instead):

spring:
  kafka:
    bpm:
      bootstrap-servers: SASL_PLAINTEXT://10.172.16.27:1024,10.172.16.28:1024,10.172.16.29:1024
      producer:
        retries: 1               # number of retries after a failed send
        batch-size: 2            # maximum amount of data sent in one batch
        buffer-memory: 33554432  # 32 MB batching buffer
        linger: 1000
        acks: all
        username: aaaaa
        passwd: bbbbb
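If you do hard-code the values instead, getProducer() loses the @Value plumbing entirely. A sketch of that variant (same imports as KafkaUtil above; the addresses and credentials are the sample values from this article, not real ones):

// Hard-coded variant of KafkaUtil.getProducer(); with this, the @Value setters
// and the YAML section above become unnecessary.
public static KafkaProducer<String, String> getProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "SASL_PLAINTEXT://10.172.16.27:1024,10.172.16.28:1024,10.172.16.29:1024");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 1);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 2);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.setProperty("security.protocol", "SASL_PLAINTEXT");
    props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
    props.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required"
                    + " username=\"aaaaa\" password=\"bbbbb\";");
    return new KafkaProducer<String, String>(props);
}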

The corresponding jar must also be declared in the pom file. Note that spring-kafka pulls in the kafka-clients library transitively, and kafka-clients is what the utility class above actually uses:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

Next, the producer, ProducerClient.java:

package com.ruoyi.soaworkflow.utils.kafka;

import java.util.concurrent.Future;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Producer
 */
public class ProducerClient {

    private static Producer<String, String> producer = KafkaUtil.getProducer();

    public static void main(String[] args) {
        // TODO Auto-generated method stub
    }

    public static void sendToKafka(String topic, String processId, JSONObject bpmData) {
        try {
            final ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>(topic, processId, bpmData.toJSONString());
            Future<RecordMetadata> send = producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("sendToKafka - sent to Kafka, key=" + processId);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Flush rather than close: the producer is a shared static instance,
        // and closing it here would break every subsequent send.
        producer.flush();
    }
}
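For illustration, a hypothetical caller might look like the sketch below. The class name BpmEventPublisher, the topic bpm_topic, and the payload fields are all made up; also note that this only works inside a running Spring context, since KafkaUtil's settings are injected at startup:

package com.ruoyi.soaworkflow.utils.kafka;

import com.alibaba.fastjson.JSONObject;
import org.springframework.stereotype.Service;

// Hypothetical caller, not part of the original code.
@Service
public class BpmEventPublisher {

    public void publishApproval(String processId) {
        JSONObject bpmData = new JSONObject();
        bpmData.put("processId", processId);  // sample fields, illustration only
        bpmData.put("status", "approved");
        ProducerClient.sendToKafka("bpm_topic", processId, bpmData); // placeholder topic
    }
}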

Finally, the consumer, ConsumerClient.java:

package com.ruoyi.soaworkflow.utils.kafka;

import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Consumer
 */
public class ConsumerClient {

    public static KafkaConsumer<String, String> consumer = null;

    public static void main(String[] args) {
        fetchKafka();
    }

    public static void fetchKafka() {
        consumer = KafkaUtil.getConsumer("testGroup1", "oaadmin", "NTA4YjRhZDBmYjQ3"); // group
        consumer.subscribe(Arrays.asList("3_kjczxsmrtj")); // topic
        int i = 0;
        while (true) {
            ConsumerRecords<String, String> records;
            try {
                records = consumer.poll(Long.MAX_VALUE); // milliseconds
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition()
                        + ", offset: " + record.offset()
                        + ", key: " + record.key()
                        + ", value: " + record.value());
                i++;
                System.out.println(i);
            }
            try {
                consumer.commitSync(); // manual commit, since enable.auto.commit is false
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
        }
    }
}
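The poll loop above never exits. If you need a clean shutdown, the standard pattern is to call KafkaConsumer.wakeup() from another thread, which makes the blocked poll() throw WakeupException. A minimal sketch (the class name ConsumerShutdownDemo is made up; group, credentials, and topic reuse the sample values above):

package com.ruoyi.soaworkflow.utils.kafka;

import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerShutdownDemo {

    public static void main(String[] args) {
        final KafkaConsumer<String, String> consumer =
                KafkaUtil.getConsumer("testGroup1", "oaadmin", "NTA4YjRhZDBmYjQ3");
        consumer.subscribe(Arrays.asList("3_kjczxsmrtj"));

        // wakeup() is the one consumer method that is safe to call from another
        // thread; it forces the blocked poll() below to throw WakeupException.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join(); // wait for the poll loop to finish cleanly
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset=" + record.offset() + ", value=" + record.value());
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // expected during shutdown; nothing to do
        } finally {
            consumer.close();
        }
    }
}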
