Pyspark: Replicate rows based on a column value

Problem Description

I would like to replicate all rows in my DataFrame based on the value of a given column on each row, and then index each new row. Suppose I have:

Column A  Column B
T1        3
T2        2

I would like the result to be:

Column A  Column B  Index
T1        3         1
T1        3         2
T1        3         3
T2        2         1
T2        2         2
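For reference, a minimal sketch for building this example input (the SparkSession variable name spark is an assumption, not part of the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Build the sample DataFrame from the question
df = spark.createDataFrame(
    [('T1', 3), ('T2', 2)],
    ['Column A', 'Column B'],
)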

I was able to do something similar with fixed values, but not by using the information found in the column. My current working code for fixed values is:

from pyspark.sql.functions import array, explode, lit

idx = [lit(i) for i in range(1, 10)]
df = df.withColumn('Index', explode(array(idx)))

I tried changing:

lit(i) for i in range(1, 10)

to

lit(i) for i in range(1, df['Column B'])

and adding it into my array() function:

df = df.withColumn('Index', explode(array(lit(i) for i in range(1, df['Column B']))))

but it does not work (TypeError: 'Column' object cannot be interpreted as an integer).

How should I implement this?

Recommended Answer

Unfortunately you can't iterate over a Column like that. You can always use a udf, but I do have a non-udf hack solution that should work for you if you're using Spark 2.1 or higher.
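For completeness, here is what the udf route mentioned above might look like (a minimal sketch, not the answer's recommended approach; it assumes the df and column names from the question):

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, IntegerType

# Turn the count in Column B into an array [1, 2, ..., n], then explode it
index_array = udf(lambda n: list(range(1, n + 1)), ArrayType(IntegerType()))
df = df.withColumn('Index', explode(index_array(df['Column B'])))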

The trick is to take advantage of pyspark.sql.functions.posexplode() to get the index value. We do this by creating a string that repeats a comma Column B times. Then we split this string on the comma, and use posexplode to get the index.

df.createOrReplaceTempView("df")  # first register the DataFrame as a temp table

query = 'SELECT '\
        '`Column A`,'\
        '`Column B`,'\
        'pos AS Index '\
        'FROM ( '\
            'SELECT DISTINCT '\
            '`Column A`,'\
            '`Column B`,'\
            'posexplode(split(repeat(",", `Column B`), ",")) '\
        'FROM df) AS a '\
        'WHERE a.pos > 0'

newDF = sqlCtx.sql(query).sort("Column A", "Column B", "Index")
newDF.show()
#+--------+--------+-----+
#|Column A|Column B|Index|
#+--------+--------+-----+
#|      T1|       3|    1|
#|      T1|       3|    2|
#|      T1|       3|    3|
#|      T2|       2|    1|
#|      T2|       2|    2|
#+--------+--------+-----+
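The same trick can also be expressed with the DataFrame API instead of raw SQL. A sketch, assuming the df from the question (posexplode produces columns named pos and col by default):

from pyspark.sql.functions import col, expr, posexplode, split

newDF = (df
         .select('Column A', 'Column B',
                 # repeat(",", n) builds a string of n commas; splitting on "," yields n+1 elements
                 posexplode(split(expr('repeat(",", `Column B`)'), ',')))
         .withColumnRenamed('pos', 'Index')
         .where(col('Index') > 0)
         .drop('col')
         .sort('Column A', 'Column B', 'Index'))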

Note: You need to wrap the column names in backticks since they contain spaces, as explained in this post: How to express a column which name contains spaces in Spark SQL
