A Getting-Started Guide to Flink CDC, a Mainstream Open-Source Change Data Capture Technology
flink-cdc-connectors project page: https://github.com/ververica/flink-cdc-connectors (latest source release: 2.4.0)
flink-cdc-connectors documentation: https://ververica.github.io/flink-cdc-connectors/master/
flink-cdc-connectors source code: https://github.com/ververica/flink-cdc-connectors
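CDC Connectors for Apache Flink is a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). It integrates Debezium as the engine that captures data changes, so it can take full advantage of Debezium's capabilities.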
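Flink CDC is the source component of flink-cdc-connectors, developed by the Flink community. Built on log-based Change Data Capture, it provides unified full (snapshot) and incremental reading from databases such as MySQL and PostgreSQL. With Flink's pipeline capabilities and its rich upstream and downstream ecosystem, it can capture changes from many kinds of databases and synchronize them to downstream storage in real time.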
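What is CDC? Briefly, CDC stands for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of data or tables), record those changes completely in the order in which they occur, and write them to message middleware so that other services can subscribe to and consume them.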
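Types of CDC
CDC implementations fall into two main categories, query-based CDC and binlog-based CDC, which differ mainly in how changes are detected.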
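To make the query-based approach concrete, here is a minimal sketch in plain Java, assuming a poller that simply compares two successive query results. The class name SnapshotDiff, the diff helper, and the sample rows are all hypothetical and are not part of Flink CDC. It shows why polling only ever sees the state at each poll: any intermediate updates between two polls are invisible to it, whereas a log-based reader would see each of them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a query-based poller infers changes by comparing two query results.
// All names and sample rows here are hypothetical.
class SnapshotDiff {

    /** Compares two polls of the same table (primary key -> non-null row value). */
    static List<String> diff(Map<Integer, String> previous, Map<Integer, String> current) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<Integer, String> e : current.entrySet()) {
            String old = previous.get(e.getKey());
            if (old == null) {
                changes.add("INSERT id=" + e.getKey() + " value=" + e.getValue());
            } else if (!old.equals(e.getValue())) {
                changes.add("UPDATE id=" + e.getKey() + " " + old + " -> " + e.getValue());
            }
        }
        for (Integer id : previous.keySet()) {
            if (!current.containsKey(id)) {
                changes.add("DELETE id=" + id);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<Integer, String> firstPoll = new LinkedHashMap<>();
        firstPoll.put(1, "iphone13");
        firstPoll.put(2, "mate50");

        // Suppose row 1 was updated twice between polls (iphone13 -> iphone14 -> iphone15)
        // and row 2 was deleted. The second poll only contains the final state.
        Map<Integer, String> secondPoll = new LinkedHashMap<>();
        secondPoll.put(1, "iphone15");
        secondPoll.put(3, "pixel8");

        // Prints one UPDATE (iphone13 -> iphone15), one INSERT and one DELETE;
        // the intermediate value iphone14 never appears.
        diff(firstPoll, secondPoll).forEach(System.out::println);
    }
}
```

A binlog-based reader avoids this gap because every committed row change is recorded in the log, at the cost of needing access to that log.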
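Features
Supports reading a database snapshot and then continuing to read the transaction log, with exactly-once processing even if a failure occurs.
CDC connectors for the DataStream API: users can consume changes from multiple databases and tables in a single job, without deploying Debezium and Kafka.
CDC connectors for the Table/SQL API: users can create a CDC source with SQL DDL to monitor changes on a single table.

Application scenarios
Data distribution: fan one data source out to multiple downstream consumers, commonly used for business decoupling and microservices.
Data integration: consolidate scattered, heterogeneous data sources into a data warehouse, eliminating data silos and simplifying later analysis.
Data migration: commonly used for database backup and disaster recovery.

Supported data sources
CDC Connectors for Apache Flink can ingest snapshot data and real-time changes from many databases into Flink, and then transform the data and sink it into various downstream systems.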
The supported data sources include the following:
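Hands-on
Flink DataStream code example
This example uses MySQL as the data source and captures data changes through flink-connector-mysql-cdc. First prepare the MySQL environment; here we reuse the environment from the earlier article <<实时采集MySQL数据之轻量工具Maxwell实操>> (hands-on real-time MySQL data collection with the lightweight tool Maxwell). There are two databases, my_maxwell_01 and my_maxwell_02, and each contains the same account and product tables.
Add the dependencies to the pom file: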
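    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>cn.itxs.flink</groupId>
      <artifactId>flink-cdc-demo</artifactId>
      <version>1.0-SNAPSHOT</version>

      <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.17.1</flink.version>
        <flink.cdc.version>2.4.0</flink.cdc.version>
        <mysql.client.version>8.0.29</mysql.client.version>
        <fastjson.version>1.2.83</fastjson.version>
      </properties>

      <dependencies>
        <!-- Flink runtime dependencies are provided by the cluster -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-base</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-java-bridge</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner-loader</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-runtime</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
        <!-- Bundled into the job jar -->
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>${mysql.client.version}</version>
        </dependency>
        <dependency>
          <groupId>com.ververica</groupId>
          <artifactId>flink-connector-mysql-cdc</artifactId>
          <version>${flink.cdc.version}</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>${fastjson.version}</version>
        </dependency>
      </dependencies>

      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>shade</goal>
                </goals>
                <configuration>
                  <artifactSet>
                    <excludes>
                      <exclude>com.google.code.findbugs:jsr305</exclude>
                      <exclude>org.slf4j:*</exclude>
                      <exclude>log4j:*</exclude>
                    </excludes>
                  </artifactSet>
                  <filters>
                    <filter>
                      <!-- Strip signature files so the shaded jar is not rejected as tampered -->
                      <artifact>*:*</artifact>
                      <excludes>
                        <exclude>META-INF/*.SF</exclude>
                        <exclude>META-INF/*.DSA</exclude>
                        <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                    </filter>
                  </filters>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>

Create DataStreamDemo.java: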
    package cn.itxs.cdc;

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DataStreamDemo {
        public static void main(String[] args) throws Exception {
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("mysqlserver")
                    .port(3306)
                    .databaseList("my_maxwell_01,my_maxwell_02")
                    // all tables of my_maxwell_01, but only product from my_maxwell_02
                    .tableList("my_maxwell_01.*,my_maxwell_02.product")
                    .username("root")
                    .password("12345678")
                    // convert each SourceRecord to a JSON string
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // enable checkpointing every 3000 ms
            env.enableCheckpointing(3000);

            env
                    .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                    // run the source with parallelism 4
                    .setParallelism(4)
                    // print with parallelism 1 on the sink to keep message order
                    .print().setParallelism(1);

            env.execute("Print MySQL Snapshot + Binlog");
        }
    }

Note that the Flink dependencies above are declared with provided scope, so they have to be on the classpath when the program is run locally.
Start the program and check the log: it first reads the current full data set from MySQL, and for my_maxwell_02 only the product table is read.
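The table selection seen in the log follows from the tableList value in the demo. As a rough illustration, assuming each comma-separated entry is treated as a regular expression over the qualified name database.table, the matching can be sketched in plain Java. The class TableFilterSketch and the helper isCaptured are hypothetical and are not part of the Flink CDC API; the connector's actual filtering is done internally.

```java
import java.util.regex.Pattern;

// Sketch only: how a comma-separated list of "database.table" patterns selects tables.
// isCaptured is a hypothetical helper, not a Flink CDC method.
class TableFilterSketch {

    static boolean isCaptured(String tableList, String qualifiedTable) {
        for (String pattern : tableList.split(",")) {
            // Assumption: each entry is a regular expression that must match the whole name.
            if (Pattern.matches(pattern.trim(), qualifiedTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String tableList = "my_maxwell_01.*,my_maxwell_02.product";

        System.out.println(isCaptured(tableList, "my_maxwell_01.account")); // true
        System.out.println(isCaptured(tableList, "my_maxwell_01.product")); // true
        System.out.println(isCaptured(tableList, "my_maxwell_02.product")); // true
        System.out.println(isCaptured(tableList, "my_maxwell_02.account")); // false
    }
}
```

Because the entries are regular expressions under this assumption, an unescaped dot matches any character, so a pattern such as my_maxwell_01.* is slightly broader than it looks; escaping the dot would tighten it when database names share a prefix.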
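After modifying tables in both databases, the corresponding change events appear in the output. This also confirms that changes to the account table in my_maxwell_02 are not captured.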
{"before":{"id":7,"name":"李丹","age":44},"after":{"id":7,"name":"李丹","age":48},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856595000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"account","server_id":1,"gtid":null,"file":"binlog.000025","pos":2798,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856598620,"transaction":null}
{"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856605000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3140,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856608748,"transaction":null}
{"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856628000,"snapshot":"false","db":"my_maxwell_02","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3486,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856631643,"transaction":null}

Package the job, copy the jar to the cluster, and run:
    bin/flink run -m hadoop1:8081 -c cn.itxs.cdc.DataStreamDemo ./lib/flink-cdc-demo-1.0-SNAPSHOT.jar

The job's log on the cluster likewise shows the full snapshot of the tables followed by the incremental changes made above.
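Each change event printed above carries an op field in the Debezium format: c for an insert, u for an update, d for a delete, and r for a row read during the initial snapshot. The following plain-Java sketch classifies an event line by that field. The class OpCodeSketch and its describe method are hypothetical, and the regular expression is a shortcut that assumes the compact JSON layout shown in the output; inside a real job a JSON library such as the fastjson dependency from the pom would be the more robust choice.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch only: classify a Debezium-style change event by its "op" field.
// OpCodeSketch and describe are hypothetical names used for illustration.
class OpCodeSketch {

    // Matches the op field as it appears in the compact JSON output, e.g. "op":"u".
    private static final Pattern OP_FIELD = Pattern.compile("\"op\":\"([a-z])\"");

    static String describe(String eventJson) {
        Matcher m = OP_FIELD.matcher(eventJson);
        if (!m.find()) {
            return "unknown";
        }
        switch (m.group(1)) {
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            case "r": return "snapshot read";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        // Shortened version of the first event from the output above.
        String event = "{\"before\":{\"id\":7,\"age\":44},\"after\":{\"id\":7,\"age\":48},"
                + "\"op\":\"u\",\"ts_ms\":1687856598620}";
        System.out.println(describe(event)); // update
    }
}
```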
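To let the job resume from its last position after a failure or restart, persist checkpoints to durable storage such as HDFS: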
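    // Additional imports needed in DataStreamDemo:
    // import org.apache.flink.streaming.api.environment.CheckpointConfig;
    // import java.util.concurrent.TimeUnit;

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    // store checkpoints on HDFS
    checkpointConfig.setCheckpointStorage("hdfs://hadoop111:9000/checkpoints/flink/cdc");
    // wait at least 2 seconds between the end of one checkpoint and the start of the next
    checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));
    // tolerate up to 5 failed checkpoints before failing the job
    checkpointConfig.setTolerableCheckpointFailureNumber(5);
    // abort a checkpoint that takes longer than 1 minute
    checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
    // keep the externalized checkpoint when the job is cancelled
    checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Flink SQL code example
Create the SqlDemo.java file: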
    package cn.itxs.cdc;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class SqlDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Register the MySQL table as a CDC source; option keys and values use single quotes
            tableEnv.executeSql("CREATE TABLE account (\n" +
                    "  id INT NOT NULL,\n" +
                    "  name STRING,\n" +
                    "  age INT,\n" +
                    "  PRIMARY KEY(id) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    "  'connector' = 'mysql-cdc',\n" +
                    "  'hostname' = 'mysqlserver',\n" +
                    "  'port' = '3306',\n" +
                    "  'username' = 'root',\n" +
                    "  'password' = '12345678',\n" +
                    "  'database-name' = 'my_maxwell_01',\n" +
                    "  'table-name' = 'account'\n" +
                    ")");

            Table table = tableEnv.sqlQuery("select * from account");
            // Convert the dynamic table to a changelog stream of Row records
            DataStream<Row> rowDataStream = tableEnv.toChangelogStream(table);
            rowDataStream.print("account_binlog====");

            env.execute();
        }
    }
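Start the program and check the log: it reads the full data of the account table in the my_maxwell_01 database from MySQL, and after modifying the table data the changes are read as well.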
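The rows produced by toChangelogStream are tagged with a change kind, which Flink abbreviates as +I (insert), -U (old row of an update), +U (new row of an update), and -D (delete). As a minimal sketch of what a consumer does with such a stream, the plain-Java example below replays changelog entries onto an in-memory map keyed by primary key. The class ChangelogApplySketch and its apply method are hypothetical, and each row is reduced to an id and an age for brevity.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch only: replay changelog entries (+I, -U, +U, -D) onto a table keyed by primary key.
// ChangelogApplySketch and apply are hypothetical names used for illustration.
class ChangelogApplySketch {

    static void apply(Map<Integer, Integer> table, String kind, int id, int age) {
        switch (kind) {
            case "+I": // insert
            case "+U": // update: new row image
                table.put(id, age);
                break;
            case "-U": // update: old row image is retracted
            case "-D": // delete
                table.remove(id);
                break;
            default:
                throw new IllegalArgumentException("Unknown change kind: " + kind);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Integer> account = new TreeMap<>();

        // Snapshot row, then the update of age 44 -> 48 seen in the DataStream example.
        apply(account, "+I", 7, 44);
        apply(account, "-U", 7, 44);
        apply(account, "+U", 7, 48);

        System.out.println(account); // {7=48}
    }
}
```

An update arrives as a pair of entries, so a consumer that only needs the latest state per key can ignore -U and treat +U as an upsert.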
Author's blog: IT小神, www.itxiaoshen.com