问题描述
限时送ChatGPT账号..我的 kafka 消费者代码中有以下行.
I have the following line in my kafka consumer's code.
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
如何将此流行"反序列化为原始对象?通过将类扩展为可序列化,在 kafka 生产者中实现了可序列化.我正在使用 Scala 在 spark 中实现这一点.
How to deserialize this stream "lines" into original object? Serialisability was implemented in the kafka producer by extending class to serialisable. I am implementing this in spark using scala.
推荐答案
你需要实现一个自定义的Decoder 并将预期的类型信息与解码器一起提供给 createStream 函数.
You need to implement a custom Decoder and provide the expected type information together with the decoder to the createStream function.
KafkaUtils.createStream[KeyType, ValueType, KeyDecoder, ValueDecoder] (...)
例如,如果您使用 String
作为键和 CustomContainer
作为值,您的流创建将如下所示:
For example, if you are using String
as key and CustomContainer
as value, your stream creation will look like this:
val stream = KafkaUtils.createStream[String, CustomContainer, StringDecoder, CustomContainerDecoder](...)
鉴于您将消息以 new KeyedMessage[String,String]
的形式发送到 kafka,正确的解码器是这样的字符串解码器:
Given that you are enconding the messages to kafka as new KeyedMessage[String,String]
, the right decoder is a string decoder like this:
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](topic,...)
这会给你一个 DStream[String,String]
作为你处理的基础.
that will give you a DStream[String,String]
as basis for your processing.
如果你想发送/接收一个特定的对象类型,你需要实现一个 Kafka 编码器 和 解码器.幸运的是,PcapPacket
已经实现了你需要的方法:
If you want to send/receive a specific object type you need to implement a Kafka Encoder and Decoder for it.
Luckily for you, PcapPacket
already implements the methods that you require to do that:
PcapPacket -> byte[]: public int transferStateAndDataTo(byte[] buffer)
byte[] -> PcapPacket:public PcapPacket(byte[] buffer)
byte[] -> PcapPacket: public PcapPacket(byte[] buffer)
其余的是样板代码,用于实现 Kafka 所需的编码器/解码器接口.
The rest is boilerplate code to implement the Encoder/Decoder interfaces required by Kafka.
这篇关于如何使用scala在kafka消费者中实现反序列化?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
更多推荐
[db:关键词]
发布评论