Call a function for each element of a stream in Databricks

Problem description

I have a streaming DataFrame in Databricks, and I want to perform an action on each element. On the net I found methods for specific purposes, like writing it to the console or dumping it into memory, but I want to add some business logic and put some results into Redis.

To be more specific, this is how it would look in the non-streaming case:

val someDataFrame = Seq(
  ("key1", "value1"),
  ("key2", "value2"),
  ("key3", "value3"),
  ("key4", "value4")
).toDF()

def someFunction(keyValuePair: (String, String)) = {
  println(keyValuePair)
}

someDataFrame.collect.foreach(r => someFunction((r(0).toString, r(1).toString)))

But if someDataFrame is not a plain DataFrame but a streaming DataFrame (in this case coming from Kafka), the error message is this:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

Could anyone please help me solve this problem?

Some important notes:

  • I've read the relevant documentation, such as Spark Streaming and Databricks Streaming, as well as a few other descriptions.

I know that there must be something like start() and awaitTermination, but I don't know the exact syntax; the descriptions did not help.
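For orientation, the generic shape of such a call is roughly the following minimal sketch (someStreamingDataFrame and the console sink are placeholders for illustration, not the actual code in question):

val query = someStreamingDataFrame // placeholder for a streaming DataFrame, e.g. one read from Kafka
  .writeStream
  .format("console")   // any sink; console is only used here for illustration
  .start()             // start() returns a StreamingQuery

query.awaitTermination() // block until the streaming query terminates or fails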

It would take pages to list all the possibilities I tried, so I would rather not provide them.

I do not want to solve the specific problem of displaying the result, i.e. please do not provide a solution to this specific case. The someFunction would look something like this:

val someData = readSomeExternalData()

if (condition containing keyValuePair and someData) {
  doSomething(keyValuePair)
}

(The question "What is the purpose of ForeachWriter in Spark Structured Streaming?" does not provide a working example, and therefore does not answer my question.)

Recommended answer

Here is an example of using foreachBatch to save every item to Redis with the streaming API.

It is related to the previous question (converting the DataFrame to an RDD[(String, String)]).

// import spark and spark-redis
// (spark, sc and spark.implicits._ are already available in a Databricks notebook)
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.streaming._
import org.apache.spark.sql.types._
import com.redislabs.provider.redis._

// schema of the csv files
val userSchema = new StructType()
  .add("name", "string")
  .add("age", "string")

// create a data stream reader from a directory with csv files
val csvDF = spark
  .readStream
  .format("csv")
  .option("sep", ";")
  .schema(userSchema)
  .load("./data") // directory where the CSV files are

// redis connection configuration
val redisConfig = new RedisConfig(new RedisEndpoint("localhost", 6379))
implicit val readWriteConfig: ReadWriteConfig = ReadWriteConfig.Default

csvDF
  .map(r => (r.getString(0), r.getString(1))) // converts the dataset to a Dataset[(String, String)] (name -> age)
  .writeStream                                // create a data stream writer
  .foreachBatch((df, _) => sc.toRedisKV(df.rdd)(redisConfig)) // save each micro-batch to redis after converting it to an RDD
  .start                                      // start processing
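For a true per-element callback (which is what the question asks for, and what ForeachWriter is meant for), the same pipeline can use foreach with a ForeachWriter instead of foreachBatch. The following is a minimal sketch under the assumption that a plain Jedis client is used to talk to Redis; the class name RedisForeachWriter and the connection details are illustrative and not part of the answer above.

import org.apache.spark.sql.ForeachWriter
import redis.clients.jedis.Jedis

// Per-row writer: open a Redis connection per partition/epoch,
// apply the business logic to every element, then close the connection.
class RedisForeachWriter extends ForeachWriter[(String, String)] {
  var jedis: Jedis = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    jedis = new Jedis("localhost", 6379) // illustrative connection details
    true // returning true means this partition will be processed
  }

  override def process(keyValuePair: (String, String)): Unit = {
    // the business logic goes here; this sketch simply stores the pair in Redis
    jedis.set(keyValuePair._1, keyValuePair._2)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (jedis != null) jedis.close()
  }
}

// attach the writer to the same Dataset[(String, String)] as above
csvDF
  .map(r => (r.getString(0), r.getString(1)))
  .writeStream
  .foreach(new RedisForeachWriter)
  .start()

Compared to foreachBatch, foreach invokes process once per row, so the connection handling in open/close matters for performance; foreachBatch amortizes that cost over a whole micro-batch.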
