PyFlink 批处理基础实例 词频统计

编程入门 行业动态 更新时间:2024-10-20 16:05:49

01 词频统计业务逻辑

该实例的处理过程是通过 Flink 对文件存储系统里的数据进行离线批处理,统计指定文件下的单词数,并将统计结果存储到其他文件下。

该实例业务实现过程如下:

首先使用 filesystem 作为连接器,按照指定的 csv 格式来批量地读取指定路径的文件或文件夹,以此创建源数据表。然后,在 Flink 中执行批处理实例逻辑,完成批处理任务。最后,使用 filesystem 连接器,将处理后结果写入目标文件或文件夹内,构建结果表。

02 Table API 实现词频统计

Apache Flink提供了两种顶层的关系型API,分别为Table API和SQL,Flink通过Table API&SQL实现了批流统一。其中Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合关系运算符(例如select,where和join)的查询。Flink SQL基于Apache Calcite 实现了标准的SQL,用户可以使用标准的SQL处理数据集。Table API和SQL与Flink的DataStream和DataSet API紧密集成在一起,用户可以实现相互转化,比如可以将DataStream或者DataSet注册为table进行操作数据。

2.1 创建 TableEnvironment

编写Flink Python Table API程序的第一步是创建TableEnvironment,这是Python Table API作业的入口类。

一个Table必定属于一个具体的TableEnvironment,不可以将不同TableEnvironment的表放在一起使用(比如join,union等操作)

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

Flink提供了两种planner,分别为 old planner 和 blink planner,对于不同的planner而言,Table API & SQL底层的执行与转换是有所不同的。

old planner 根据是流处理作业还是批处理作业,Table API &SQL会被转换成DataStream或者DataSet程序无论是批处理作业还是流处理作业,如果使用的是 blink planner,底层都会被转换为DataStream程序

2.2 创建源表和结果表

创建源表:创建及注册表名为 source 的源表。 源表 source 只有一列: word,该表代表了从input_path所指定的输入文件中读取的单词

t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())
tab = t_env.from_path('source')

创建结果表:创建及注册表名为 sink 的结果表。结果表 sink 有两列: word和count,该表会将计算结果输出到文件 output_path所指定的输出文件中

t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())

2.3 创建 Flink 作业

创建一个 Flink 作业:该作业读取源表 source 中的数据,进行一些变换,然后将结果写入结果表 sink

@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):for s in line[0].split():yield Row(s)# 计算 word count
tab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()

上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name) 被调用的时候, 作业才会被真正提交到集群或者本地进行执行

2.4 执行 Flink 作业

首先,你需要在文件 “input_path” 中准备好输入数据。如果在程序中没有指定输入数据的路径,可以选择通过如下命令准备输入数据:

echo -e  "flink\npyflink\nflink" > input_path

接下来,可以在命令行中运行作业(假设作业名为WordCount.py):

python WordCount.py

上述命令会构建Python Table API程序,并在本地mini cluster中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例。与提交到远端集群一样,也提交到本地 Flink 集群的命令运行如下

flink run -m localhost:8081 -py WordCount.py

使用浏览器访问localhost:8081查看集群处理情况如下:

最后,可以通过如下命令查看你的运行结果:

$ cat output_path
flink	2
pyflink	1

03 SQL API 实现词频统计

前面已经提到,Flink SQL是基于Apache Calcite 实现了标准的SQL,用户可以使用标准的SQL处理数据集。相较于 Table API,SQL可以直接使用DDL语句进行建表、查询等操作。

3.1 创建 TableEnvironment

使用SQL实现的第一步也是创建TableEnvironment,作为作业入口类。

TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:在内部的 catalog 中注册 Table、注册外部的 catalog、加载可插拔模块、执行 SQL 查询、注册自定义函数 (scalar、table 或 aggregation)、DataStream 和 Table 之间的转换(面向 StreamTableEnvironment )等作用。

Table 总是与特定的 TableEnvironment 绑定。 不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。 TableEnvironment 可以通过静态方法 TableEnvironment.create() 创建。

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

3.2 创建源表和结果表

使用 DDL 语句创建源表:创建及注册表名为 source 的源表。 源表 source 只有一列: word,该表代表了从input_path所指定的输入文件中读取的单词

source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')
""".format(input_path)t_env.execute_sql(source_ddl)

使用 DDL 语句创建结果表:创建及注册表名为 sink 的结果表。结果表 sink 有两列: word和count,该表会将计算结果输出到文件 output_path所指定的输出文件中

sink_ddl = """create table sink (word STRING,`count` BIGINT) with ('connector' = 'filesystem','format' = 'canal-json','path' = '{}')
""".format(output_path)t_env.execute_sql(sink_ddl)

3.3 创建 Flink 作业

创建一个 Flink 作业:该作业读取源表 source 中的数据,进行一些变换,然后将结果写入结果表 sink

@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):for s in line[0].split():yield Row(s)# 计算 word count
tab = t_env.from_path('source')
tab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()

上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name) 被调用的时候, 作业才会被真正提交到集群或者本地进行执行

3.4 执行 Flink 作业

首先,你需要在文件 “input_path” 中准备好输入数据。然后,在命令行中运行作业(假设作业名为WordCount.py):

python WordCount.py

04 PyFlink 词频统计完整代码

参考资料

Flink官方文档 TableAPI & SQL

Flink官方文档 TableAPI教程

PyFlink 从入门到精通

更多推荐

词频,批处理,实例,基础,PyFlink

本文发布于:2023-05-29 23:10:06,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/355095.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:词频   批处理   实例   基础   PyFlink

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!