Question
I want to convert an org.apache.spark.sql.DataFrame
to org.apache.spark.rdd.RDD[(String, String)]
in Databricks. Can anyone help?
Background (and a better solution is also welcome): I have a Kafka stream which (after some steps) becomes a 2 column data frame. I would like to put this into a Redis cache, first column as a key and second column as a value.
More specifically, the input has this type: lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]. I tried to put it into Redis as follows:
sc.toRedisKV(lastContacts)(redisConfig)
The error message looks like this:
notebook:20: error: type mismatch;
found : org.apache.spark.sql.DataFrame
(which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)
I have already played around with some ideas (like the .rdd function) but none of them helped.
Answer
You can use df.map(row => ...) to convert the DataFrame to an RDD if you want to map each row to a different RDD element.
例如:
val df = Seq(("table1", 432),
             ("table2", 567),
             ("table3", 987),
             ("table1", 789)).toDF("tablename", "Code")
df.show()
+---------+----+
|tablename|Code|
+---------+----+
| table1| 432|
| table2| 567|
| table3| 987|
| table1| 789|
+---------+----+
val rddDf = df.map(r => (r(0), r(1))).rdd // Type: RDD[(Any, Any)]
OR
val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd // Type: RDD[(String, String)]
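Since the asker's DataFrame has a known schema ([serialNumber: string, lastModified: bigint]), a typed variant via .as[...] is also possible. This is only a sketch, assuming a SparkSession named spark and that its implicit encoders are in scope:

```scala
// Sketch: typed conversion matching the asker's schema
// [serialNumber: string, lastModified: bigint]. Assumes `spark` is the
// active SparkSession; `lastContacts` comes from the question.
import spark.implicits._

val rddKV: org.apache.spark.rdd.RDD[(String, String)] =
  lastContacts
    .as[(String, Long)]                                   // bigint maps to Scala Long
    .map { case (serial, modified) => (serial, modified.toString) }
    .rdd
```

The typed path catches schema mismatches at analysis time instead of producing (Any, Any) pairs at runtime.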
Please refer to https://community.hortonworks.com/questions/106500/error-in-spark-streaming-kafka-integration-structu.html regarding AnalysisException: Queries with streaming sources must be executed with writeStream.start().
You need to wait for the termination of the query using query.awaitTermination() to prevent the process from exiting while the query is active.
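Putting the pieces together for the streaming case, a minimal sketch could look like the following. The names lastContactsStream, sc, redisConfig, and toRedisKV are assumptions taken from the question (toRedisKV is the spark-redis helper the asker was calling); foreachBatch requires Spark 2.4+:

```scala
// Sketch: write each micro-batch of the 2-column stream to Redis,
// then block so the driver stays alive while the query runs.
val query = lastContactsStream.writeStream
  .foreachBatch { (batch: org.apache.spark.sql.DataFrame, batchId: Long) =>
    // Inside foreachBatch the micro-batch is a plain DataFrame, so the
    // conversion from the answer above applies directly.
    val kv = batch.rdd.map(r => (r(0).toString, r(1).toString))
    sc.toRedisKV(kv)(redisConfig)
  }
  .start()

query.awaitTermination() // block until the query stops or fails
```

Using foreachBatch avoids needing a streaming-aware Redis sink: each batch is handled with the ordinary batch API.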