4.2.3 Flink-流处理框架-Table API 与 SQL-流转表+表转流+创建临时视图(Temporary View)

编程入门 行业动态 更新时间:2024-10-27 06:30:11

目录

1.DataStream流转表Table

1.1 基本概念

1.2 实现代码

1.3 数据类型与Table schema 的对应

2.表转流

2.1 基本概念

2.2 Table API 中表到 DataStream 有两种模式

2.3 实现代码

3.创建临时视图(Temporary View)


1.DataStream流转表Table

1.1 基本概念

        Flink 允许我们把 Table 和 DataStream 做转换:我们可以基于一个 DataStream,先流式 地读取数据源,然后 map 成 POJO,再把它转成 Table。Table 的列字段(column fields),就是 POJO 里的字段,这样就不用再麻烦地定义 schema 了。

1.2 实现代码

        代码中实现非常简单,直接用 tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次 map 操作(或者 Table API 的 select 操作)。

        代码具体如下:

DataStream<String> inputStream = env.readTextFile("sensor.txt");
DataStream<SensorReading> dataStream = inputStream.map( line -> {String[] fields = line.split(",");
    return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
    } );
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");

1.3 数据类型与Table schema 的对应

        在上节的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照类中的字段名来对应的(name-based mapping),所以还可以用 as 做重命名。

Table sensorTable = tableEnv.fromDataStream(dataStream, "timestamp as ts, id as myId, temperature");

        Flink 的 DataStream 和 DataSet API 支持多种类型。

        组合类型,比如元组(内置 Scala 和 Java 元组)、POJO、Scala case 类和 Flink 的 Row 类型等,允许具有多个字段的嵌套数据结构,这些字段可以在 Table 的表达式中访问。其他类型,则被视为原子类型。

2.表转流

2.1 基本概念

        表可以转换为 DataStream 或 DataSet。这样,自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了。将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型。通常,最方便的转换类型就是 Row。当然,因为结果的所有字段类型都是明确的,我们也经常会用元组类型来表示。

2.2 Table API 中表到 DataStream 有两种模式

        表作为流式查询的结果,是动态更新的。所以,将这种动态查询转换成的数据流,同样 需要对表的更新操作进行编码,进而有不同的转换模式。

2.3 实现代码

        没有经过 groupby 之类聚合操作,可以直接用 toAppendStream 来转换;而如果经过了聚合,有更新操作,一般就必须用 toRetractDstream

DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable, Row.class);

resultStream.print("result");
aggResultStream.print("aggResult");

3.创建临时视图(Temporary View)

        创建临时视图的第一种方式,就是直接从 DataStream 转换而来。同样,可以直接对应字段转换;也可以在转换的时候,指定相应的字段。代码如下:

tableEnv.createTemporaryView("sensorView", dataStream);
tableEnv.createTemporaryView("sensorView", dataStream, "id, temperature, timestamp as ts");

        另外,当然还可以基于 Table 创建视图

tableEnv.createTemporaryView("sensorView", sensorTable);

更多推荐

4.2.3 Flink-流处理框架-Table API 与 SQL-流转表+表转流+创建临时视图(Temporary View)

本文发布于:2023-06-14 07:45:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1452899.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:视图   框架   Table   Flink   API

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!