admin管理员组

文章数量:1579356

Flink 1.7 新特性:Temporal Tables和MATCH_RECOGNIZE

  • Flink 1.7 新特性POC:Temporal Tables和MATCH_RECOGNIZE
    • 1 Temporal Tables and Temporal Joins in Streaming SQL
      • 1.1 概念
      • 1.2 前提
        • 1) time attribute
        • 2) primary keys
        • 3) createTemporalTableFunction,registerFunction
      • 1.3 使用示例
        • 1) 输入
          • 左表(append-only)订单
          • 右表(temporal table)
        • 2) 使用
        • 3) 输出(append )
      • 1.4 限制
    • 2 MATCH_RECOGNIZE Support in Streaming SQL(CEP)
      • 2.1 概念
      • 2.2 前提(pom引入)
      • 2.3 样例
        • 1) 输入
        • 2) 使用
        • 3) 输出
      • 2.4 语义关键字解释
        • 1) PARTITION BY
        • 2) ORDER BY
        • 3) MEASURES
        • 4) Output Mode
        • 5) After Match Strategy
          • 1) 输入
          • 2) 使用
          • 3) 不同下次匹配策略的输出
        • 6) PATTERN
          • Greedy & Reluctant Quantifiers
        • 7) DEFINE
          • Logical Offsets
      • 2.5 控制内存消耗
      • 2.6 限制
    • 3 参考文献

Flink 1.7 新特性POC:Temporal Tables和MATCH_RECOGNIZE

本文主要翻译整理了Flink 1.7的新特性,原文见参考文献,还包含了一些Demo验证的小的限制结论。

1 Temporal Tables and Temporal Joins in Streaming SQL

1.1 概念

Temporal Tables(时态表):历史中某个特定时间点上表内容的视图
版本根据主键以类似Map<Key,Value>的形式存储对不同时间更新
对于一个输入的时间,返回最新的版本,即当前时间的Value为该时间最近的值。对于定义的时间属性为event-time时会保存从上一个watermark到当前为止的所有版本。

1.2 前提

1) time attribute

对输入表设置时间属性,根据输入的时间参数决定返回的表版本,根据时间对版本进行跟踪

2) primary keys

对时态表指定更新根据的主键

3) createTemporalTableFunction,registerFunction

对输入表创建时态函数,指定输入的时态表主键和时间,注册

1.3 使用示例

1) 输入
左表(append-only)订单
SELECT * FROM Orders;

o_proctime amount currency
========== ====== =========
10:15           2 Euro
10:30           1 US Dollar
10:32          50 Yen
10:52           3 Euro
11:04           5 US Dollar
右表(temporal table)
SELECT * FROM RatesHistory;

r_proctime currency   rate
========== ======== ======
09:00      US Dollar   102
09:00      Euro        114
09:00      Yen           1
10:45      Euro        116
11:15      Euro        119
11:49      Pounds      108
2) 使用

Java:

Table orders = tEnv.fromDataStream(ordersStream, "amount, currency, o_proctime.proctime");
tEnv.registerTable("Orders", orders);
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "currency, rate, r_proctime.proctime");
tEnv.registerTable("RatesHistory", ratesHistory);

TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); 
tEnv.registerFunction("Rates", rates);

SQL:

SELECT
  o.o_proctime,
  o.amount AS n_amount,
  r.rate AS rate,
  o.currency AS currency
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o_proctime)) AS r
WHERE
  r.currency = o.currency

-------JOIN ON 形式 上下两种效果相同
SELECT
  o_proctime,
  o.amount AS n_amount,
  r.rate AS rate,
  o.currency AS currency
FROM
  Orders AS o,
  JOIN LATERAL TABLE (Rates(o_proctime)) AS r
ON
  r.currency = o.currency
3) 输出(append )
o_proctime amount rate currency
========== ====== ==== ==&#

本文标签: 新特性FlinkMATCHRECOGNIZETablesTemporal