kafka的测试用例

编程入门 行业动态 更新时间:2024-10-23 13:26:05

kafka的<a href=https://www.elefans.com/category/jswz/34/1771117.html style=测试用例"/>

kafka的测试用例

环境(maven管理)

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.8.2</artifactId><version>0.8.1</version>
</dependency>

生产者:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;import java.util.Properties;
import java.util.UUID;/**
 * Created by Administrator on 2016/8/22.
 */
public class KafkaProducerSimple {public static void main(String[] args) {long startTime = System.currentTimeMillis();/**
                 * 1、指定当前kafka producer生产的数据的目的地
                 *  创建topic可以输入以下命令,在kafka集群的任一节点进行创建。
                 *  bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
                 */
                // "test2"
                String TOPIC =args[0];/**
                 * 2、读取配置文件
                 */
                Properties props = new Properties();/*
         * key.serializer.class默认为serializer.class
               */
                props.put("serializer.class", "kafka.serializer.StringEncoder");/*
               * kafka broker对应的主机,格式为host1:port1,host2:port2
               */
                //"192.168.199.9:9092"
                props.put("metadata.broker.list",args[1] );/*
         * request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
               * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
               * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
               * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
               * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
               * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
               * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
               * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
               */
                props.put("request.required.acks", "1");/*
               * 可选配置,如果不配置,则使用默认的partitioner partitioner.class
               * 默认值:kafka.producer.DefaultPartitioner
               * 用来把消息分到各个partition中,默认行为是对key进行hash。
               */
                props.put("partitioner.class", "netty.kafka.MyLogPartitioner");
//        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
                /**
                 * 3、通过配置文件,创建生产者
                 */
                Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));/**
                 * 4、通过for循环生产数据
                 */
                int nums = Integer.parseInt(args[2]);System.out.println(args[0]+"--"+args[1]+"--"+args[2]);for (int messageNo = 0; messageNo < nums; messageNo++) {/**
                         * 5、调用producer的send方法发送数据
                         * 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发
                         */
                //245,678980,96cc7f50-25d8-4f48-9a2f-eabc95c9ba9b,广东省韶关市浈江区乐园镇金沙居委会金沙中巷9号江畔花园A栋403房, , ,113.58011051,24.76961320,66424120.00000000,广东省,440000,韶关市,440200,浈江区,440204,乐园镇,440204036,金沙居委会,440204036036,金沙中巷,4402040000688,9号,440204036000093,江畔花园,440204036000001,,,,,,,,,,,,,A栋,440204036000063,,,403房,440204036000000000612,004,003,浈江区分局,440204000000,南山派出所,440204520000, , , , , , ,1,,,2014/6/9,,, , , , ,, , , ,房间,440204036000000000612,403房,08, , ,10, ,广东省韶关市浈江区金沙中巷9号江畔花园A栋403房,E2F35E92-2E05-6CB4-DE04-02B0A37016C4, ,1,1,113.58011051,24.7696132,113.58011051,24.7696132,,,,,0,0,3,0C

                        producer.send(new KeyedMessage<String, String>(TOPIC, messageNo+","+(60000+messageNo)+",96cc7f50-25d8-4f48-9a2f-eabc95c9ba9b,广东省韶关市浈江区乐园镇金沙居委会金沙中巷9号江畔花园A栋403房, , ,113.58011051,24.76961320,66424120.00000000,广东省,440000,韶关市,440200,浈江区,440204,乐园镇,440204036,金沙居委会,440204036036,金沙中巷,4402040000688,9号,440204036000093,江畔花园,440204036000001,,,,,,,,,,,,,A栋,440204036000063,,,403房,440204036000000000612,004,003,2040392-2E05-6CB4-DE04-02B0A37016C4, ,1,1,,0,0,3,0C\n"));}long endTime = System.currentTimeMillis();float seconds = (endTime - startTime) / 1000F;System.out.println(Float.toString(seconds) + " seconds.");}
}

 
 
消费者代码:
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.log4j.Logger;import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/**
 * Created by Administrator on 2016/8/22.
 */
public class KafkaConsumerSimple implements Runnable {// private static Logger logger = Logger.getLogger(KafkaConsumerSimple.class);
        public String title;public KafkaStream<byte[], byte[]> stream;public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {this.title = title;this.stream = stream;}@Override
        public void run() {System.out.println("开始运行 " + title);ConsumerIterator<byte[], byte[]> it = stream.iterator();/**
                 * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞
                 * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false
                 * */
                while (it.hasNext()) {MessageAndMetadata<byte[], byte[]> data = it.next();String topic = data.topic();int partition = data.partition();long offset = data.offset();String msg = new String(data.message());System.out.println(String.format("Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",title, topic, partition, offset, msg));}System.out.println(String.format("Consumer: [%s] exiting ...", title));}public static void main(String[] args) throws Exception{Properties props = new Properties();props.put("group.id", args[0]);// "192.168.199.6:2181,192.168.199.7:2181,192.168.199.8:2181"
                props.put("zookeeper.connect",args[1]);//largest 读即信息    smallest 根据zk位置开始读
                props.put("auto.offset.reset", "smallest");//动提交的时间间隔
                props.put("automit.interval.ms", "1000");props.put("partition.assignment.strategy", "roundrobin");ConsumerConfig config = new ConsumerConfig(props);//"test2"
                String topic1 =args[2] ;String topic2 = "paymentMq";System.out.println(args[0]+"--"+args[1]+"--"+args[2]);//logger.info(args[0]+"--"+args[1]+"--"+args[2]);
                //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
                ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);//定义一个map
                Map<String, Integer> topicCountMap = new HashMap<>();topicCountMap.put(topic1, 3);//Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流
                Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);//取出 `kafkaTest` 对应的 streams
                List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1);/* //创建一个容量为4的线程池
                ExecutorService executor = Executors.newFixedThreadPool(3);
                //创建20个consumer threads
                for (int i = 0; i < streams.size(); i++)
                        executor.execute(new KafkaConsumerSimple("消费者" + (i + 1), streams.get(i)));*/
                KafkaStream<byte[], byte[]> stream = streams.get(0);ConsumerIterator<byte[], byte[]> it = stream.iterator();while(it.hasNext()){System.out.println(new String(it.next().message()));}}
}

 
 

更多推荐

kafka的测试用例

本文发布于:2024-02-25 15:11:52,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1699384.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:测试   kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!