Flink SQL Over 聚合详解

编程入门 行业动态 更新时间:2024-10-22 17:21:50

Flink SQL Over 聚合<a href=https://www.elefans.com/category/jswz/34/1770044.html style=详解"/>

Flink SQL Over 聚合详解

Over 聚合定义(⽀持 Batch\Streaming):**特殊的滑动窗⼝聚合函数,拿 Over 聚合 与 窗⼝聚合 做对⽐。

窗⼝聚合:不在 group by 中的字段,不能直接在 select 中拿到

Over 聚合:能够保留原始字段

注意: ⽣产环境中,Over 聚合的使⽤场景较少。

**应⽤场景:**计算最近⼀段滑动窗⼝的聚合结果数据。

**实际案例:**查询每个产品最近⼀⼩时订单的⾦额总和:

SELECT order_id,order_time,amount,SUM(amount) OVER (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM Orders

Over 聚合语法如下:

SELECTagg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]ORDER BY time_colrange_definition),...
FROM ...

ORDER BY:必须是时间戳列(事件时间、处理时间);

PARTITION BY:标识了聚合窗⼝的聚合粒度,如上述案例是按照 product 进⾏聚合;

range_definition:标识聚合窗⼝的聚合数据范围,在 Flink 中有两种指定数据范围的⽅式。第⼀种为 按照⾏数聚合 ,第⼆种为 按照时间区间聚合 。

1.时间区间聚合

**案例:**输出一个产品最近⼀⼩时数据的 amount 之和。

结果就是最近⼀⼩时数据的 amount 之和。

CREATE TABLE source_table (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.order_id.min' = '1','fields.order_id.max' = '2','fields.amount.min' = '1','fields.amount.max' = '10','fields.product.min' = '1','fields.product.max' = '2'
);CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECT product,order_time,amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是⼀个 product 的最近 1 ⼩时的数据RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM source_table

结果如下:

+I[2, 2021-12-24T22:08:26.583, 7, 73]
+I[2, 2021-12-24T22:08:27.583, 7, 80]
+I[2, 2021-12-24T22:08:28.583, 4, 84]
+I[2, 2021-12-24T22:08:29.584, 7, 91]
+I[2, 2021-12-24T22:08:30.583, 8, 99]
+I[1, 2021-12-24T22:08:31.583, 9, 138]
+I[2, 2021-12-24T22:08:32.584, 6, 105]
+I[1, 2021-12-24T22:08:33.584, 7, 145]
2.⾏数聚合

**案例:**输出一个产品最近 5 ⾏数据的 amount 之和。

CREATE TABLE source_table (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.order_id.min' = '1','fields.order_id.max' = '2','fields.amount.min' = '1','fields.amount.max' = '2','fields.product.min' = '1','fields.product.max' = '2'
);CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECT product,order_time,amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是⼀个 product 的最近 5 ⾏数据ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM source_table

结果如下:

+I[2, 2021-12-24T22:18:19.147, 1, 9]
+I[1, 2021-12-24T22:18:20.147, 2, 11]
+I[1, 2021-12-24T22:18:21.147, 2, 12]
+I[1, 2021-12-24T22:18:22.147, 2, 12]
+I[1, 2021-12-24T22:18:23.148, 2, 12]
+I[1, 2021-12-24T22:18:24.147, 1, 11]
+I[1, 2021-12-24T22:18:25.146, 1, 10]
+I[1, 2021-12-24T22:18:26.147, 1, 9]
+I[2, 2021-12-24T22:18:27.145, 2, 11]
+I[2, 2021-12-24T22:18:28.148, 1, 10]
+I[2, 2021-12-24T22:18:29.145, 2, 10]

在⼀个 SELECT 中有多个聚合窗⼝,简化写法如下:

SELECT order_id,order_time,amount,SUM(amount) OVER w AS sum_amount,AVG(amount) OVER w AS avg_amount
FROM Orders
-- 使⽤下⾯⼦句,定义 Over Window
WINDOW w AS (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)

更多推荐

Flink SQL Over 聚合详解

本文发布于:2023-11-17 03:28:42,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1636564.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:详解   Flink   SQL

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!