SparkSQL 自定义UDAF聚合函数

编程入门 行业动态 更新时间:2024-10-28 15:35:20

SparkSQL <a href=https://www.elefans.com/category/jswz/34/1771438.html style=自定义UDAF聚合函数"/>

SparkSQL 自定义UDAF聚合函数

在spark-shell中简单定义UDF函数

scala> val df = spark.read.json("people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Park |
|  30|Tom  |
|  19| JSON|
+----+-------+scala> spark.udf.register("addName", (x:String)=> "Name:"+x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))scala> df.createOrReplaceTempView("people")scala> spark.sql("Select addName(name), age from people").show()
+-----------------+----+
|UDF:addName(name)| age|
+-----------------+----+
|     Name:Park|null|
|      Name:Tom|  30|
|     Name:JSON|  19|
+-----------------+----+

强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。
弱类型用户自定义聚合函数:通过继承 来实现用户自定义聚合函数。下面展示一个求平均工资的自定义聚合函数

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
/*** 自定义UDAF函数求平均值* 用户自定义聚合函数*/class CustomUDAF extends UserDefinedAggregateFunction {//聚合函数输入参数的数据类型override def inputSchema: StructType = {StructType(StructField("inputColumn", LongType) :: Nil)}//聚合缓冲区中值的类型override def bufferSchema: StructType = {StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)}//返回值的类型override def dataType: DataType = {DoubleType}//幂等性 对于相同的输出是否一值返回相同的输出override def deterministic: Boolean = {true}//初始化override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0Lbuffer(1) = 1L}//相同Executor之间的数据合并override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {if (input.isNullAt(0)) {buffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1}}//不同Executor之间的数据合并override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}//计算最终的结果override def evaluate(buffer: Row): Any = {//求平均值buffer.getLong(0).toDouble / buffer.getLong(1)}
}object CustomUDAF {val conf: SparkConf = new SparkConf().setAppName("customUDF").setMaster("local[*]")val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()spark.udf.register("myAvage", new CustomUDAF)val df: DataFrame = spark.read.json("examples/src/main/resources/employees.json")df.createGlobalTempView("people")df.show()val result: DataFrame = spark.sql("select myAvage(salary) as avgSalary from people")result.show()
}

强类型用户自定义聚合函数:通过继承Aggregator来实现强类型自定义聚合函数,同样是求平均工资

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
// 既然是强类型,可能有case类
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)object MyAverage extends Aggregator[Employee, Average, Double] {
// 定义一个数据结构,保存工资总数和工资总个数,初始都为0
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// 聚合不同execute的结果
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// 计算输出
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// 设定之间值类型的编码器,要转换成case类
// Encoders.product是进行scala元组和case类转换的编码器 
def bufferEncoder: Encoder[Average] = Encoders.product
// 设定最终输出值的编码器
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}import spark.implicits._val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
+--------------+
|average_salary|
+--------------+
|        3750.0|
+--------------+

更多推荐

SparkSQL 自定义UDAF聚合函数

本文发布于:2023-07-28 20:15:41,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1298202.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:自定义   函数   SparkSQL   UDAF

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!