How to define a custom aggregation function to sum a column of Vectors?

Problem Description

I have a DataFrame of two columns, ID of type Int and Vec of type Vector (org.apache.spark.mllib.linalg.Vector).

The DataFrame looks like the following:

ID,Vec
1,[0,0,5]
1,[4,0,1]
1,[1,2,1]
2,[7,5,0]
2,[3,3,4]
3,[0,8,1]
3,[0,0,1]
3,[7,7,7]
....

I would like to do a groupBy($"ID") then apply an aggregation on the rows inside each group by summing the vectors.

The desired output of the above example would be:

ID,SumOfVectors
1,[5,2,7]
2,[10,8,4]
3,[7,15,9]
...

The available aggregation functions will not work, e.g. df.groupBy($"ID").agg(sum($"Vec")) will lead to a ClassCastException.

How to implement a custom aggregation function that allows me to do the sum of vectors or arrays or any other custom operation?

Solution

Spark >= 3.0

You can use Summarizer with sum:

import org.apache.spark.ml.stat.Summarizer

df
  .groupBy($"id")
  .agg(Summarizer.sum($"vec").alias("vec"))
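If you need more than the sum, the same Summarizer API offers Summarizer.metrics, which computes several statistics over the vector column in one pass. A minimal sketch, assuming the same df and column names as above; the output names vec_sum and vec_mean are just illustrative:

import org.apache.spark.ml.stat.Summarizer

// Request several metrics at once; the result is a single struct column
// with one field per requested metric.
val summarized = df
  .groupBy($"id")
  .agg(Summarizer.metrics("sum", "mean").summary($"vec").alias("stats"))
  .select($"id", $"stats.sum".alias("vec_sum"), $"stats.mean".alias("vec_mean"))

summarized.show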

Spark < 3.0

Personally I wouldn't bother with UDAFs. They are not only verbose but also not exactly fast (see Spark UDAF with ArrayType as bufferSchema performance issues). Instead I would simply use reduceByKey / foldByKey:

import org.apache.spark.sql.Row
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.ml.linalg.{Vector, Vectors}

def dv(values: Double*): Vector = Vectors.dense(values.toArray)

val df = spark.createDataFrame(Seq(
    (1, dv(0,0,5)), (1, dv(4,0,1)), (1, dv(1,2,1)),
    (2, dv(7,5,0)), (2, dv(3,3,4)),
    (3, dv(0,8,1)), (3, dv(0,0,1)), (3, dv(7,7,7)))
  ).toDF("id", "vec")

val aggregated = df
  .rdd
  .map{ case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) }
  .foldByKey(BDV.zeros[Double](3))(_ += _)
  .mapValues(v => Vectors.dense(v.toArray))
  .toDF("id", "vec")

aggregated.show

// +---+--------------+
// | id|           vec|
// +---+--------------+
// |  1| [5.0,2.0,7.0]|
// |  2|[10.0,8.0,4.0]|
// |  3|[7.0,15.0,9.0]|
// +---+--------------+
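Since the answer mentions reduceByKey as an alternative to foldByKey, here is a minimal sketch of that variant, reusing the df and dv helper defined above; breeze's + builds a new vector for each merge instead of mutating a zero accumulator:

import org.apache.spark.sql.Row
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.ml.linalg.{Vector, Vectors}

// Same key/value preparation as above, but merged with reduceByKey,
// so no zero element (and hence no hard-coded vector length) is needed.
val aggregatedRbk = df
  .rdd
  .map { case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) }
  .reduceByKey(_ + _)
  .mapValues(v => Vectors.dense(v.toArray))
  .toDF("id", "vec")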

And just for comparison, a "simple" UDAF. Required imports:

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.ml.linalg.{Vector, Vectors, SQLDataTypes}
import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray

Class definition:

class VectorSum (n: Int) extends UserDefinedAggregateFunction {
  // Input is a single Vector column; the buffer holds a mutable Array[Double].
  def inputSchema = new StructType().add("v", SQLDataTypes.VectorType)
  def bufferSchema = new StructType().add("buff", ArrayType(DoubleType))
  def dataType = SQLDataTypes.VectorType
  def deterministic = true

  // Start each group with a zero vector of length n.
  def initialize(buffer: MutableAggregationBuffer) = {
    buffer.update(0, Array.fill(n)(0.0))
  }

  // Add one input vector to the running buffer, skipping nulls.
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0)) {
      val buff = buffer.getAs[WrappedArray[Double]](0)
      val v = input.getAs[Vector](0).toSparse
      for (i <- v.indices) {
        buff(i) += v(i)
      }
      buffer.update(0, buff)
    }
  }

  // Combine two partial buffers element-wise.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    val buff1 = buffer1.getAs[WrappedArray[Double]](0)
    val buff2 = buffer2.getAs[WrappedArray[Double]](0)
    for ((x, i) <- buff2.zipWithIndex) {
      buff1(i) += x
    }
    buffer1.update(0, buff1)
  }

  // Convert the final buffer back to a dense ML Vector.
  def evaluate(buffer: Row) = Vectors.dense(
    buffer.getAs[Seq[Double]](0).toArray)
}

And an example usage:

df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show // +---+--------------+ // | id| vec| // +---+--------------+ // | 1| [5.0,2.0,7.0]| // | 2|[10.0,8.0,4.0]| // | 3|[7.0,15.0,9.0]| // +---+--------------+

See also: How to find mean of grouped Vector columns in Spark SQL?.
