PyFlink 流处理基础实例 MySQL CDC方式实时备份

编程入门 行业动态 更新时间:2024-10-21 14:18:44

01 JDBC SQL 连接器

JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。

如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外部系统交换 UPDATE/DELETE 消息;否则,它将以 append 模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。

1.1 下载依赖包

针对关系型数据库实现 Flink 通过建立 JDBC 连接器来执行 SQL 查询,要下载 flink-connector-jdbc 依赖包,其下载地址为 flink-connector-jdbc_2.11-1.14.0.jar

在连接到具体数据库时,也需要对应的驱动依赖,MySql的驱动 jar 包下载地址为 mysql-connector-java

1.2 创建 JDBC 表

作业中加入上述依赖包之后,使用 Table API 的 JDBC table 可以按如下定义:

-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (id BIGINT,name STRING,age INT,status BOOLEAN,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydatabase','table-name' = 'users'
);-- 从另一张表 "T" 将数据写入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;-- 查看 JDBC 表中的数据
SELECT id, name, age, status FROM MyUserTable;-- JDBC 表在时态表关联中作为维表
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;

JDBC SQL 连接器的一些参数与含义如下:

connector:指定使用什么类型的连接器,这里应该是’jdbc’。url:JDBC 数据库 url。table-name:连接到 JDBC 表的名称。driver:用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。username:JDBC 用户名。如果指定了 ‘username’ 和 ‘password’ 中的任一参数,则两者必须都被指定。password:JDBC 密码。

02 JAR 依赖管理

如果作业中需要用到第三方的 jar 包,Python Table API 提供如下方式指明 jar 包

# 方式1:本地上传
table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")# 方式2:运行时加载
table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")

本地上传方式,通过将本地文件 URL 指向的 jar 包上传到 Flink 群集。在配置中通过 pipeline.jars 指定 jar包的 URL 列表,不同 jar 包之间用分号 ; 隔开。

运行时加载方式,指定确保 URL 在客户端和群集上都可访问的 jar 包路径规则,作业运行时根据路径加载指定 jar 包。

Python DataStream API 中提供了相似的方法指明 jar 包:本地上传 add_jars() 和运行时加载 add_classpaths()

stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")stream_execution_environment.add_classpaths("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")

03 实时 MySql CDC数据备份实例

本实例展示了如何用 Flink 进行 MySQL 数据库的实时同步。实例中通过监听 当前 MySQL 数据库的 binlog 数据变更,然后根据 binlog 日志将数据变更实时同步到另一个 MySQL 数据库中。

3.1 Flink CDC 连接器

CDC 的简介与用途

CDC (change data capture) ,即变化数据捕捉,它是数据库进行备份的一种方式,常用于大量数据的备份工作。

CDC 分为入侵式的和非入侵式两种:

入侵式:如基于触发器备份、基于时间戳备份、基于快照备份。非入侵式:如基于日志的备份。

本实例采用非入侵式,基于 MySql 的 binlog 日志进行备份。

所以要实现基于日志的 CDC 数据备份, MySQL 就是要开启 binlog 选项。 MySQL 8.0.X 是默认开启 binlog 的,你可以通过执行下面的命令,查看 log_bin 变量的值是否为 ON来检查 MySQL 是否开启 binlog。

show variables like '%log_bin%';

使用 Flink CDC 连接器

首先下载 MySql CDC 连接器 jar 包,下载地址为 flink-sql-connector-mysql-cdc

使用 Table API 可以创建如下 mysql cdc table, 通过 MySQL-CDC 连接器从 MySQL 的 binlog 里提取更改

-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (id INT NOT NULL,name STRING,description STRING,weight DECIMAL(10,3)
) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'flinkuser','password' = 'flinkpw','database-name' = 'inventory','table-name' = 'products'
);-- read snapshot and binlog data from mysql, and do some transformation, and show on the client
SELECT id, UPPER(name), description, weight FROM mysql_binlog;

3.2 实时 MySql CDC数据备份实例

(1)初始化流处理环境并指定依赖 jar 包

为了在 Flink 中实现 MySQL 的 CDC,我们需要准备下面 3 个 jar 包:

flink-connector-jdbc_2.11-1.14.0.jar:通过 JDBC 连接器来从 MySQL 里读取或写入数据;用于创建 sink 表,定义连接器为 jdbcmysql-connector-java-8.0.16.jar:JDBC 连接器的驱动( 帮助 java 连接 MySQL )flink-sql-connector-mysql-cdc-2.0.2.jar:通过 MySQL-CDC 连接器从 MySQL 的 binlog 里提取更改;用于创建 source 表,定义连接器为 mysql-cdc JAR 名称下载地址flink-connector-jdbc_2.11-1.14.0.jar下载地址mysql-connector-java-8.0.16.jar下载地址flink-sql-connector-mysql-cdc-2.0.2.jar下载地址

