Spark Streamming:从具有多个模式的 kafka 读取数据

编程入门 行业动态 更新时间:2024-10-28 02:22:26
本文介绍了Spark Streamming:从具有多个模式的 kafka 读取数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

我正在为 Spark 流的实现而苦苦挣扎.

I am struggling with the implementation in spark streaming.

来自 kafka 的消息看起来像这样,但有更多的字段

The messages from the kafka looks like this but with with more fields

{"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}}
{"event":"databasedata", "mysql":"sensors", "payload": {"actual data as a json}}
{"event":"eventApi", "source":"event1", "payload": {"actual data as a json}}
{"event":"eventapi", "source":"event2", "payload": {"actual data as a json}}

我正在尝试从 Kafka 主题(具有多个模式)读取消息.我需要阅读每条消息并查找事件和源字段并决定将其存储为数据集的位置.实际数据在字段有效负载中作为 JSON,它只是一条记录.

I am trying to read the messages from a Kafka topic (which has multiple schemas). I need to read each message and look for an event and source field and decide where to store as a Dataset. The actual data is in the field payload as a JSON which is only a single record.

有人可以帮我实施这个或任何其他替代方案吗?

Can someone help me to implement this or any other alternatives?

在同一主题中发送具有多个模式的消息并使用它是一种好方法吗?

Is it a good way to send the messages with multiple schemas in the same topic and consume it?

提前致谢,

推荐答案

您可以从传入的 JSON 对象创建一个 Dataframe.

You can create a Dataframe from the incoming JSON object.

创建 JSON 对象的 Seq[Sring].

Create Seq[Sring] of JSON object.

使用 val df=spark.read.json[Seq[String]].

对您选择的 dataframe df 执行操作.

Perform the operations on the dataframe df of your choice.

这篇关于Spark Streamming:从具有多个模式的 kafka 读取数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

更多推荐

[db:关键词]

本文发布于:2023-04-19 12:40:28,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/961771.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:多个   模式   数据   Spark   Streamming

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!