Spark SQL: using collect_set over array values?

Problem description

I have an aggregated DataFrame with a column created using collect_set. I now need to aggregate over this DataFrame again, and apply collect_set to the values of that column again. The problem is that I need to apply collect_set over the values of the sets - and so far the only way I see how to do so is by exploding the aggregated DataFrame. Is there a better way?

Example:

Initial DataFrame:

country | continent | attributes
-------------------------------------
Canada  | America   | A
Belgium | Europe    | Z
USA     | America   | A
Canada  | America   | B
France  | Europe    | Y
France  | Europe    | X

Aggregated DataFrame (the one I receive as input) - aggregation over country:

country | continent | attributes
-------------------------------------
Canada  | America   | A, B
Belgium | Europe    | Z
USA     | America   | A
France  | Europe    | Y, X

My desired output - aggregation over continent:

continent | attributes
-------------------------------------
America   | A, B
Europe    | X, Y, Z
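
For reference, the explode-based workaround mentioned in the question would look roughly like this - a minimal sketch, assuming the country-level aggregate above is available as a DataFrame named byState (the same name used in the answer below) and that spark.implicits._ is in scope, as in spark-shell:

import org.apache.spark.sql.functions.{collect_set, explode}

// Explode the per-country sets back into individual rows, then re-aggregate
// per continent; this is exactly the step the question hopes to avoid.
byState
  .withColumn("attribute", explode($"attributes"))
  .groupBy("continent")
  .agg(collect_set($"attribute") as "attributes")
  .show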

Recommended answer

Since you can have only a handful of rows at this point, you just collect attributes as-is and flatten the result (Spark >= 2.4)

import org.apache.spark.sql.functions.{collect_set, flatten, array_distinct}

val byState = Seq(
  ("Canada", "America", Seq("A", "B")),
  ("Belgium", "Europe", Seq("Z")),
  ("USA", "America", Seq("A")),
  ("France", "Europe", Seq("Y", "X"))
).toDF("country", "continent", "attributes")

byState
  .groupBy("continent")
  .agg(array_distinct(flatten(collect_set($"attributes"))) as "attributes")
  .show

+---------+----------+
|continent|attributes|
+---------+----------+
|   Europe| [Y, X, Z]|
|  America|    [A, B]|
+---------+----------+

In the general case things are much harder to handle, and in many cases, if you expect large lists with many duplicates and many values per group, the optimal solution* is to just recompute the result from scratch, i.e.

input.groupBy($"continent").agg(collect_set($"attributes") as "attributes")
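
Here input stands for the original, un-aggregated data rather than the already-aggregated byState. As a minimal sketch, assuming it matches the initial DataFrame from the example, it could be built as:

// Hypothetical reconstruction of the un-aggregated input from the example;
// in practice this would be whatever source the country-level aggregate was built from.
val input = Seq(
  ("Canada", "America", "A"),
  ("Belgium", "Europe", "Z"),
  ("USA", "America", "A"),
  ("Canada", "America", "B"),
  ("France", "Europe", "Y"),
  ("France", "Europe", "X")
).toDF("country", "continent", "attributes")

With that in place, the groupBy above yields the same America and Europe attribute sets as the desired output.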

One possible alternative is to use Aggregator

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}

import scala.collection.mutable.{Set => MSet}

class MergeSets[T, U](f: T => Seq[U])(implicit enc: Encoder[Seq[U]]) extends Aggregator[T, MSet[U], Seq[U]] with Serializable {

  def zero = MSet.empty[U]

  def reduce(acc: MSet[U], x: T) = {
    for { v <- f(x) } acc.add(v)
    acc
  }

  def merge(acc1: MSet[U], acc2: MSet[U]) = {
    acc1 ++= acc2
  }

  def finish(acc: MSet[U]) = acc.toSeq

  def bufferEncoder: Encoder[MSet[U]] = Encoders.kryo[MSet[U]]

  def outputEncoder: Encoder[Seq[U]] = enc
}

and apply it as follows:

case class CountryAggregate(
  country: String, continent: String, attributes: Seq[String])

byState
  .as[CountryAggregate]
  .groupByKey(_.continent)
  .agg(new MergeSets[CountryAggregate, String](_.attributes).toColumn)
  .toDF("continent", "attributes")
  .show

+---------+----------+
|continent|attributes|
+---------+----------+
|   Europe| [X, Y, Z]|
|  America|    [B, A]|
+---------+----------+

but that's clearly not a Java-friendly option.

See also How to aggregate values into collection after groupBy? (similar, but without uniqueness constraint).

* That's because explode can be quite expensive, especially in older Spark versions, as can access to the external representation of SQL collections.
