雪花中如何将增量更新数据转换为结构化表格

编程入门 行业动态 更新时间:2024-10-28 20:28:53
本文介绍了雪花中如何将增量更新数据转换为结构化表格的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我有以下来自S3 Avro格式的增量交易数据。

{ "after": { "COM_PCT": null, "DEPT_ID": 30, "EMAIL": "AKHOO", "EMPLOYEE_ID": 115, "FIRST_NAME": "ALEX", "LAST_NAME": "TIM", "HIRE": "1995-05-18 00:00:00", "MANAGER_ID": 114 }, "before": {}, "current_ts": "2018-05-18 00:00:00:00", "op_ts": "2018-05-18 00:00:00:00", "op_type": "I", "pos": "00000000001123", "primary_keys": ["EMPLOYEE_ID"], "table": "HR.EMPLOYEE" }, { "after": { "COM_PCT": null, "DEPT_ID": 11, "EMAIL": "AKHOO", "EMPLOYEE_ID": 115, "FIRST_NAME": "ALEX", "LAST_NAME": "TIM", "HIRE": "1995-05-18 00:00:00", "MANAGER_ID": 114 }, "before": {}, "current_ts": "2018-05-19 00:00:00:00", "op_ts": "2018-05-19 00:00:00:00", "op_type": "U", "pos": "00000000001124", "primary_keys": ["EMPLOYEE_ID"], "table": "HR.EMPLOYEE" }, { "after": { "COM_PCT": null, "DEPT_ID": 30, "EMAIL": "AKHOO", "EMPLOYEE_ID": 115, "FIRST_NAME": "ALEX", "LAST_NAME": "TIM", "HIRE": "1995-05-18 00:00:00", "MANAGER_ID": 114 }, "before": {}, "current_ts": "2018-05-20 00:00:00:00", "op_ts": "2018-05-20 00:00:00:00", "op_type": "U", "pos": "00000000001125", "primary_keys": ["EMPLOYEE_ID"], "table": "HR.EMPLOYEE" } 第一个事务是同一主键的插入事务, 第二个是更新事务, 我无法使用流管道处理增量更新,是否有办法将其转换为结构化表格,并仅显示该主键的最新插入/更新事务?

推荐答案

我假设该文件只包含一个表的更改,如果不包含,则您只需要对特定表的更改进行过滤操作。我还假设数据是表中的VARIANT类型,但这对您如何提取数据的解决方案没有影响。

我建议此解决方案:

  • 一开始,您应该使用QUALIFY函数并将数据过滤到记录的最新版本。
  • 然后您可以执行合并操作以插入或更新记录。
  • 如果您的数据还允许删除操作,则它们应该包含在代码中。

示例数据:

CREATE OR REPLACE TABLE SAMPLE_RAW (samples variant); INSERT INTO SAMPLE_RAW SELECT parse_json('{ "after": { "COM_PCT": null, "DEPT_ID": 30, "EMAIL": "AKHOO", "EMPLOYEE_ID": 115, "FIRST_NAME": "ALEX", "LAST_NAME": "TIM", "HIRE": "1995-05-18 00:00:00", "MANAGER_ID": 114 }, "before": {}, "current_ts": "2018-05-18 00:00:00:00", "op_ts": "2018-05-18 00:00:00:00", "op_type": "I", "pos": "00000000001123", "primary_keys": ["EMPLOYEE_ID"], "table": "HR.EMPLOYEE" }') UNION ALL SELECT parse_json('{ "after": { "COM_PCT": null, "DEPT_ID": 11, "EMAIL": "AKHOO", "EMPLOYEE_ID": 115, "FIRST_NAME": "ALEX", "LAST_NAME": "TIM", "HIRE": "1995-05-18 00:00:00", "MANAGER_ID": 114 }, "before": {}, "current_ts": "2018-05-19 00:00:00:00", "op_ts": "2018-05-19 00:00:00:00", "op_type": "U", "pos": "00000000001124", "primary_keys": ["EMPLOYEE_ID"], "table": "HR.EMPLOYEE" }') UNION ALL SELECT parse_json('{ "after": { "COM_PCT": null, "DEPT_ID": 30, "EMAIL": "AKHOO", "EMPLOYEE_ID": 115, "FIRST_NAME": "ALEX", "LAST_NAME": "TIM", "HIRE": "1995-05-18 00:00:00", "MANAGER_ID": 114 }, "before": {}, "current_ts": "2018-05-20 00:00:00:00", "op_ts": "2018-05-20 00:00:00:00", "op_type": "U", "pos": "00000000001125", "primary_keys": ["EMPLOYEE_ID"], "table": "HR.EMPLOYEE" }');

解决方案:

WITH src AS ( SELECT TO_TIMESTAMP(s.samples:op_ts::string, 'YYYY-MM-DD HH24:MI:SS:FF') AS op_ts , s.samples:op_type::string AS op_type , s.samples:after:COM_PCT::string AS COM_PCT , s.samples:after:DEPT_ID As DEPT_ID , s.samples:after:EMAIL::string As EMAIL , s.samples:after:EMPLOYEE_ID As EMPLOYEE_ID , s.samples:after:FIRST_NAME::string As FIRST_NAME , s.samples:after:LAST_NAME::string AS LAST_NAME , TO_TIMESTAMP(s.samples:after:HIRE::string, 'YYYY-MM-DD HH24:MI:SS') As HIRE , s.samples:after:MANAGER_ID AS MANAGER_ID FROM SAMPLE_RAW AS s QUALIFY ROW_NUMBER() OVER(PARTITION BY EMPLOYEE_ID ORDER BY op_ts DESC) = 1 ) MERGE INTO HR.EMPLOYEE AS trg USING src ON trg.EMPLOYEE_ID = src.EMPLOYEE_ID WHEN MATCHED AND src.op_type = 'U' THEN UPDATE SET trg.EMAIL = src.EMAIL ... WHEN MATCHED AND src.op_type = 'D' THEN DELETE WHEN NOT MATCHED THEN INSERT (EMAIL, FIRST_NAME, LAST_NAME, ...) VALUES (src.EMAIL, src.FIRST_NAME, src.LAST_NAME, ...)

引用:QUALIFY,MERGE

更多推荐

雪花中如何将增量更新数据转换为结构化表格

本文发布于:2023-10-28 08:57:53,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1536159.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:增量   转换为   如何将   雪花   结构化

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!