如何使用scala在kafka消费者中实现反序列化?

编程入门 行业动态 更新时间:2024-10-23 08:36:11
本文介绍了如何使用scala在kafka消费者中实现反序列化?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

我的 kafka 消费者代码中有以下行.

I have the following line in my kafka consumer's code.

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) 

如何将此流行"反序列化为原始对象?通过将类扩展为可序列化,在 kafka 生产者中实现了可序列化.我正在使用 Scala 在 spark 中实现这一点.

How to deserialize this stream "lines" into original object? Serialisability was implemented in the kafka producer by extending class to serialisable. I am implementing this in spark using scala.

推荐答案

你需要实现一个自定义的Decoder 并将预期的类型信息与解码器一起提供给 createStream 函数.

You need to implement a custom Decoder and provide the expected type information together with the decoder to the createStream function.

KafkaUtils.createStream[KeyType, ValueType, KeyDecoder, ValueDecoder] (...)

例如,如果您使用 String 作为键和 CustomContainer 作为值,您的流创建将如下所示:

For example, if you are using String as key and CustomContainer as value, your stream creation will look like this:

val stream = KafkaUtils.createStream[String, CustomContainer, StringDecoder, CustomContainerDecoder](...)  

鉴于您将消息以 new KeyedMessage[String,String] 的形式发送到 kafka,正确的解码器是这样的字符串解码器:

Given that you are enconding the messages to kafka as new KeyedMessage[String,String], the right decoder is a string decoder like this:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](topic,...)

这会给你一个 DStream[String,String] 作为你处理的基础.

that will give you a DStream[String,String] as basis for your processing.

如果你想发送/接收一个特定的对象类型,你需要实现一个 Kafka 编码器 和 解码器.幸运的是,PcapPacket 已经实现了你需要的方法:

If you want to send/receive a specific object type you need to implement a Kafka Encoder and Decoder for it. Luckily for you, PcapPacket already implements the methods that you require to do that:

PcapPacket -> byte[]: public int transferStateAndDataTo(byte[] buffer)

byte[] -> PcapPacket:public PcapPacket(byte[] buffer)

byte[] -> PcapPacket: public PcapPacket(byte[] buffer)

其余的是样板代码,用于实现 Kafka 所需的编码器/解码器接口.

The rest is boilerplate code to implement the Encoder/Decoder interfaces required by Kafka.

这篇关于如何使用scala在kafka消费者中实现反序列化?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

更多推荐

[db:关键词]

本文发布于:2023-04-19 12:38:27,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/963100.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:如何使用   消费者   序列化   kafka   scala

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!