SparkSQL Custom UDAF Aggregation Functions
Defining a simple UDF in spark-shell
scala> val df = spark.read.json("people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+----+
| age|name|
+----+----+
|null|Park|
|  30| Tom|
|  19|JSON|
+----+----+

scala> spark.udf.register("addName", (x: String) => "Name:" + x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> df.createOrReplaceTempView("people")

scala> spark.sql("select addName(name), age from people").show()
+-----------------+----+
|UDF:addName(name)| age|
+-----------------+----+
|        Name:Park|null|
|         Name:Tom|  30|
|        Name:JSON|  19|
+-----------------+----+
Both the strongly typed Dataset and the weakly typed DataFrame provide built-in aggregate functions such as count(), countDistinct(), avg(), max(), and min(). Beyond these, users can define their own aggregate functions.
Weakly typed user-defined aggregate functions are implemented by extending UserDefinedAggregateFunction. The following example shows a custom aggregate function that computes the average salary.
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
/**
 * Custom UDAF that computes an average.
 */
class CustomUDAF extends UserDefinedAggregateFunction {
  // Data type of the aggregate function's input argument
  override def inputSchema: StructType = {
    StructType(StructField("inputColumn", LongType) :: Nil)
  }

  // Data types of the values in the aggregation buffer
  override def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }

  // Data type of the returned value
  override def dataType: DataType = DoubleType

  // Whether the function always returns the same output for the same input
  override def deterministic: Boolean = true

  // Initialize the aggregation buffer: sum and count both start at 0
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  // Update the buffer with a new input row (within a partition), skipping nulls
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }

  // Merge two aggregation buffers (across partitions/executors)
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // Compute the final result: the average
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
  }
}

object CustomUDAF {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("customUDF").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    spark.udf.register("myAverage", new CustomUDAF)
    val df: DataFrame = spark.read.json("examples/src/main/resources/employees.json")
    df.createOrReplaceTempView("people")
    df.show()
    val result: DataFrame = spark.sql("select myAverage(salary) as avgSalary from people")
    result.show()
  }
}
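The buffer lifecycle above (initialize → update per row within a partition → merge across partitions → evaluate) can be sketched in plain Scala, independent of Spark. The names and types here are illustrative, not part of the Spark API; the buffer is modeled as a (sum, count) tuple and null inputs as None:

```scala
// Illustrative sketch of the UDAF buffer lifecycle on (sum, count) pairs
def initialize: (Long, Long) = (0L, 0L)

def update(buffer: (Long, Long), input: Option[Long]): (Long, Long) = input match {
  case Some(v) => (buffer._1 + v, buffer._2 + 1) // non-null input updates the buffer
  case None    => buffer                          // null inputs are skipped
}

def merge(b1: (Long, Long), b2: (Long, Long)): (Long, Long) =
  (b1._1 + b2._1, b1._2 + b2._2)

def evaluate(buffer: (Long, Long)): Double = buffer._1.toDouble / buffer._2

// Two "partitions" of salary values, one containing a null
val p1 = List(Some(3000L), None).foldLeft(initialize)(update)
val p2 = List(Some(4500L), Some(3500L), Some(4000L)).foldLeft(initialize)(update)

println(evaluate(merge(p1, p2))) // prints 3750.0
```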
Strongly typed user-defined aggregate functions are implemented by extending Aggregator; the example below again computes the average salary.
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
// Since this is a strongly typed API, case classes for the input and buffer come naturally
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
// A zero value for this aggregation: salary total and salary count both start at 0
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// Merge two intermediate values from different executors
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the reduction output into the final result
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Encoder for the intermediate (buffer) value type, which is a case class
// Encoders.product is the encoder for Scala tuples and case classes
def bufferEncoder: Encoder[Average] = Encoders.product
// Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

import spark.implicits._

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
+--------------+
|average_salary|
+--------------+
|        3750.0|
+--------------+
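The zero/reduce/merge/finish pipeline of the Aggregator can also be exercised in plain Scala, using the same case classes and method bodies as above, to confirm the average shown in the output (the partition split is illustrative):

```scala
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

def zero: Average = Average(0L, 0L)

// Fold one employee into the buffer, mutating and returning it
def reduce(buffer: Average, employee: Employee): Average = {
  buffer.sum += employee.salary
  buffer.count += 1
  buffer
}

// Combine two partial buffers
def merge(b1: Average, b2: Average): Average = {
  b1.sum += b2.sum
  b1.count += b2.count
  b1
}

def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

// The four employees from employees.json, split across two "partitions"
val part1 = List(Employee("Michael", 3000L), Employee("Andy", 4500L)).foldLeft(zero)(reduce)
val part2 = List(Employee("Justin", 3500L), Employee("Berta", 4000L)).foldLeft(zero)(reduce)

println(finish(merge(part1, part2))) // prints 3750.0
```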