问题描述
限时送ChatGPT账号..我正在为 Spark 流的实现而苦苦挣扎.
I am struggling with the implementation in spark streaming.
来自 kafka 的消息看起来像这样,但有更多的字段
The messages from the kafka looks like this but with with more fields
{"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}}
{"event":"databasedata", "mysql":"sensors", "payload": {"actual data as a json}}
{"event":"eventApi", "source":"event1", "payload": {"actual data as a json}}
{"event":"eventapi", "source":"event2", "payload": {"actual data as a json}}
我正在尝试从 Kafka 主题(具有多个模式)读取消息.我需要阅读每条消息并查找事件和源字段并决定将其存储为数据集的位置.实际数据在字段有效负载中作为 JSON,它只是一条记录.
I am trying to read the messages from a Kafka topic (which has multiple schemas). I need to read each message and look for an event and source field and decide where to store as a Dataset. The actual data is in the field payload as a JSON which is only a single record.
有人可以帮我实施这个或任何其他替代方案吗?
Can someone help me to implement this or any other alternatives?
在同一主题中发送具有多个模式的消息并使用它是一种好方法吗?
Is it a good way to send the messages with multiple schemas in the same topic and consume it?
提前致谢,
推荐答案
您可以从传入的 JSON 对象创建一个 Dataframe
.
You can create a Dataframe
from the incoming JSON object.
创建 JSON 对象的 Seq[Sring]
.
Create Seq[Sring]
of JSON object.
使用 val df=spark.read.json[Seq[String]]
.
对您选择的 dataframe df
执行操作.
Perform the operations on the dataframe df
of your choice.
这篇关于Spark Streamming:从具有多个模式的 kafka 读取数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
更多推荐
[db:关键词]
发布评论