PyFlink Table API 基础程序结构

编程入门 行业动态 更新时间:2024-10-20 07:54:10

01 Python Table API 程序的基本结构

所有的 Table API 和 SQL 程序,不管批模式,还是流模式,都遵循相同的结构。

首先创建 TableEnvironment然后创建输入输出表接着基于输入表做查询并计算最后将计算结果写入输出表

下面代码示例展示了上述 Table API 和 SQL 程序的基本结构:

from pyflink.table import EnvironmentSettings, TableEnvironment# 1. 创建 TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings) # 2. 创建 source 表
table_env.execute_sql("""CREATE TABLE datagen (id INT,data STRING) WITH ('connector' = 'datagen','fields.id.kind' = 'sequence','fields.id.start' = '1','fields.id.end' = '10')
""")# 3. 创建 sink 表
table_env.execute_sql("""CREATE TABLE print (id INT,data STRING) WITH ('connector' = 'print')
""")# 4. 查询 source 表,同时执行计算
# 通过 Table API 创建一张表:
source_table = table_env.from_path("datagen")
# 或者通过 SQL 查询语句创建一张表:
source_table = table_env.sql_query("SELECT * FROM datagen")result_table = source_table.select(source_table.id + 1, source_table.data)# 5. 将计算结果写入给 sink 表
# 将 Table API 结果表数据写入 sink 表:
result_table.execute_insert("print").wait()
# 或者通过 SQL 查询语句来写入 sink 表:
table_env.execute_sql("INSERT INTO print SELECT * FROM datagen").wait()

02 创建 TableEnvironment

TableEnvironment 是 Table API 和 SQL 集成的核心概念

关于创建 TableEnvironment 的更多细节,请查阅 TableEnvironment 文档。

TableEnvironment 可以用来:

创建 Table将 Table 注册成临时表执行 SQL 查询,更多细节可查阅 SQL注册用户自定义的 (标量,表值,或者聚合) 函数配置作业管理 Python 依赖提交作业执行

下面代码示例展示了如何创建一个 TableEnvironment:

from pyflink.table import EnvironmentSettings, TableEnvironment# create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)# or create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

03 创建表

Table 作为 Python Table API 的核心组件,是 Table API 作业中间结果的逻辑表示

一个 Table 实例总是与一个特定的 TableEnvironment 相绑定

不支持在同一个查询中合并来自不同 TableEnvironments 的表,例如 join 或者 union 它们

PyFlink 提供了三种方式创建表:

通过列表类型的对象创建通过 DDL 创建通过 Catalog 创建

3.1 通过列表类型的对象创建表

可以通过使用一个列表对象输入静态数据创建一张表

from pyflink.table import EnvironmentSettings, TableEnvironment# 创建 批 TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
table.to_pandas()

结果为:

   _1     _2
0   1     Hi
1   2  Hello

你也可以创建具有指定列名的表:

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table.to_pandas()

结果为:

   id   data
0   1     Hi
1   2  Hello

默认情况下,表结构是从数据中自动提取的。

如果自动生成的表模式不符合你的要求,你也可以手动指定:

table_without_schema = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
# 默认情况下,“id” 列的类型是 64 位整型
default_type = table_without_schema.to_pandas()["id"].dtype
print('By default the type of the "id" column is %s.' % default_type)from pyflink.table import DataTypes
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')],DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()),DataTypes.FIELD("data", DataTypes.STRING())]))
# 现在 “id” 列的类型是 8 位整型
type = table.to_pandas()["id"].dtype
print('Now the type of the "id" column is %s.' % type)

结果为:

默认情况下,“id” 列的类型是 64 位整型。
现在 “id” 列的类型是 8 位整型。

3.2 通过 DDL 创建表

使用 SQL 通过 DDL 创建一张表:

