I'm trying to apply a custom function over rows in a pyspark dataframe. This function takes a row and 2 other vectors of the same dimension. It outputs the sum of the values of the third vector for each matching value between the row and the second vector.
import pandas as pd
import numpy as np

The function:

def V_sum(row, b, c):
    return float(np.sum(c[row == b]))
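To see what V_sum does, here is a quick sanity check on a single row (using the same B and V as the pandas example below): the entries of c are summed at exactly the positions where the row and b agree.

```python
import numpy as np

def V_sum(row, b, c):
    # Sum the entries of c at positions where row equals b elementwise
    return float(np.sum(c[row == b]))

B = np.array([1, 0, 1, 0])
V = np.array([5, 1, 2, 4])

# Row [1, 1, 0, 0] matches B at positions 0 and 3, so the sum is 5 + 4 = 9
print(V_sum(np.array([1, 1, 0, 0]), B, V))  # 9.0
```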
What I want to achieve is simple with pandas:
pd_df = pd.DataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]],
                     columns=['t1', 't2', 't3', 't4'])

   t1  t2  t3  t4
0   0   1   0   0
1   1   1   0   0
2   0   0   1   0
3   1   0   1   1
4   1   1   0   0

B = np.array([1,0,1,0])
V = np.array([5,1,2,4])

pd_df.apply(lambda x: V_sum(x, B, V), axis=1)

0    4.0
1    9.0
2    7.0
3    8.0
4    9.0
dtype: float64
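As a side note (not part of the original question), the row-wise apply can be replaced by a single vectorized matrix product, since each result is the dot product of a boolean match mask with V:

```python
import numpy as np
import pandas as pd

pd_df = pd.DataFrame(
    [[0, 1, 0, 0], [1, 1, 0, 0], [0, 0, 1, 0], [1, 0, 1, 1], [1, 1, 0, 0]],
    columns=['t1', 't2', 't3', 't4'],
)
B = np.array([1, 0, 1, 0])
V = np.array([5, 1, 2, 4])

# (pd_df.values == B) broadcasts B across the rows; multiplying the
# resulting boolean mask by V sums V where each row agrees with B.
results = (pd_df.values == B) @ V
print(results)  # [4 9 7 8 9]
```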
I would like to perform the same action in pyspark.
from pyspark import SparkConf, SparkContext, SQLContext

sc = SparkContext("local")
sqlContext = SQLContext(sc)

spk_df = sqlContext.createDataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]],
                                    ['t1', 't2', 't3', 't4'])
spk_df.show()

+---+---+---+---+
| t1| t2| t3| t4|
+---+---+---+---+
|  0|  1|  0|  0|
|  1|  1|  0|  0|
|  0|  0|  1|  0|
|  1|  0|  1|  1|
|  1|  1|  0|  0|
+---+---+---+---+
I thought about using a udf, but I can't get it to work:
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

V_sum_udf = F.udf(V_sum, FloatType())
spk_df.select(V_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))).alias("results")).show()
Clearly I'm doing something wrong because it yields:
Py4JJavaError: An error occurred while calling o27726.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 90.0 failed 1 times, most recent failure: Lost task 0.0 in stage 90.0 (TID 91, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

Answer:
If you've got non-column data that you want to use inside a function along with column data to compute a new column, a UDF + closure + withColumn as described here is a good place to start. (The truncated traceback hides the underlying Python error: V_sum expects three arguments, but the UDF above invokes it with only the array column, so B and V have to be bound another way, e.g. via a closure.)
B = np.array([2, 0, 1, 0])
V = np.array([5, 1, 2, 4])

# B and V are captured by the lambda's closure; row arrives from F.array as a
# Python list, so keeping B and V as numpy arrays makes the elementwise
# comparison inside V_sum work.
v_sum_udf = F.udf(lambda row: V_sum(row, B, V), FloatType())
spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))))
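Stripped of Spark, the per-row computation the closure performs can be sketched and sanity-checked in plain Python (note that the answer's B differs from the question's, so the results differ from the pandas output above):

```python
import numpy as np

def V_sum(row, b, c):
    # Sum the entries of c where row and b agree elementwise
    return float(np.sum(c[row == b]))

B = np.array([2, 0, 1, 0])
V = np.array([5, 1, 2, 4])

# Spark would pass each row to the closure as a list; simulate that here
v_sum = lambda row: V_sum(np.array(row), B, V)

rows = [[0, 1, 0, 0], [1, 1, 0, 0], [0, 0, 1, 0], [1, 0, 1, 1], [1, 1, 0, 0]]
print([v_sum(r) for r in rows])  # [4.0, 4.0, 7.0, 3.0, 4.0]
```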