PyFlink 有状态流处理实例 实时排行榜

编程入门 行业动态 更新时间:2024-10-21 18:38:29

01 UDAF 聚合函数的使用

自定义聚合函数(UDAF),将多条记录聚合成一条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。

需要注意的是:当前聚合函数仅在流模式下的 GroupBy 聚合和 Group Window 聚合中支持通用的用户定义聚合功能;对于批处理模式,当前不支持该模式,需要使用向量化聚合函数。

1.1 UDAF 的处理逻辑

聚合函数的处理过程以累加器 aumulator 的为中心,累加器是一种中间数据结构,用于存储将多行输入计算出的最终聚合结果,即用来存储聚合的中间结果。

围绕累加器 aumulator,一个聚合任务还需要如下三个方法:

create_aumulator():用来初始化自定义的累加其 aumulator,将内部定义的变量赋值为空或者0。aumulate():定义根据输入更新 aumulator 的逻辑,主要是编写中间的逻辑代码,根据输入变量来更新输出中间变量。get_value():定义如何返回 aumulator 中存储的中间结果,作为UDAF的最终结果。

一个聚合处理过程如下图所示:

上例中,我们想要计算出饮品价目表中最高的价格,其中饮品价目表包含三个属性 (id, name, price) 和五条数据。

聚合处理过程中:首先,使用 create_aumulator() 为要处理的数据构造一个空累加器;然后,使用 aumulate() 方法根据输入的每条数据更新累加器中存储的中间结果;在所有数据都处理完成后,使用 get_value() 方法计算中间结果中最大值,即要返回的最终结果。

1.2 聚合函数的使用

如果要定义 Python 聚合函数, 可以通过继承 pyflink.table 中的基类 AggregateFunction,并实现 aumulate() 方法。 聚合函数的返回结果类型和累加器类型可以通过两种方式指定:

实现 get_result_type() 方法和 get_aumulator_type() 方法使用 udaf 装饰器封装函数实例并指明类型参数 result_typeaumulator_type

一个 UDAF 的使用示例如下:

定义 UDAF

class WeightedAvg(AggregateFunction):# 为了计算加权平均值,累加器需要存储所有已累计数据的加权和和计数。在本示例中,我们使用行对象作为累加器。def create_aumulator(self):# Row(sum, count)return Row(0, 0)def get_value(self, aumulator):if aumulator[1] == 0:return Noneelse:return aumulator[0] / aumulator[1]def aumulate(self, aumulator, value, weight):aumulator[0] += value * weightaumulator[1] += weight# retract() 方法常应用在当前聚合操作之前存在可能生成收回消息的操作,例如组聚合、外部联接。def retract(self, aumulator, value, weight):aumulator[0] -= value * weightaumulator[1] -= weightdef get_result_type(self):return DataTypes.BIGINT()def get_aumulator_type(self):return DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.BIGINT())])

使用 UDAF

# 创建流处理环境
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)# 注册 UDAF 
# 可以在其中指定结果类型,定义中已经实现了`get_result_type()` 和 `get_aumulator_type()` 方法指明了类型,不需要在重复指明
# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), aumulator_type=...)
weighted_avg = udaf(WeightedAvg())t = table_env.from_elements([(1, 2, "Lee"),(3, 4, "Jay"),(5, 6, "Jay"),(7, 8, "Lee")]).alias("value", "count", "name")# 调用 UDAF
result = t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")).to_pandas()
print(result)

其他调用 UDAF 的方式

Table API 中注册及调用 UDAF
# 注册 UDAF
table_env.create_temporary_function("weighted_avg", WeightedAvg())# Table API 中调用注册的 UDAF
result = t.group_by(t.name).select(call("weighted_avg", t.value, t.count).alias("avg")).to_pandas()
print(result)
SQL 中注册及调用 UDAF
# 注册 UDAF
table_env.create_temporary_view("source", t)# SQL 中调用注册的 UDAF
result = table_env.sql_query("SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY name").to_pandas()
print(result)
在 GroupBy Window 聚合中使用 Python 聚合函数
tumble_window = Tumble.over(lit(1).hours) \.on(col("rowtime")) \.alias("w")result = t.window(tumble_window) \.group_by(col('w'), col('name')) \.select("w.start, w.end, weighted_avg(value, count)") \.to_pandas()
print(result)

