How to get the SQL row_number() equivalent for a Spark RDD

Problem description

I need to generate a full list of row_numbers for a data table with many columns.

In SQL, this would look like this:

select key_value, col1, col2, col3,
       row_number() over (partition by key_value order by col1, col2 desc, col3)
from temp;

Now, let's say in Spark I have an RDD of the form (K, V), where V=(col1, col2, col3), so my entries are like:

(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.

I want to order these using commands like sortBy(), sortWith(), sortByKey(), zipWithIndex, etc., and end up with a new RDD that carries the correct row_number:

(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.

(I don't care about the parentheses, so the form can also be (K, (col1,col2,col3,rownum)) instead)

How can I do this?

Here is my first attempt:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)
// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)
// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???

Also note that the function sortBy cannot be applied directly to an RDD, but one must run collect() first, and then the output isn't an RDD, either, but an array:

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
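(An aside that is not part of the original post: in Spark 1.0 and later the RDD API also exposes a sortBy method, so the same global ordering can be written without collecting first. A minimal sketch against the temp1 RDD defined above:)

// Not from the original post: RDD.sortBy keeps the result as an RDD.
// Sort by (col1 asc, col2 desc, col3 asc), mirroring the ORDER BY clause.
val globallySorted = temp1.sortBy(a => (a._2, -a._3, a._4))
globallySorted.collect().foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)

This still orders globally rather than per key, so by itself it does not produce partitioned row numbers.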

Here's a little more progress, but still not partitioned:

val temp2 = sc.parallelize(
  temp1.map(a => (a._1, (a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)
).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))

temp2.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
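(For completeness, and not from the original post: one way to get per-key row numbers with plain RDD operations is to group on the key, sort each group in memory, and number it with zipWithIndex. This rough sketch assumes every key's group fits comfortably in a single executor's memory:)

// Group by key, sort each group by (col1 asc, col2 desc, col3 asc) in memory,
// and attach a 1-based row number within the group.
// Assumes temp1: RDD[((Int, Int), Int, Int, Int)] from the attempts above.
val temp3 = temp1
  .map(a => (a._1, (a._2, a._3, a._4)))            // (key, (col1, col2, col3))
  .groupByKey()
  .flatMap { case (key, values) =>
    values.toSeq
      .sortBy(v => (v._1, -v._2, v._3))            // col1 asc, col2 desc, col3 asc
      .zipWithIndex
      .map { case ((c1, c2, c3), idx) => (key, c1, c2, c3, idx + 1) }
  }

temp3.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,1)
// ((3,4),5,5,9,2)
// ((3,4),7,5,5,3)
// (the order of keys in the collected output may vary)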

Recommended answer

The row_number() over (partition by ... order by ...) functionality was added to Spark 1.4. This answer uses PySpark/DataFrames.

Create a test DataFrame:

from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()

Add the partitioned row number:

from pyspark.sql.window import Window

(testDF
 .select("k", "v",
         F.rowNumber()
          .over(Window
                .partitionBy("k")
                .orderBy("k")
               )
          .alias("rowNum")
        )
 .show()
)

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+
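Two notes that go beyond the original answer: later Spark releases renamed rowNumber to row_number, and the window above orders only by the key column k, whereas the SQL in the question orders by col1, col2 desc, col3. A rough Scala DataFrame sketch of that ordering, assuming the sample data is loaded into a DataFrame with columns k, col1, col2, col3 (names chosen here for illustration):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical DataFrame built from the question's sample data
// (in spark-shell; in a standalone app, import spark.implicits._ for toDF).
val df = Seq(
  ("key1", 1, 2, 3), ("key1", 1, 4, 7), ("key1", 2, 2, 3),
  ("key2", 5, 5, 5), ("key2", 5, 5, 9), ("key2", 7, 5, 5)
).toDF("k", "col1", "col2", "col3")

// partition by k, order by col1 asc, col2 desc, col3 asc -- as in the SQL query
val w = Window.partitionBy("k").orderBy(col("col1"), col("col2").desc, col("col3"))

df.withColumn("rowNum", row_number().over(w)).show()
// For key1 this yields (1,4,7) -> 1, (1,2,3) -> 2, (2,2,3) -> 3,
// matching the desired output shown in the question.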
