Spark SQL: Cannot use aggregate within a window function

Problem Description

I use this SQL to create a session_id for a dataset. If a user is inactive for more than 30 minutes (30 * 60 seconds), a new session_id is assigned. I am new to Spark SQL and am trying to replicate the same procedure using the Spark SQL context, but I'm encountering some errors.

session_id follows the naming convention: userid_1, userid_2, userid_3, ...
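For illustration, with hypothetical timestamps not taken from the original question: if user u1 has events at 10:00, 10:10, and 11:00, the 50-minute gap before the last event starts a new session, so the three rows would get session_ids u1_1, u1_1, and u1_2.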

SQL (date is in seconds):

CREATE TABLE tablename_with_session_id AS
SELECT *,
       userid || '_' || SUM(new_session) OVER (PARTITION BY userid
                                               ORDER BY date ASC, new_session DESC
                                               ROWS UNBOUNDED PRECEDING) AS session_id
FROM (
    SELECT *,
           CASE
               WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1
               WHEN row_number() OVER (PARTITION BY userid ORDER BY date) = 1 THEN 1
               ELSE 0
           END AS new_session
    FROM tablename
)
ORDER BY date;

I tried using the same SQL in Spark-Scala with:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val tableSessionID = sqlContext.sql("""
    SELECT *,
           CONCAT(userid, '_', SUM(new_session)) OVER (PARTITION BY userid
                                                       ORDER BY date ASC, new_session DESC
                                                       ROWS UNBOUNDED PRECEDING) AS new_session_id
    FROM (
        SELECT *,
               CASE
                   WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1
                   WHEN row_number() OVER (PARTITION BY userid ORDER BY date) = 1 THEN 1
                   ELSE 0
               END AS new_session
        FROM clickstream
    )
    ORDER BY date""")

This produced an error suggesting that the Spark SQL expression ..sum(new_session).. be wrapped in a window function.
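For reference, the error comes from attaching the OVER clause to CONCAT, which is not an aggregate or window function; in the original SQL the running SUM(new_session) OVER (...) is evaluated first and the || concatenation is applied to its result. A sketch of the same statement with the window clause kept on the SUM (untested here; depending on the Spark version, window functions may require a HiveContext):

val tableSessionID = sqlContext.sql("""
    SELECT *,
           CONCAT(userid, '_',
                  SUM(new_session) OVER (PARTITION BY userid
                                         ORDER BY date ASC, new_session DESC
                                         ROWS UNBOUNDED PRECEDING)) AS new_session_id
    FROM (
        SELECT *,
               CASE
                   WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1
                   WHEN row_number() OVER (PARTITION BY userid ORDER BY date) = 1 THEN 1
                   ELSE 0
               END AS new_session
        FROM clickstream
    )
    ORDER BY date""")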

I tried using multiple data frames:

val temp1 = sqlContext.sql("""
    SELECT *,
           CASE
               WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1
               WHEN row_number() OVER (PARTITION BY userid ORDER BY date) = 1 THEN 1
               ELSE 0
           END AS new_session
    FROM clickstream""")
temp1.registerTempTable("clickstream_temp1")

val temp2 = sqlContext.sql("""
    SELECT *,
           SUM(new_session) OVER (PARTITION BY userid
                                  ORDER BY date ASC, new_session DESC
                                  ROWS UNBOUNDED PRECEDING) AS s_id
    FROM clickstream_temp1""")
temp2.registerTempTable("clickstream_temp2")

val temp3 = sqlContext.sql("""
    SELECT *,
           CONCAT(userid, '_', s_id) OVER (PARTITION BY userid
                                           ORDER BY date ASC, new_session DESC
                                           ROWS UNBOUNDED PRECEDING) AS new_session_id
    FROM clickstream_temp2""")

Only the last statement, 'val temp3 = ...', returns an error: CONCAT(userid,'_',s_id) cannot be used within a window function.

What's the workaround? Is there an alternative?

Thanks

Recommended Answer

To use concat with a Spark window function, you need a user-defined aggregate function (UDAF). You cannot use the concat function directly with a window function.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Extend UserDefinedAggregateFunction to write a custom aggregate function.
// You can also specify constructor arguments, for instance
// CustomConcat(arg1: Int, arg2: String).
class CustomConcat() extends UserDefinedAggregateFunction {
  // Input data type schema
  def inputSchema: StructType = StructType(Array(StructField("description", StringType)))
  // Intermediate schema
  def bufferSchema: StructType = StructType(Array(StructField("groupConcat", StringType)))
  // Returned data type
  def dataType: DataType = StringType
  // Self-explaining
  def deterministic = true
  // This function is called whenever the key changes
  def initialize(buffer: MutableAggregationBuffer) = { buffer(0) = " " }
  // Iterate over each entry of a group
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    buffer(0) = buffer.getString(0) + input.getString(0)
  }
  // Merge two partial aggregates
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1(0) = buffer1.getString(0) + buffer2.getString(0)
  }
  // Called after all the entries are exhausted
  def evaluate(buffer: Row) = { buffer.getString(0) }
}

import sqlContext.implicits._   // needed for the $"..." column syntax

val newdescription = new CustomConcat
val newdesc1 = newdescription($"description").over(windowspec)

You can use newdesc1 as an aggregate function for concatenation in window functions. For more information, have a look at: databricks udaf. I hope this answers your question.
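A minimal usage sketch of how windowspec and the surrounding call might be set up (the DataFrame name df, the column names, and the window ordering are assumptions, not part of the original answer; whether a UDAF is accepted in a window expression depends on the Spark version):

import org.apache.spark.sql.expressions.Window
import sqlContext.implicits._   // for the $"..." column syntax

// Hypothetical window: partition and ordering columns are assumptions
val windowspec = Window.partitionBy($"userid").orderBy($"date")

val customConcat = new CustomConcat
// df is an assumed DataFrame with a string column "description";
// the UDAF is applied as a running aggregate over the window
val result = df.withColumn("running_concat", customConcat($"description").over(windowspec))
result.show()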
