例如,我有一个像这样的数据集
For example, I have a dataset like this
test = spark.createDataFrame([ (0, 1, 5, "2018-06-03", "Region A"), (1, 1, 2, "2018-06-04", "Region B"), (2, 2, 1, "2018-06-03", "Region B"), (3, 3, 1, "2018-06-01", "Region A"), (3, 1, 3, "2018-06-05", "Region A"), ])\ .toDF("orderid", "customerid", "price", "transactiondate", "location") test.show()我可以通过
overall_stat = test.groupBy("customerid").agg(count("orderid"))\ .withColumnRenamed("count(orderid)", "overall_count") temp_result = test.groupBy("customerid").pivot("location").agg(count("orderid")).na.fill(0).join(overall_stat, ["customerid"]) for field in temp_result.schema.fields: if str(field.name) not in ['customerid', "overall_count", "overall_amount"]: name = str(field.name) temp_result = temp_result.withColumn(name, col(name)/col("overall_count")) temp_result.show()数据看起来像这样
现在,我想通过 overall_count 计算加权平均值,怎么办?
Now, I want to calculate the weighted average by the overall_count, how can I do it?
对于区域A,结果应为(0.66 * 3 + 1 * 1)/4 ,对于区域A,结果应为(0.33 * 3 + 1 * 1)/4 B
The result should be (0.66*3+1*1)/4 for region A, and (0.33*3+1*1)/4 for region B
我的想法:
当然可以通过将数据转换为python/pandas然后进行一些计算来实现,但是在什么情况下我们应该使用Pyspark?
It can certainly be achieved through turning the data into python/pandas and then do some calculation, but in what cases should we use Pyspark?
我可以得到
temp_result.agg(sum(col("Region A") * col("overall_count")), sum(col("Region B")*col("overall_count"))).show()但是感觉不对,特别是如果要计算许多 region 的情况.
but it doesn't feel right, especially if there is many regions to count.
推荐答案您可以通过将上述步骤分为多个阶段来获得加权平均值.
you can achieve a weighted average by breaking your above steps into multiple stages.
请考虑以下内容:
Dataframe Name: sales_table [ total_sales, count_of_orders, location] [ 50 , 9 , A ] [ 80 , 4 , A ] [ 90 , 7 , A ]要计算上述(70)的分组加权平均值,可以分为两个步骤:
To calculate the grouped weighted average of the above (70) is broken into two steps:
如果我们在PySpark代码中将以上内容分为几个阶段,则可以获得以下信息:
If we break the above into several stages within our PySpark code, you can get the following:
new_sales = sales_table \ .withColumn("sales_x_count", col("total_sales") * col("count_orders")) \ .groupBy("Location") \ .agg(sf.sum("total_sales").alias("sum_total_sales"), \ sf.sum("sales_x_count").alias("sum_sales_x_count")) \ .withColumn("count_weighted_average", col("sum_sales_x_count") / col("sum_total_sales"))所以...这里不需要花哨的UDF(并且可能会使您慢下来).
So... no fancy UDF is really necessary here (and would likely slow you down).
更多推荐
Pyspark:按列的加权平均值
发布评论