初始化流处理环境并指定依赖 jar 包

# 初始化流处理环境 
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings) # 编辑 jar 包 URLs 
jars = []
for file in os.listdir(os.path.abspath(os.path.dirname(__file__))):if file.endswith('.jar'):jars.append(os.path.abspath(file))
str_jars = ';'.join(['file://' + jar for jar in jars])
# 指定 jar 依赖 
t_env.get_config().get_configuration().set_string("pipeline.jars", str_jars)

(2)创建 MySql CDC 源表

    # 创建 MySql CDC 源表mysql_cdc_ddl = """CREATE TABLE source (id INT,              name STRING,PRIMARY KEY (id) NOT ENFORCED         ) WITH ('connector' = 'mysql-cdc','hostname' = '127.0.0.1','port' = '3307','database-name' = 'flink','table-name' = 'user','username' = 'root','password' = 'root')"""t_env.execute_sql(mysql_cdc_ddl)

(3)创建数据同步的 JDBC 输出表

将数据写入外部数据库时,Flink 使用 DDL 中定义的主键。

如果定义了主键,则连接器将在 upsert 模式下运行,否则,连接器将在追加模式下运行。

在 upsert 模式下,Flink 将根据主键插入新行或更新现有行,Flink 可以通过这种方式确保幂等性。在追加模式下,Flink 将所有记录解释为 INSERT 消息,如果在基础数据库中发生主键或唯一约束冲突,则 INSERT 操作可能会失败。
    # 创建数据同步的 JDBC 输出表jdbc_sink_ddl = """CREATE TABLE sink (id INT,                            name STRING,                       PRIMARY KEY (id) NOT ENFORCED      -- 需要定义主键,让连接器在 upsert 模式下运行) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3308/flink','driver' = '.mysql.cj.jdbc.Driver','table-name' = 'user','username' = 'root','password' = 'root')"""t_env.execute_sql(jdbc_sink_ddl)

(4)运行实例

指向 MySql CDC 实时数据备份作业之前,我们需要准备实验环境:两个 mysql 数据库。

我们用 docker 容器快速构建实验环境,除了两个 mysql 数据库,还创建一个 adminer 容器:

mysql1 容器作为待同步的数据源mysql2 容器作为备份的数仓adminer 容器允许我们在本地没有安装 mysql 客户端的情况下,使用网页来查看和操作 mysql 容器

2 个 mysql 容器 + 1 个 adminer 容器,编写容器编排 docker-pose.yml如下:

version: "3.5"
services:mysql1:image: mysql:8.0.22     # 5.7 版本本地连接不上mand: ['--default-authentication-plugin=mysql_native_password','--character-set-server=utf8mb4','--collation-server=utf8mb4_unicode_ci','--log-bin=mysql-bin',]ports:- 3307:3306environment:MYSQL_ROOT_PASSWORD: rootvolumes:- ./examples/mysql:/docker-entrypoint-initdb.dcontainer_name: mysql1mysql2:image: mysql:8.0.22          # 5.7 版本本地连接不上mand: ['--default-authentication-plugin=mysql_native_password','--character-set-server=utf8mb4','--collation-server=utf8mb4_unicode_ci']ports:- 3308:3306                # 第二个数据库的端口是 3307environment:MYSQL_ROOT_PASSWORD: rootvolumes:- ./examples/mysql:/docker-entrypoint-initdb.dcontainer_name: mysql2adminer:image: adminerports:- 8089:8080container_name: adminer

使用 docker-pose up 命令启动容器环境,如果出现错误请检查容器端口映射是否冲突。

实验环境启动之后执行 MySql CDC 实时数据备份作业代码:

python flink_mysql_cdc.py

如果本地有 MySQL 客户端,可以同时连接 3307 和 3308 这两个端口(账密都是 root)。

在 3307 对应的实例上的 flink 数据库的 user 这张表里做相应的增删改,然后看看 3308 对应的同名的表里,是否已经实时同步了数据。

(5)实例完整代码

参考资料

Flink 官方文档 JDBC SQL 连接器

Flink 官方文档 依赖管理

flink-cdc-connectors 官方文档

PyFlink 从入门到精通

更多推荐

备份,实时,实例,方式,基础

本文发布于:2023-05-29 23:10:30,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/355101.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:备份   实时   实例   方式   基础

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!