01 词频统计业务逻辑
该实例的处理过程是通过 Flink 对文件存储系统里的数据进行离线批处理,统计指定文件下的单词数,并将统计结果存储到其他文件下。
该实例业务实现过程如下:
首先使用filesystem
作为连接器,按照指定的 csv
格式来批量地读取指定路径的文件或文件夹,以此创建源数据表。然后,在 Flink 中执行批处理实例逻辑,完成批处理任务。最后,使用 filesystem
连接器,将处理后结果写入目标文件或文件夹内,构建结果表。
02 Table API 实现词频统计
Apache Flink提供了两种顶层的关系型API,分别为Table API和SQL,Flink通过Table API&SQL实现了批流统一。其中Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合关系运算符(例如select,where和join)的查询。Flink SQL基于Apache Calcite 实现了标准的SQL,用户可以使用标准的SQL处理数据集。Table API和SQL与Flink的DataStream和DataSet API紧密集成在一起,用户可以实现相互转化,比如可以将DataStream或者DataSet注册为table进行操作数据。
2.1 创建 TableEnvironment
编写Flink Python Table API程序的第一步是创建TableEnvironment,这是Python Table API作业的入口类。
一个Table必定属于一个具体的TableEnvironment,不可以将不同TableEnvironment的表放在一起使用(比如join,union等操作)
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
Flink提供了两种planner,分别为 old planner 和 blink planner,对于不同的planner而言,Table API & SQL底层的执行与转换是有所不同的。
old planner 根据是流处理作业还是批处理作业,Table API &SQL会被转换成DataStream或者DataSet程序无论是批处理作业还是流处理作业,如果使用的是 blink planner,底层都会被转换为DataStream程序2.2 创建源表和结果表
创建源表:创建及注册表名为 source 的源表。 源表 source 只有一列: word,该表代表了从input_path
所指定的输入文件中读取的单词
t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())
tab = t_env.from_path('source')
创建结果表:创建及注册表名为 sink 的结果表。结果表 sink 有两列: word和count,该表会将计算结果输出到文件 output_path
所指定的输出文件中
t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())
2.3 创建 Flink 作业
创建一个 Flink 作业:该作业读取源表 source 中的数据,进行一些变换,然后将结果写入结果表 sink
@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):for s in line[0].split():yield Row(s)# 计算 word count
tab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()
上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name)
被调用的时候, 作业才会被真正提交到集群或者本地进行执行
2.4 执行 Flink 作业
首先,你需要在文件 “input_path” 中准备好输入数据。如果在程序中没有指定输入数据的路径,可以选择通过如下命令准备输入数据:
echo -e "flink\npyflink\nflink" > input_path
接下来,可以在命令行中运行作业(假设作业名为WordCount.py):
python WordCount.py
上述命令会构建Python Table API程序,并在本地mini cluster中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例。与提交到远端集群一样,也提交到本地 Flink 集群的命令运行如下
flink run -m localhost:8081 -py WordCount.py
使用浏览器访问localhost:8081
查看集群处理情况如下:
最后,可以通过如下命令查看你的运行结果:
$ cat output_path
flink 2
pyflink 1
03 SQL API 实现词频统计
前面已经提到,Flink SQL是基于Apache Calcite 实现了标准的SQL,用户可以使用标准的SQL处理数据集。相较于 Table API,SQL可以直接使用DDL语句进行建表、查询等操作。
3.1 创建 TableEnvironment
使用SQL实现的第一步也是创建TableEnvironment,作为作业入口类。
TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:在内部的 catalog 中注册 Table、注册外部的 catalog、加载可插拔模块、执行 SQL 查询、注册自定义函数 (scalar、table 或 aggregation)、DataStream 和 Table 之间的转换(面向 StreamTableEnvironment )等作用。
Table 总是与特定的 TableEnvironment 绑定。 不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。 TableEnvironment 可以通过静态方法 TableEnvironment.create() 创建。
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
3.2 创建源表和结果表
使用 DDL 语句创建源表:创建及注册表名为 source 的源表。 源表 source 只有一列: word,该表代表了从input_path
所指定的输入文件中读取的单词
source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')
""".format(input_path)t_env.execute_sql(source_ddl)
使用 DDL 语句创建结果表:创建及注册表名为 sink 的结果表。结果表 sink 有两列: word和count,该表会将计算结果输出到文件 output_path
所指定的输出文件中
sink_ddl = """create table sink (word STRING,`count` BIGINT) with ('connector' = 'filesystem','format' = 'canal-json','path' = '{}')
""".format(output_path)t_env.execute_sql(sink_ddl)
3.3 创建 Flink 作业
创建一个 Flink 作业:该作业读取源表 source 中的数据,进行一些变换,然后将结果写入结果表 sink
@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):for s in line[0].split():yield Row(s)# 计算 word count
tab = t_env.from_path('source')
tab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()
上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name)
被调用的时候, 作业才会被真正提交到集群或者本地进行执行
3.4 执行 Flink 作业
首先,你需要在文件 “input_path” 中准备好输入数据。然后,在命令行中运行作业(假设作业名为WordCount.py):
python WordCount.py
04 PyFlink 词频统计完整代码
参考资料
Flink官方文档 TableAPI & SQL
Flink官方文档 TableAPI教程
PyFlink 从入门到精通
更多推荐
词频,批处理,实例,基础,PyFlink
发布评论