How to do the opposite of explode in PySpark?

Question

Let's say I have a DataFrame with a column for users and another column for words they've written:

Row(user='Bob', word='hello')
Row(user='Bob', word='world')
Row(user='Mary', word='Have')
Row(user='Mary', word='a')
Row(user='Mary', word='nice')
Row(user='Mary', word='day')

I would like to aggregate the word column into a vector:

Row(user='Bob', words=['hello','world'])
Row(user='Mary', words=['Have','a','nice','day'])

It seems I can't use any of Spark's grouping functions because they expect a subsequent aggregation step. My use case is that I want to feed these data into Word2Vec, not use other Spark aggregations.
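
For context, Spark's built-in collect_list aggregation does produce exactly this shape, though it does not guarantee the order of elements within the collected array. A minimal, self-contained sketch:

from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    Row(user='Bob', word='hello'), Row(user='Bob', word='world'),
    Row(user='Mary', word='Have'), Row(user='Mary', word='a'),
    Row(user='Mary', word='nice'), Row(user='Mary', word='day'),
])

# groupBy + collect_list gathers the words into one array per user;
# element order within the array is not guaranteed after a shuffle.
df.groupBy('user').agg(F.collect_list('word').alias('words')).show(truncate=False)

The answer below addresses exactly this ordering gap.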

Recommended Answer

As of the Spark 2.3 release we now have Pandas UDFs (a.k.a. vectorized UDFs). The function below will accomplish the OP's task. A benefit of using this function is that order is guaranteed to be preserved; order is essential in many cases, such as time-series analysis.

import pandas as pd
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, ArrayType

spark = SparkSession.builder.appName('test_collect_array_grouped').getOrCreate()

def collect_array_grouped(df, groupbyCols, aggregateCol, outputCol):
    """
    Aggregate function: returns a new :class:`DataFrame` such that for a given
    column, aggregateCol, in a DataFrame, df, collect into an array the elements
    for each grouping defined by the groupbyCols list. The new DataFrame will
    have, for each row, the grouping columns and an array of the grouped values
    from aggregateCol in the outputCol.

    :param groupbyCols: list of columns to group by. Each element should be a
        column name (string) or an expression (:class:`Column`).
    :param aggregateCol: the column name of the column of values to aggregate
        into an array for each grouping.
    :param outputCol: the column name of the column to output the aggregated
        array to.
    """
    groupbyCols = [] if groupbyCols is None else groupbyCols
    df = df.select(groupbyCols + [aggregateCol])
    # Build the output schema: the grouping columns plus an array column
    # whose element type matches the aggregated column's type.
    schema = df.select(groupbyCols).schema
    aggSchema = df.select(aggregateCol).schema
    arrayField = StructField(name=outputCol, dataType=ArrayType(aggSchema[0].dataType, False))
    schema = schema.add(arrayField)

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def _get_array(pd_df):
        # One output row per group: the grouping key values, then the
        # aggregated column's values collected in their original order.
        vals = pd_df[groupbyCols].iloc[0].tolist()
        vals.append(pd_df[aggregateCol].values)
        return pd.DataFrame([vals])

    return df.groupby(groupbyCols).apply(_get_array)

rdd = spark.sparkContext.parallelize([
    Row(user='Bob', word='hello'),
    Row(user='Bob', word='world'),
    Row(user='Mary', word='Have'),
    Row(user='Mary', word='a'),
    Row(user='Mary', word='nice'),
    Row(user='Mary', word='day'),
])
df = spark.createDataFrame(rdd)

collect_array_grouped(df, ['user'], 'word', 'users_words').show()

+----+--------------------+
|user|         users_words|
+----+--------------------+
|Mary|[Have, a, nice, day]|
| Bob|      [hello, world]|
+----+--------------------+
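
On Spark 3.x, the PandasUDFType.GROUPED_MAP style used above is deprecated in favor of DataFrame.groupby().applyInPandas(). A minimal sketch of the same grouped collection with that API, reusing the df built above (the function and schema names here are illustrative, not from the original answer):

import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

out_schema = StructType([
    StructField('user', StringType()),
    StructField('users_words', ArrayType(StringType())),
])

def to_array(pdf: pd.DataFrame) -> pd.DataFrame:
    # Emit one row per group: the grouping key plus the words collected
    # in the order the rows appear within the group.
    return pd.DataFrame([[pdf['user'].iloc[0], pdf['word'].tolist()]])

df.groupby('user').applyInPandas(to_array, schema=out_schema).show()

The design is the same as the answer's helper: each group is handed to the function as a whole pandas DataFrame, so the within-group row order is preserved in the collected array.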
