如何转换/分叉Kafka流并将其发送到特定主题?

编程入门 行业动态 更新时间:2024-10-28 12:25:58
本文介绍了如何转换/分叉Kafka流并将其发送到特定主题?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我正在尝试使用功能"mapValues"将在原始流"textlines"中获得的字符串值转换为JSONObject Messages到newStream中.然后将在newStream中获得的所有内容流式传输到一个名为"testoutput"的主题.但是,每当一条消息实际经过转换块时,我都会得到一个NullPointerException,错误仅指向kafka流库.不知道怎么回事:((

I am Trying to transform the string value obtained in my original stream "textlines" into JSONObject Messages using the function "mapValues" into newStream. Then stream whatever I get in newStream onto a topic called "testoutput". But everytime a message actually goes through the transformation block I get a NullPointerException with errors pointing only into kafka stream libraries. Have no idea what is going on :((

P.S.当我从原始流派生/创建新的kafka流时,新流是否属于原始构建器?因为我需要构建器来创建KafkaStreams对象并开始流式传输,所以我不确定是否需要对新流进行其他操作,而不仅仅是指定其.to("topic")

P.S. When I fork/create a new kafka stream from the original stream, does the new stream belong to the original builder? Since I need builder to create the KafkaStreams Object and start streaming I am not sure if I need to do something else with the new stream other than just specifying where its going .to("topic")

//Testing a Kafka Stream Application public class testStream { public static void main(String[] args) throws Exception { //Configurations Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-teststream"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxx:xxxx"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); //Building Stream KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> textlines = builder.stream("mytest2"); //Printout The Inputs just for testing purposes textlines.foreach(new ForeachAction<String, String>(){ public void apply(String key, String value){ for(int y=0; y<value.length(); y++){ System.out.print(value.charAt(y)); } System.out.print("\n"); } }); //Transform String Records to JSON Objects KStream<String, JSONObject> newStream = textlines.mapValues(new ValueMapper<String,JSONObject>(){ @Override public JSONObject apply(String value) { JSONObject jsnobj = new JSONObject(); //If the first 4 letters of the message is "xxxx" then parse it to a //JSON Object, otherwise create a dummy if(value.substring(0, 4).equals("xxxx")){ jsnobj.put("Header_Title", value.substring(0, 4)); jsnobj.put("Data_Part", value.substring(4)); }else{ jsnobj.put("Header_Title", "Not xxxx"); jsnobj.put("Data_Part", "None"); } return jsnobj; } }); //Specify target newStream.to("testoutput"); //Off you go KafkaStreams streams=new KafkaStreams(builder, props); streams.start(); } }

推荐答案

据我所知,您的问题是此行:

From what I can tell your problem is this line:

newStream.to("testoutput");

newStream的类型为KStream<String, JSONObject>.

但是,默认情况下,您的应用程序配置为使用String Serde来序列化/反序列化记录键和记录值:

However, your application is configured to use, by default, a String serde to serialize/deserialize record keys and record values:

props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

这意味着,当您未在to()调用中提供显式Serdes时,Kafka Streams将尝试将您的newStream作为KStream<String, String>(而不是KStream<String, JSONObject>)写回到Kafka.

This means that, when you do not provide explicit serdes in the to() call, Kafka Streams will attempt to write your newStream as KStream<String, String> (rather than KStream<String, JSONObject>) back to Kafka.

您需要做的是在to()调用中提供显式的serdes:

What you need to do is to provide explicit serdes in the to() call:

// Sth like this newStream.to(Serdes.String(), myJsonSerde, "testoutput");

不幸的是,Kafka尚未包括现成的JSON Serde(已计划).幸运的是,您可以查看(并复制)Kafka自己的演示应用程序中包含的针对Kafka Streams API的示例JSON serde: github/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview

Unfortunately, Kafka doesn't include an out-of-the-box JSON serde yet (it's planned). Fortunately, you can look at (and copy) the example JSON serde included in Kafka's own demo applications for the Kafka Streams API: github/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview

更多推荐

如何转换/分叉Kafka流并将其发送到特定主题?

本文发布于:2023-07-13 22:20:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1102474.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:发送到   主题   并将其   Kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!