完整代码

from pyflink.mon import Row
from pyflink.table import AggregateFunction, DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.expressions import call
from pyflink.table.udf import udaf
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumbleclass WeightedAvg(AggregateFunction):def create_aumulator(self):# Row(sum, count)return Row(0, 0)def get_value(self, aumulator):if aumulator[1] == 0:return Noneelse:return aumulator[0] / aumulator[1]def aumulate(self, aumulator, value, weight):aumulator[0] += value * weightaumulator[1] += weightdef retract(self, aumulator, value, weight):aumulator[0] -= value * weightaumulator[1] -= weightdef get_result_type(self):return DataTypes.BIGINT()def get_aumulator_type(self):return DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.BIGINT())])env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# the result type and aumulator type can also be specified in the udaf decorator:
# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), aumulator_type=...)
weighted_avg = udaf(WeightedAvg())
t = table_env.from_elements([(1, 2, "Lee"),(3, 4, "Jay"),(5, 6, "Jay"),(7, 8, "Lee")]).alias("value", "count", "name")# call function "inline" without registration in Table API
result = t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")).to_pandas()
print(result)# register function
table_env.create_temporary_function("weighted_avg", WeightedAvg())# call registered function in Table API
result = t.group_by(t.name).select(call("weighted_avg", t.value, t.count).alias("avg")).to_pandas()
print(result)# register table
table_env.create_temporary_view("source", t)# call registered function in SQL
result = table_env.sql_query("SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY name").to_pandas()
print(result)# use the general Python aggregate function in GroupBy Window Aggregation
tumble_window = Tumble.over(lit(1).hours) \.on(col("rowtime")) \.alias("w")result = t.window(tumble_window) \.group_by(col('w'), col('name')) \.select("w.start, w.end, weighted_avg(value, count)") \.to_pandas()
print(result)

1.3 聚合函数的视图 View

PyFlink 提供了更加高效的列表和字典存储结构 ListViewMapView,可以用于存储更大量的数据。

但是将 ListViewMapView 用于聚合操作是,累加器 aumulator 必须是 Row,且 ListViewMapView 必须是被声明在第一层。

使用方法入下所示:

from pyflink.table import ListViewclass ListViewConcatAggregateFunction(AggregateFunction):def get_value(self, aumulator):return aumulator[1].join(aumulator[0])def create_aumulator(self):return Row(ListView(), '')def aumulate(self, aumulator, *args):aumulator[1] = args[1]aumulator[0].add(args[0])def get_aumulator_type(self):return DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.LIST_VIEW(DataTypes.STRING())),DataTypes.FIELD("f1", DataTypes.BIGINT())])def get_result_type(self):return DataTypes.STRING()

1.4 向量化聚合函数

前面我们已经提到当前聚合函数仅在流模式下的 GroupBy 聚合和 Group Window 聚合中支持通用的用户定义聚合功能;对于批处理模式,当前不支持该模式,需要使用向量化聚合函数。

PyFlink 中向量化聚合函数以一个或多个 pandas.Series 类型的参数作为输入,并返回一个标量值作为输出。

向量化聚合函数不支持部分聚合,而且一个组或者窗口内的所有数据, 在执行的过程中,会被同时加载到内存,所以需要确保所配置的内存大小足够容纳这些数据。

如下示例中,展示了如何定义一个自定义向量化聚合函数,并在 GroupBy Aggregation、GroupBy Window Aggregation、Over Window Aggregation 中使用该函数。

定义自定义向量化聚合函数

# func_type="pandas" 输入类型
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def weighted_avg(value):return value.mean()

使用自定义向量化聚合函数

