Error in my Kafka custom partitioner class

Updated: 2024-10-23 15:27:09

I am working on a custom partitioner class for Kafka, with which I am trying to push data into separate partitions. My Kafka producer class:

    import java.util.Date;
    import java.util.Properties;
    import java.util.Random;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class KafkaCustomPartitioner {
        public static void main(String[] args) {
            long events = Long.parseLong(args[0]);
            int blocks = Integer.parseInt(args[1]);
            Random rnd = new Random();

            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("key.serializer.class", "kafka.serializer.StringEncoder");
            props.put("partitioner.class", "com.kafka.partdecider.CustomPartitioner");
            props.put("producer.type", "sync");
            props.put("request.required.acks", "1");

            ProducerConfig config = new ProducerConfig(props);
            Producer producer = new Producer(config);

            for (int nBlocks = 0; nBlocks < blocks; nBlocks++) {
                for (long nEvents = 0; nEvents < events; nEvents++) {
                    long runTime = new Date().getTime();
                    String msg = runTime + ": " + (50 + nBlocks) + ": " + nEvents + ": " + rnd;
                    KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("CustPartTopic", String.valueOf(nBlocks), msg);
                    producer.send(data);
                }
            }
            producer.close();
        }
    }

Custom partitioner class:

    import kafka.producer.Partitioner;

    public class CustomPartitioner implements Partitioner {
        public int partition(Object key, int arg1) {
            String receivingkey = (String) key;
            long id = Long.parseLong(receivingkey);
            return (int) (id % arg1);
        }
    }

The project's arguments section has the values: 3 2. When I run the class, I get an ArrayIndexOutOfBoundsException:

    Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
        at com.kafka.custompartitioner.KafkaCustomPartitioner.main(KafkaCustomPartitioner.java:13)

The error is reported at the line: long events = Long.parseLong(args[0]); But I don't understand why that line throws the error. Could anyone tell me how I can fix this?

Best answer

This works for me. It uses the org.apache.kafka.clients producer, whose API is quite different from the kafka.javaapi one in the question:

    package mypackage.io;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Date;
    import java.util.Properties;
    import java.util.Random;
    import java.util.concurrent.ExecutionException;

    public class KafkaCustomPartitioner {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            long events = Long.parseLong(args[0]);
            int blocks = Integer.parseInt(args[1]);
            Random rnd = new Random();

            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "mypackage.io.CustomPartitioner");
            props.put(ProducerConfig.ACKS_CONFIG, "1");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

            for (int nBlocks = 0; nBlocks < blocks; nBlocks++) {
                for (long nEvents = 0; nEvents < events; nEvents++) {
                    long runTime = new Date().getTime();
                    String msg = runTime + ": " + (50 + nBlocks) + ": " + nEvents + ": " + rnd;
                    // get() blocks until the broker acknowledges the record (synchronous send)
                    producer.send(new ProducerRecord<String, String>("CustPartTopic", String.valueOf(nBlocks), msg)).get();
                }
            }
            producer.close();
        }
    }
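A side note on the exception in the question: "ArrayIndexOutOfBoundsException: 0" on args[0] means main received an empty args array, so the values 3 2 never reached the program (one possible cause, an assumption on my part, is that they were entered somewhere other than the program arguments of the run configuration that was actually launched). The producer above reads args[0] and args[1] the same way, so a guard can make that failure easier to diagnose. A minimal sketch, with no Kafka dependency; the class name ArgsGuard and the method parseArgs are hypothetical and not part of the original code:

```java
public class ArgsGuard {

    /**
     * Parses {events, blocks} from the command-line arguments.
     * Throws IllegalArgumentException with a readable message when fewer
     * than two arguments are supplied, instead of failing on args[0].
     */
    public static long[] parseArgs(String[] args) {
        if (args.length < 2) {
            throw new IllegalArgumentException(
                "Expected 2 program arguments <events> <blocks>, but got " + args.length);
        }
        long events = Long.parseLong(args[0]);
        int blocks = Integer.parseInt(args[1]);
        return new long[] { events, blocks };
    }

    public static void main(String[] args) {
        try {
            long[] parsed = parseArgs(args);
            System.out.println("events=" + parsed[0] + ", blocks=" + parsed[1]);
        } catch (IllegalArgumentException e) {
            // Also covers NumberFormatException, which is a subclass
            System.err.println("Bad arguments: " + e.getMessage());
        }
    }
}
```

If this reports 0 arguments, the problem is in how the class is launched, not in the partitioner.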

Then the custom partitioner:

    package mypackage.io;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    import java.util.Map;

    public class CustomPartitioner implements Partitioner {

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            String receivingkey = (String) key;
            long id = Long.parseLong(receivingkey);
            int numPartitions = cluster.availablePartitionsForTopic(topic).size();
            return (int) (id % numPartitions);
        }

        @Override
        public void close() { }

        @Override
        public void configure(Map<String, ?> map) { }
    }
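To see what the partitioner computes without a running broker, the key-to-partition arithmetic can be tried on its own. This is only a sketch: the class PartitionMath, the method partitionFor, and the partition count of 2 are assumptions made for illustration. It also swaps the % operator for Math.floorMod, which is not what the code above does; the two agree for non-negative keys, but floorMod keeps the result non-negative if a key ever parses to a negative number.

```java
public class PartitionMath {

    /** Maps a numeric string key to a partition index in [0, numPartitions). */
    public static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        long id = Long.parseLong(key);
        // floorMod never returns a negative value for a positive modulus,
        // whereas (id % numPartitions) would for a negative id
        return (int) Math.floorMod(id, (long) numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 2; // assumed partition count, for illustration only
        for (int block = 0; block < 4; block++) {
            String key = String.valueOf(block);
            System.out.println("key " + key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

With two partitions, keys 0 and 2 map to partition 0 and keys 1 and 3 map to partition 1, so each block written by the producer loop lands in one fixed partition.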


Published: 2023-08-06 22:20:00
Article link: https://www.elefans.com/category/jswz/34/1456774.html