from pyflink.table import EnvironmentSettings, TableEnvironment# 创建 流 TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)table_env.execute_sql("""CREATE TABLE random_source (id BIGINT, data TINYINT ) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='3','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='6')
""")
table = table_env.from_path("random_source")
table.to_pandas()

结果为:

   id  data
0   2     5
1   1     4
2   3     6

3.3 通过 Catalog 创建表

TableEnvironment 维护了一个使用标识符创建的表的 catalogs 映射。

Catalog 中的表既可以是临时的,并与单个 Flink 会话生命周期相关联,也可以是永久的,跨多个 Flink 会话可见。

通过 SQL DDL 创建的表和视图, 例如 “create table …” 和 “create view …",都存储在 catalog 中。

你可以通过 SQL 直接访问 catalog 中的表。

如果你要用 Table API 来使用 catalog 中的表,可以使用 “from_path” 方法来创建 Table API 对象:

# 准备 catalog
# 将 Table API 表注册到 catalog 中
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('source_table', table)# 从 catalog 中获取 Table API 表
new_table = table_env.from_path('source_table')
new_table.to_pandas()

结果为:

   id   data
0   1     Hi
1   2  Hello

04 查询

4.1 Table API 查询

Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(…).select(…)。

Table API 文档描述了流和批处理上所有支持的 Table API 操作。

以下示例展示了一个简单的 Table API 聚合查询:

from pyflink.table import EnvironmentSettings, TableEnvironment# 通过 batch table environment 来执行查询
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],['name', 'country', 'revenue'])# 计算所有来自法国客户的收入
revenue = orders \.select(orders.name, orders.country, orders.revenue) \.where(orders.country == 'FRANCE') \.group_by(orders.name) \.select(orders.name, orders.revenue.sum.alias('rev_sum'))revenue.to_pandas()

结果为:

   name  rev_sum
0  Jack       30

Table API 也支持 行操作的 API, 这些行操作包括 Map Operation, FlatMap Operation, Aggregate Operation 和 FlatAggregate Operation

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import pandas as pd# 通过 batch table environment 来执行查询
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],['name', 'country', 'revenue'])map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),result_type=DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),DataTypes.FIELD("revenue", DataTypes.BIGINT())]),func_type="pandas")orders.map(map_function).alias('name', 'revenue').to_pandas()

结果为:

   name  revenue
0  Jack      100
1  Rose      300
2  Jack      200

4.2 SQL 查询

Flink 的 SQL 基于 Apache Calcite,它实现了标准的 SQL。SQL 查询语句使用字符串来表达。

SQL 文档描述了 Flink 对流和批处理所支持的 SQL。

下面示例展示了一个简单的 SQL 聚合查询:

from pyflink.table import EnvironmentSettings, TableEnvironment# 通过 stream table environment 来执行查询
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)table_env.execute_sql("""CREATE TABLE random_source (id BIGINT, data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='8','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='11')
""")table_env.execute_sql("""CREATE TABLE print_sink (id BIGINT, data_sum TINYINT ) WITH ('connector' = 'print')
""")table_env.execute_sql("""INSERT INTO print_sinkSELECT id, sum(data) as data_sum FROM (SELECT id / 2 as id, data FROM random_source)WHERE id > 1GROUP BY id
""").wait()

结果为:

2> +I(4,11)
6> +I(2,8)
8> +I(3,10)
6> -U(2,8)
8> -U(3,10)
6> +U(2,15)
8> +U(3,19)

实际上,上述输出展示了 print 结果表所接收到的 change log。 change log 的格式为: {subtask id}> {消息类型}{值的字符串格式}

例如,“2> +I(4,11)” 表示这条消息来自第二个 subtask,其中 “+I” 表示这是一条插入的消息,"(4, 11)” 是这条消息的内容。 另外,"-U" 表示这是一条撤回消息 (即更新前),这意味着应该在 sink 中删除或撤回该消息。 “+U” 表示这是一条更新的记录 (即更新后),这意味着应该在 sink 中更新或插入该消息。

所以,从上面的 change log,我们可以得到如下结果:

(4, 11)
(2, 15) 
(3, 19)

4.3 Table API 和 SQL 的混合使用

Table API 中的 Table 对象和 SQL 中的 Table 可以自由地相互转换。

下面例子展示了如何在 SQL 中使用 Table 对象:

# 创建一张 sink 表来接收结果数据
table_env.execute_sql("""CREATE TABLE table_sink (id BIGINT, data VARCHAR ) WITH ('connector' = 'print')
""")# 将 Table API 表转换成 SQL 中的视图
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('table_api_table', table)# 将 Table API 表的数据写入结果表
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()

结果为:

6> +I(1,Hi)
6> +I(2,Hello)

下面例子展示了如何在 Table API 中使用 SQL 表:

# 创建一张 SQL source 表
table_env.execute_sql("""CREATE TABLE sql_source (id BIGINT, data TINYINT ) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='4','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='7')
""")# 将 SQL 表转换成 Table API 表
table = table_env.from_path("sql_source")# 或者通过 SQL 查询语句创建表
table = table_env.sql_query("SELECT * FROM sql_source")# 将表中的数据写出
table.to_pandas()

结果为:

   id  data
0   2     5
1   1     4
2   4     7
3   3     6

05 输出结果

5.1 将结果数据收集到客户端

你可以使用 TableResult.collect 将 Table 的结果收集到客户端,结果的类型为迭代器类型。

以下代码展示了如何使用 TableResult.collect() 方法:

# 准备 source 表
source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])# 得到 TableResult
res = table_env.execute_sql("select a + 1, b, c from %s" % source)# 遍历结果
with res.collect() as results:for result in results:print(result)

结果为:

<Row(2, 'Hi', 'Hello')>
<Row(3, 'Hello', 'Hello')>

5.2 将结果数据转换为Pandas DataFrame,并收集到客户端

你可以调用 “to_pandas” 方法来 将一个 Table 对象转化成 pandas DataFrame👍

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table.to_pandas()

结果为:

   id   data
0   1     Hi
1   2  Hello

5.3 将结果写入到一张 Sink 表中

你可以调用 “execute_insert” 方法来将 Table 对象中的数据写入到一张 sink 表中:

table_env.execute_sql("""CREATE TABLE sink_table (id BIGINT, data VARCHAR ) WITH ('connector' = 'print')
""")table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table.execute_insert("sink_table").wait()

也可以通过 SQL 来完成:

table_env.create_temporary_view("table_source", table)
table_env.execute_sql("INSERT INTO sink_table SELECT * FROM table_source").wait()

结果为:

6> +I(1,Hi)
6> +I(2,Hello)

5.4 将结果写入多张 Sink 表中

你也可以使用 StatementSet 在一个作业中将 Table 中的数据写入到多张 sink 表中:

# 准备 source 表和 sink 表
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view("simple_source", table)
table_env.execute_sql("""CREATE TABLE first_sink_table (id BIGINT, data VARCHAR ) WITH ('connector' = 'print')
""")
table_env.execute_sql("""CREATE TABLE second_sink_table (id BIGINT, data VARCHAR) WITH ('connector' = 'print')
""")# 创建 statement set
statement_set = table_env.create_statement_set()# 将 "table" 的数据写入 "first_sink_table"
statement_set.add_insert("first_sink_table", table)# 通过一条 sql 插入语句将数据从 "simple_source" 写入到 "second_sink_table"
statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")# 执行 statement set
statement_set.execute().wait()

结果为:

7> +I(1,Hi)
7> +I(1,Hi)
7> +I(2,Hello)
7> +I(2,Hello)

参考资料

Flink 官方文档:Python Table API 简介

更多推荐

结构,基础,程序,PyFlink,Table

本文发布于:2023-05-29 23:09:54,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/355093.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:结构   基础   程序   PyFlink   Table

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!