我有两列的DataFrame, ID 类型为 Int 和 Vec 类型为 Vector ( org.apache.spark.mllib.linalg.Vector )的
DataFrame如下所示:
ID,Vec 1,[0,0,5] 1,[4,0,1] 1,[1,2,1] 2,[7,5,0] 2,[3,3,4] 3,[0,8,1] 3,[0,0,1] 3,[7,7,7 ] ....我想做一个 groupBy($ ID)然后通过对向量求和来对每个组内的行进行聚合。
上面期望的输出例如:
ID,SumOfVectors 1,[5,2,7] 2, [10,8,4] 3,[7,15,9] ...可用的聚合功能将不起作用,例如 df.groupBy($ ID)。agg(sum($ Vec)将导致ClassCastException。
如何实现自定义聚合函数,使我可以对向量或数组求和或进行任何其他自定义操作?
解决方案火花> = 3.0
您可以将 Summarizer 与 sum
导入org.apache.spark.ml。 stat.Summarizer df .groupBy($ id) .agg(Summarizer.sum($ vec)。alias( vec))火花< = 3.0
我个人不会打扰UDAF。它不仅冗长而且速度也不快(使用ArrayType作为bufferSchema性能的火花UDAF问题),而我只是使用 reduceByKey / foldByKey :
导入org.apache.spark。 sql.Row import breeze.linalg。{DenseVector => BDV} import org.apache.spark.ml.linalg。{Vector,Vectors} def dv(values:Double *):Vector = Vectors.dense(values.toArray) val df = spark.createDataFrame(Seq((1,dv(0,0,5)),(1,dv(4,0,1)),(1,dv (1,2,1)),(2,dv(7,5,0)),(2,dv(3,3,4)),(3,dv(0, 8,1)),(3,dv(0,0,1)),(3,dv(7,7,7)))).toDF( id, vec) val合计= df .rdd .map {case Row(k:Int,v:Vector)=> (k,BDV(v.toDense.values))} .foldByKey(BDV.zeros [Double](3))(_ + = _) .mapValues(v => Vectors.dense (v.toArray)) .toDF( id, vec) 汇总。show // + --- +- ----------- + // | id | vec | // + --- + -------------- + // | 1 | [5.0,2.0,7.0] | // | 2 | [10.0,8.0,4.0] | // | 3 | [7.0,15.0,9.0] | // + --- + -------------- +为了比较,使用了一个简单的 UDAF。必需的导入:
import org.apache.spark.sql.expressions。{MutableAggregationBuffer, UserDefinedAggregateFunction} 导入org.apache.spark.ml.linalg。{向量,向量,SQLDataTypes} 导入org.apache.spark.sql.types。{StructType,ArrayType,DoubleType} 导入org.apache。 spark.sql.Row 导入scala.collection.mutable.WrappedArray类定义:
class VectorSum(n:Int)扩展UserDefinedAggregateFunction { def inputSchema = new StructType()。add( v ,SQLDataTypes.VectorType) def bufferSchema = new StructType()。add( buff,ArrayType(DoubleType)) def dataType = SQLDataTypes.VectorType def确定性=真 def initialize(buffer:MutableAggregationBuffer)= { buffer.update(0,Array.fill(n)(0.0))} def更新(缓冲区:MutableAggregationBuffer,输入:行)= { if(!input.isNullAt( 0)){ val buff = buffer.getAs [WrappedArray [Double]](0) val v = input.getAs [Vector](0).toSparse for(i< -v.indices){ buff(i)+ = v(i)} buffer.update(0,buff)} } def merge(buffer1:MutableAggregationBuffer,buffer2:Row)= { val buff1 = buffer1.getAs [WrappedArray [Double]](0) val buff2 = buffer2.getAs [WrappedArray [ Double]](0) for((x,i)<-buff2.zipWithIndex){ buff1(i)+ = x } buffer1.update(0 ,buff1)} def评估(buffer:Row)= Vectors.dense( buffer.getAs [Seq [Double]](0).toArray)}示例用法:
df.groupBy($$ id))。agg(new VectorSum(3)($$ vec)别名 vec)。show // + --- + -------------- + // | id | vec | // + --- + -------------- + // | 1 | [5.0,2.0,7.0] | // | 2 | [10.0,8.0,4.0] | // | 3 | [7.0,15.0,9.0] | // + --- + -------------- +另请参见:如何在Spark SQL中查找分组的Vector列的平均值?。
I have a DataFrame of two columns, ID of type Int and Vec of type Vector (org.apache.spark.mllib.linalg.Vector).
The DataFrame looks like follow:
ID,Vec 1,[0,0,5] 1,[4,0,1] 1,[1,2,1] 2,[7,5,0] 2,[3,3,4] 3,[0,8,1] 3,[0,0,1] 3,[7,7,7] ....I would like to do a groupBy($"ID") then apply an aggregation on the rows inside each group by summing the vectors.
The desired output of the above example would be:
ID,SumOfVectors 1,[5,2,7] 2,[10,8,4] 3,[7,15,9] ...The available aggregation functions will not work, e.g. df.groupBy($"ID").agg(sum($"Vec") will lead to an ClassCastException.
How to implement a custom aggregation function that allows me to do the sum of vectors or arrays or any other custom operation?
解决方案Spark >= 3.0
You can use Summarizer with sum
import org.apache.spark.ml.stat.Summarizer df .groupBy($"id") .agg(Summarizer.sum($"vec").alias("vec"))Spark <= 3.0
Personally I wouldn't bother with UDAFs. There are more than verbose and not exactly fast (Spark UDAF with ArrayType as bufferSchema performance issues) Instead I would simply use reduceByKey / foldByKey:
import org.apache.spark.sql.Row import breeze.linalg.{DenseVector => BDV} import org.apache.spark.ml.linalg.{Vector, Vectors} def dv(values: Double*): Vector = Vectors.dense(values.toArray) val df = spark.createDataFrame(Seq( (1, dv(0,0,5)), (1, dv(4,0,1)), (1, dv(1,2,1)), (2, dv(7,5,0)), (2, dv(3,3,4)), (3, dv(0,8,1)), (3, dv(0,0,1)), (3, dv(7,7,7))) ).toDF("id", "vec") val aggregated = df .rdd .map{ case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) } .foldByKey(BDV.zeros[Double](3))(_ += _) .mapValues(v => Vectors.dense(v.toArray)) .toDF("id", "vec") aggregated.show // +---+--------------+ // | id| vec| // +---+--------------+ // | 1| [5.0,2.0,7.0]| // | 2|[10.0,8.0,4.0]| // | 3|[7.0,15.0,9.0]| // +---+--------------+And just for comparison a "simple" UDAF. Required imports:
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.ml.linalg.{Vector, Vectors, SQLDataTypes} import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType} import org.apache.spark.sql.Row import scala.collection.mutable.WrappedArrayClass definition:
class VectorSum (n: Int) extends UserDefinedAggregateFunction { def inputSchema = new StructType().add("v", SQLDataTypes.VectorType) def bufferSchema = new StructType().add("buff", ArrayType(DoubleType)) def dataType = SQLDataTypes.VectorType def deterministic = true def initialize(buffer: MutableAggregationBuffer) = { buffer.update(0, Array.fill(n)(0.0)) } def update(buffer: MutableAggregationBuffer, input: Row) = { if (!input.isNullAt(0)) { val buff = buffer.getAs[WrappedArray[Double]](0) val v = input.getAs[Vector](0).toSparse for (i <- v.indices) { buff(i) += v(i) } buffer.update(0, buff) } } def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { val buff1 = buffer1.getAs[WrappedArray[Double]](0) val buff2 = buffer2.getAs[WrappedArray[Double]](0) for ((x, i) <- buff2.zipWithIndex) { buff1(i) += x } buffer1.update(0, buff1) } def evaluate(buffer: Row) = Vectors.dense( buffer.getAs[Seq[Double]](0).toArray) }And an example usage:
df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show // +---+--------------+ // | id| vec| // +---+--------------+ // | 1| [5.0,2.0,7.0]| // | 2|[10.0,8.0,4.0]| // | 3|[7.0,15.0,9.0]| // +---+--------------+See also: How to find mean of grouped Vector columns in Spark SQL?.
更多推荐
如何定义自定义聚合函数以对Vector的列求和?
发布评论