过滤卡夫卡流

编程入门 行业动态 更新时间:2024-10-21 09:35:28
本文介绍了过滤卡夫卡流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我一直在检查Kafka流.我一直在测试以下代码用于Kafka流

I have been checking Kafka streams. I have been testing the below code for Kafka streams

生产者主题:(这是第一个生产者主题-发送以下json数据)

Producer topic: (this is the first producer topic - which sends the below json data)

KafkaProducer<String, String> producer = new KafkaProducer<>( properties); producer.send(new ProducerRecord<String,String>(topic, jsonobject.toString())); producer.close();

JSON-主题的生产者:

JSON - Producer from topic:

{"UserID":"1","Address":"XXX","AccountNo":"234234","MemberName":"Stella","AccountType":"Savings"}

流主题代码:(这是第二个流代码和主题)

Stream Topic code: (this is the second Streaming code and topic)

builder.<String,String>stream(topic) .filter(new Predicate <String, String>() { @Override public boolean test(String key, String value) { // put you processor logic here System.out.println("value : " + value); return value.substring(0).equals("1"); } }) .to(streamouttopic); final KafkaStreams streams = new KafkaStreams(builder, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0);

如果UserID值为"1",我想进行过滤,然后将该数据发送到目标流主题.

I want to filer if UserID value is "1", then send that data to destination streaming topic.

当我使用".filter"并打印System.out.println("value:" + value);时,执行时会引发以下错误.

When I use ".filter" and print System.out.println("value : " + value);, it throws the below error when executing.

Exception in thread "SampleStreamProducer-a6bb543e-bb92-48d0-8d9f-225046722d81-StreamThread-1" java.lang.ClassCastException: [B cannot be cast to java.lang.String

如果我不使用".filter"并使用像这样的简单代码builder.stream(topic).to(streamouttopic);,则可以正常工作,但无需过滤.但是,我需要使用该过滤器.

If i don’t use ".filter" and use simple code like this, builder.stream(topic).to(streamouttopic); , it is working fine, but without filtering. But, I need to use that filter.

有人可以指导我进行修复吗?

Can someone guide me to fix it?

推荐答案

默认情况下,Kafka Streams假定数据类型为<byte[],byte[]>,并且byte[]无法转换为String.

By default, Kafka Streams assumes data type <byte[],byte[]> and a byte[] cannot be cast to a String.

将主题读为KStream时,需要指定正确的Serdes:

You need to specify the correct Serdes when reading the topic as KStream:

builder.<String,String>stream(topic, Consumed.with(Serdes.String(), Serdes.String()) .filter(...)

请查看示例并阅读文档:

Please check out the examples and read the docs:

  • github/confluentinc/kafka-streams-examples
  • kafka.apache/11/documentation/streams/
  • github/confluentinc/kafka-streams-examples
  • kafka.apache/11/documentation/streams/

更多推荐

过滤卡夫卡流

本文发布于:2023-11-26 00:29:02,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1631932.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:卡夫卡

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!