# 创建批处理环境
settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(settings)weighted_avg = udaf(WeightedAvg())my_table = table_env.from_elements([(1, 2, "Lee"),(3, 4, "Jay"),(5, 6, "Jay"),(7, 8, "Lee")]).alias("value", "count", "name")# 在 GroupBy Aggregation 中使用向量化聚合函数
my_table.group_by(my_table.name).select(my_table.name, weighted_avg(add(my_table.value)))# 在 GroupBy Window Aggregation 中使用向量化聚合函数
tumble_window = Tumble.over(expr.lit(1).hours) \.on(expr.col("rowtime")) \.alias("w")my_table.window(tumble_window) \.group_by("w") \.select("w.start, w.end, weighted_avg(value)")# 在 Over Window Aggregation 中使用向量化聚合函数
table_env.create_temporary_function("weighted_avg", weighted_avg)
table_env.sql_query("""SELECT name,weighted_avg(value)over (PARTITION BY a ORDER BY rowtimeROWS BETWEEN UNBOUNDED preceding AND UNBOUNDED FOLLOWING)FROM MyTable""")

02 Kafka 连接器

Flink 的 Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力

2.1 下载依赖包

为了使用Kafka连接器,使用构建自动化工具(如Maven或SBT)的项目和使用SQL JAR包的SQL Client项目都需要下载依赖项 flink-connector-kafka_2.11。

Kafka 连接器目前并不包含在 Flink 的二进制发行版中,请查阅 这里 了解如何在集群运行中引用 Kafka 连接器。

2.2 创建 Kafka 表

作业中加入上述依赖包之后,使用 SQL / Table API 的 Kafka table 可以按如下定义:

CREATE TABLE KafkaTable (`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
)

Kafka 连接器的一些参数与含义如下:

connector:指定使用什么类型的连接器,这里应该是’kafka’。topic:Kafka 记录的 Topic 名。properties.bootstrap.servers:逗号分隔的 Kafka broker 列表。properties.group.id:Kafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 “KafkaSource-{tableIdentifier}” 作为消费组 ID。scan.startup.mode:Kafka consumer 的启动模式。有效值为:‘earliest-offset’,‘latest-offset’,‘group-offsets’,‘timestamp’ 和 ‘specific-offsets’。format:用来序列化或反序列化 Kafka 消息的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 ‘value.format’ 二者必需其一。

03 实时排行榜

本实例使用 Flink 的有状态流处理和滑动窗口,实现实时点击量排行榜。

该实例统计过去 1 分钟内,点击量最高的男女用户各 10 名及其具体的点击数,同时每隔 1 秒(实时)更新统计结果,等到排行榜数据并将结果同步到 kafka 中。

3.1 构建数据模拟器

首先,我们需要模拟实时产生的用户操作数据。

本实例中,我们编写一个 data_producer.py 的脚本,实时随机产生用户操作数据,并批量写入到 Kafka 中

每条写入 kafka 的用户操作数据包含如下字段:

{"ts": "2020-01-01 01:01:01",  # 当前时间"name": "刘备",  # 从根据性别随机产生的 50 个姓名里随机选择"sex": "男",  # 性别,60%概率为“男”,40%概率为“女”"action": "click",  # 动作,90%概率为“click”,10%概率为“scroll”"is_delete": 0,  # 是否要丢弃,90%概率为“0”(不丢弃),10%概率为1“丢弃”
}

构建候选用户组

我们创建一个用户类,生成候选用户组,并能够随机获取用户信息。

该类用于在向 Kafka 批量写入用户操作数据时随机生成用户信息。

seed = 2020  # 设置随机数种子,保证每次运行的结果都一样
num_users = 50  # 为了使得最后的结果不至于太平均,只初始化了 50 个用户,该 50 个用户有不同的概率来产生上面的数据
fake = Faker(locale='en_US') # fake 第三方库生成随机用户名称
Faker.seed(seed)
random.seed(seed)class UserGroup:def __init__(self):# 为指定数量的用户分配不同的出现概率,每次按概率分布获取用户姓名self.users = [self.gen_male() if random.random() < 0.6 else self.gen_female() for _ in range(num_users)]prob = np.cumsum(np.random.uniform(1, 100, num_users))  # 用户点击次数的累加self.prob = prob / prob.max()  # 点击次数归一化,转换成点击率# 静态方法生成男性用户信息和女性用户信息@staticmethoddef gen_male():return {'name': fake.name_male(), 'sex': 'male'}@staticmethoddef gen_female():return {'name': fake.name_female(), 'sex': 'female'}# 获取随机用户信息def get_user(self):r = random.random()  # 生成一个 0~1的随机数index = np.searchsorted(self.prob, r)return self.users[index]

生成用户操作数据

使用 Kafka 生产者产生用户操作数据

max_msg_per_second = 20  # 每秒钟的最大消息数
run_seconds = 3600  # 脚本最长运行时间,防止无限写入 kafka
topic = "user_action"  # kafka topic
bootstrap_servers = ['localhost:9092']def write_data():group = UserGroup()start_time = datetime.now()# 初始化 kafka 生产者producer = KafkaProducer(bootstrap_servers=bootstrap_servers,value_serializer=lambda x: dumps(x).encode('utf-8'))# 生产用户操作数据,并发送到 kafkawhile True:# 创建用户操作数据now = datetime.now()user = group.get_user()cur_data = {"ts": now.strftime("%Y-%m-%d %H:%M:%S"),"name": user['name'],"sex": user['sex'],"action": 'click' if random.random() < 0.9 else 'scroll',  # 用户的操作"is_delete": 0 if random.random() < 0.9 else 1  # 10% 的概率丢弃这条数据}# 将数据写入 kafka topicproducer.send(topic, value=cur_data)# 终止条件if (now - start_time).seconds > run_seconds:break# 停止时间sleep(1 / max_msg_per_second)

查看用户操作数据

使用 Kafka 消费者查看已经写入的用户操作数据

消费者的初始化参数:

group_id:在高并发量情况下,则需要有多个消费者协作,此时消费进度由 group_id 统一。例如消费者A与消费者B,在初始化时使用同一个 group_id。在进行消费时,一条消息被消费者A消费后,在kafka中会被标记,如果这条消息被A消费后且正确 mit,则该消息不会再被B消费。auto_offset_reset:该参数指定消费者启动的时刻。通常情况下,消息队列中可能会有已经堆积的未消费消息,有时候需求是从上一次未消费的位置开始读(则该参数设置为earliest);有时候的需求为从当前时刻开始读之后产生的,之前产生的数据不再消费(则该参数设置为latest)。
# 读取 kafka 的用户操作数据并打印
def print_data():consumer = KafkaConsumer(topic,  # topic的名称group_id= 'group', bootstrap_servers=bootstrap_servers,  # 指定kafka服务器auto_offset_reset='latest', )for msg in consumer:print(msg.value.decode('utf-8').encode('utf-8').decode('unicode_escape'))

数据模拟生成完整代码

3.2 根据输入数据和输出结果创建输入输出表

本实例的数据来源于 kafka 并将处理结果也输出到 kafka,所以我们要创建 kafka 表并指定topic, kafka_servers, group_id 等必要参数如下:

kafka_servers = "localhost:9092"
kafka_consumer_group_id = "group1"  # group ID
source_topic = "user_action"  # 源数据
sink_topic = "click_rank"  # 结果

本实例的数据对象就是用户的操作数据,输入数据包含 name:姓名,sex:性别,action:操作,is_delete:删除状态,ts:点击时间共五个字段,创建源表如下:

source_ddl = """CREATE TABLE source (name VARCHAR,                sex VARCHAR,                 action VARCHAR,             is_delete BIGINT,            ts TIMESTAMP(3),             WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 声明 ts 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark) with ('connector' = 'kafka','topic' = '{source_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','format' = 'json')
"""t_env.execute_sql(source_ddl)

本实例的统计结果包含 male_top10:点击量最高的 10 个男性用户,female_top10:点击量最高的 10 个女性用户,start_time:窗口开始时间,end_time:窗口结束时间 共四个字段,创建结果表如下:

sink_ddl = """CREATE TABLE sink (male_top10 STRING,        female_top10 STRING,      start_time TIMESTAMP(3),  end_time TIMESTAMP(3)      ) with ('connector' = 'kafka','topic' = '{sink_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','format' = 'json')
"""t_env.execute_sql(sink_ddl)

3.3 编写用户自定义聚合函数 UDAF

在 PyFlink 中定义 UDAF 需要 Flink >= 1.12,使用 UDAF 可以将多行的标量值映射到新的标量值。

本例子中我们需要使用滑动窗口计算点击率前十的用户,使用向量化的 Python 聚合函数( Pandas UDAF )进行 windows 聚合,即在使用 UDAF 时,指定参数 func_type=“pandas”。

用于统计点击量最多的 10 个男性和女性的向量化聚合函数如下所示:

# 统计点击量最多的 10 个男人(只统计 sex=male、action=click 的数量,忽略 is_delete=1 的数据)
@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def male_click_top10(name, sex):names = name[sex == 'male']return names.value_counts().iloc[:10].to_json()# 统计点击量最多的 10 个女人(只统计 sex=female、action=click 的数量,忽略 is_delete=1 的数据)
@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def female_click_top10(name, sex, action, is_delete):names = name[sex == 'female']return names.value_counts().iloc[:10].to_json()

3.4 流处理完整代码

除了上述源表和结果表的创建,以及定义 UDAF 聚合函数,流处理过程中要需要完成如下任务:

创建流处理环境指定 kafka 依赖注册 UDAF使用 UDAF 完成流处理任务

3.5 打印实时排行榜

完成流处理任务之后,实时排行结果被写入到 kafka 的 click_rank topic 中,我们从该 topic 中读取用户操作数据并打印

"""
读取 kafka 的用户操作数据并打印
"""
from kafka import KafkaConsumer
from reprint import output
import jsontopic = 'click_rank'
bootstrap_servers = ['localhost:9092']
group_id = 'group1'def sink_output():consumer = KafkaConsumer(topic,  group_id=group_id,  bootstrap_servers=bootstrap_servers, auto_offset_reset='latest',  )with output(output_type="list", initial_len=22, interval=0) as output_lines:# 初始化打印行     for i in range(14):if i==0 :output_lines[i] = '=== 窗口时间 ==='elif i==2 :output_lines[i] = '=== 男 ==='elif i == 8 :output_lines[i] = '=== 女 ==='else:output_lines[i] = 'name click'for msg in consumer:# 解析结果data = json.loads(msg.value)male_rank = json.loads(data['male_top10'])female_rank = json.loads(data['female_top10'])start_time = data['start_time']end_time = data['end_time']output_lines[1] = f'开始时间{start_time:6s} 结束时间{end_time}'# 逐行打印for i in range(5):if i < len(male_rank):name = list(male_rank.keys())[i]value = list(male_rank.values())[i]output_lines[i+3] = f'{name:6s} {value}'else:output_lines[i+3] = ''for i in range(5):if i < len(female_rank):name = list(female_rank.keys())[i]value = list(female_rank.values())[i]output_lines[i+9] = f'{name:6s} {value}'else:output_lines[i+9] = ''if __name__ == "__main__":sink_output()

3.6 运行实例

首先我们使用 docker 按照如下容器编排创建一个 kafka,同时构建一个 zookeeper 与 kafka 结合一起使用,用于管理 kafka 的 broker,以及实现负载均衡。

version: "3.5"
services:zookeeper:image: zookeeper:3.6.2ports:- "2181:2181"                        ## 对外暴露的 zookeeper 端口号container_name: zookeeperkafka:image: wurstmeister/kafka:2.13-2.6.0volumes:- /etc/localtime:/etc/localtime      ## kafka 镜像和宿主机器之间时间保持一致ports:- "9092:9092"                        ## 对外暴露的 kafka 端口号depends_on:- zookeeperenvironment:KAFKA_ADVERTISED_HOST_NAME: localhostKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_PORT: 9092KAFKA_BROKER_ID: 1KAFKA_LOG_RETENTION_HOURS: 120KAFKA_MESSAGE_MAX_BYTES: 10000000KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000KAFKA_NUM_PARTITIONS: 3KAFKA_DELETE_RETENTION_MS: 1000KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1"      ## 自动创建 topicscontainer_name: kafka

1 启动容器环境

docker-pose up -d

2 运行数据模拟程序

python data_producer.py

3 运行流处理任务程序

flink run -m localhost:8081 -python ranklist.py

4 运行排行榜打印程序

python data_sumer.py

5 运行结果

参考资料

Flink 官方文档:向量化聚合函数官方文档

Flink 官方文档:Apache Kafka SQL 连接器

PyFlink 从入门到精通

更多推荐

实时,实例,状态,排行榜,PyFlink

本文发布于:2023-05-29 23:10:44,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/355103.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:实时   实例   状态   排行榜   PyFlink

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!