一文解开主流开源变更数据捕获技术之Flink CDC的入门使用 全球新要闻

来源:2023-06-28 06:20:00    时间:博客园

@


【资料图】

目录概述定义什么是CDC?CDC的分类特性应用场景支持数据源实战Flink DataStream方式代码示例FlinkSQL方式代码示例概述定义

flink-cdc-connectors 官网 https://github.com/ververica/flink-cdc-connectors 源码release最新版本2.4.0

flink-cdc-connectors 文档地址 https://ververica.github.io/flink-cdc-connectors/master/

flink-cdc-connectors 源码地址 https://github.com/ververica/flink-cdc-connectors

CDC Connectors for Apache Flink 是Apache Flink的一组源连接器,使用更改数据捕获(CDC)从不同的数据库摄取更改,其集成了Debezium作为捕获数据变化的引擎,因此它可以充分利用Debezium的能力。

Flink CDC是由Flink社区开发的flink-cdc-connectors 的source组件,基于数据库日志的 Change Data Caputre 技术,实现了从 MySQL、PostgreSQL 等数据库全量和增量的一体化读取能力,并借助 Flink 优秀的管道能力和丰富的上下游生态,支持捕获多种数据库的变更,并将这些变更实时同步到下游存储。

什么是CDC?

这里也简单说明下,CDC为三个英文Change Data Capture(变更数据捕获)的缩写,核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其它服务进行订阅及消费。

CDC的分类

CDC主要分为基于查询的CDC和基于binlog的CDC,两者之间区别主要如下:

特性支持读取数据库快照,即使发生故障,也只进行一次处理,继续读取事务日志。数据流API的CDC连接器,用户可以在单个作业中消费多个数据库和表上的更改,而无需部署Debezium和Kafka。用于表/SQL API的CDC连接器,用户可以使用SQL DDL创建CDC源来监视单个表上的更改。应用场景数据分发,将一个数据源分发给多个下游,常用于业务解耦、微服务。数据集成,将分散异构的数据源集成到数据仓库中,消除数据孤岛,便于后续的分析。数据迁移,常用于数据库备份、容灾等。支持数据源

CDC Connectors for Apache Flink支持从多种数据库到Flink摄取快照数据和实时更改,然后转换和下沉到各种下游系统

支撑数据源包括如下:

实战Flink DataStream方式代码示例

这里以MySQL作为数据源为例,通过flink-connector-mysql-cdc实现数据变更获取,先准备MySQL环境,这里复用前面<<实时采集MySQL数据之轻量工具Maxwell实操>>的文章环境,数据库有两个my_maxwell_01,my_maxwell_02,每个数据库都有相同account和product表。pom文件引入依赖

    4.0.0    cn.itxs.flink    flink-cdc-demo    1.0-SNAPSHOT            8        8        1.17.1        2.4.0        8.0.29        1.2.83                            org.apache.flink            flink-streaming-java            ${flink.version}            provided                            org.apache.flink            flink-clients            ${flink.version}            provided                            org.apache.flink            flink-connector-base            ${flink.version}            provided                            org.apache.flink            flink-table-api-java-bridge            ${flink.version}            provided                            org.apache.flink            flink-table-planner-loader            ${flink.version}            provided                            org.apache.flink            flink-table-runtime            ${flink.version}            provided                            mysql            mysql-connector-java            ${mysql.client.version}                            com.ververica            flink-connector-mysql-cdc            ${flink.cdc.version}                            com.alibaba            fastjson            ${fastjson.version}                                                    org.apache.maven.plugins                maven-shade-plugin                3.2.4                                                            package                                                    shade                                                                                                                                                com.google.code.findbugs:jsr305                                    org.slf4j:*                                    log4j:*                                                                                                                                                                                                *:*                                                                            META-INF/*.SF                                        META-INF/*.DSA                                        META-INF/*.RSA                                                                                                                                                                                                                                                                                                            

创建DataStreamDemo.java,

package cn.itxs.cdc;import com.ververica.cdc.connectors.mysql.source.MySqlSource;import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DataStreamDemo {    public static void main(String[] args) throws Exception {        MySqlSource mySqlSource = MySqlSource.builder()                .hostname("mysqlserver")                .port(3306)                .databaseList("my_maxwell_01,my_maxwell_02")                .tableList("my_maxwell_01.*,my_maxwell_02.product")                .username("root")                .password("12345678")                .deserializer(new JsonDebeziumDeserializationSchema()) // 将SourceRecord转换为JSON字符串                .build();        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 开启checkpoint        env.enableCheckpointing(3000);        env                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")                // 设置平行度为4                .setParallelism(4)                .print().setParallelism(1); // 对sink打印使用并行性1来保持消息顺序        env.execute("Print MySQL Snapshot + Binlog");    }}

由于上面flink的依赖配置provided,因此在IDEA中启动的话需要勾选下面标红的选项

启动程序,查看日志可以看到从mysql读取目前全量的数据,my_maxwell_02也只读取product表数据

修改两个库的表后可以看到相应修改信息,其中也确认my_maxwell_02的account没有读取变更数据。

{"before":{"id":7,"name":"李丹","age":44},"after":{"id":7,"name":"李丹","age":48},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856595000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"account","server_id":1,"gtid":null,"file":"binlog.000025","pos":2798,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856598620,"transaction":null}{"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856605000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3140,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856608748,"transaction":null}{"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856628000,"snapshot":"false","db":"my_maxwell_02","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3486,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856631643,"transaction":null}

打包后放到集群上,执行

bin/flink run -m hadoop1:8081 -c cn.itxs.cdc.DataStreamDemo ./lib/flink-cdc-demo-1.0-SNAPSHOT.jar 

可以看到的日志也成功输出表的全量的日志和刚才修改增量数据

如果需要断点续传可以使用状态后端存储来实现

CheckpointConfig checkpointConfig = env.getCheckpointConfig();        checkpointConfig.setCheckpointStorage("hdfs://hadoop111:9000/checkpoints/flink/cdc");        checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));        checkpointConfig.setTolerableCheckpointFailureNumber(5);        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
FlinkSQL方式代码示例

创建SqlDemo.java文件

package cn.itxs.cdc;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;public class SqlDemo {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);        tableEnv.executeSql("CREATE TABLE account (\n" +                " id INT NOT NULL,\n" +                " name STRING,\n" +                " age INT,\n" +                " PRIMARY KEY(id) NOT ENFORCED\n" +                ") WITH (\n" +                " "connector" = "mysql-cdc",\n" +                " "hostname" = "mysqlserver",\n" +                " "port" = "3306",\n" +                " "username" = "root",\n" +                " "password" = "12345678",\n" +                " "database-name" = "my_maxwell_01",\n" +                " "table-name" = "account"\n" +                ");");        Table table = tableEnv.sqlQuery("select * from account");        DataStream rowDataStream = tableEnv.toChangelogStream(table);        rowDataStream.print("account_binlog====");        env.execute();    }}

启动程序,查看日志可以看到从mysql读取my_maxwell_01库account表的全量的数据,修改表数据也确认读取变更数据。

本人博客网站IT小神www.itxiaoshen.com

关键词:

文章推荐

  • 赏传统年俗逛非遗庙会 铜官窑古镇重温传统民俗年

    中新网长沙2月6日电 (潘杏琼)在多地倡导就地过年的环境下,位于长沙市城北的铜官窑古镇景区,从1月24日至2月15日举行中国年·湖湘味·铜官

    中新网 2022-02-07
  • 哈尔滨铁路迎节后返程高峰 推出复工专列服务

    中新网哈尔滨2月6日电 (周晓舟 记者 史轶夫)中国铁路哈尔滨局有限公司6日发布消息,哈尔滨铁路迎来春节后返程客流高峰,6日至7日预

    中新网 2022-02-07
  • 冬奥动车组设5G超高清演播室 “瑞雪迎春”号智能化人性化结合

    中新网北京2月6日电 (记者 刘文曦)在时速350公里的高铁列车上首设5G超高清演播室,为北京冬奥会量身定制的新型奥运版智能复兴号动车组瑞

    中新网 2022-02-07
  • 中欧班列“签证官”:日行10公里 用锤子“听诊”

    (新春走基层)中欧班列“签证官”:日行10公里 用锤子“听诊”  中新网郑州2月6日电 题:中欧班列“签证官”:日行10公里,用锤子“

    中新网 2022-02-07
  • 西湖守兰人的春节美丽故事:花苞为伴 手留余香

    中新网杭州2月6日电 (记者 谢盼盼)守望花苞,这是西湖守兰人许晔的春节故事,春节正是兰花花苞开花的重要时期。  今年春节里,浙江

    中新网 2022-02-07
  • 广告

    X 关闭

    X 关闭

  • 众测
  • more+

    京张高铁每日开行17对冬奥列车

      京张高铁每日开行17对冬奥列车  预计冬奥服务保障期运送运动员、技术官员、持票观众等20万人次  2月6日,2022北京新闻中心举行“北

    北京冬奥会开幕式上 小学生朱德恩深情演绎《我和我的祖国》

      北京冬奥会开幕式上 小学生朱德恩深情演绎《我和我的祖国》  9岁小号手苦练悬臂吹响颂歌  2月4日晚,在北京冬奥会开幕式上,9岁的

    2022北京冬奥会开幕式这19首乐曲串烧不简单

      多名指挥家列曲目单 再由作曲家重新编曲 本报专访冬奥开幕式音乐总监赵麟  开幕式这19首乐曲串烧不简单  “二十四节气”倒计时、

    “一墩难求” 冰墩墩引爆购买潮

    设计师:没想到冰墩墩成爆款一墩难求冰墩墩引爆购买潮 北京冬奥组委:会源源不断供货北京冬奥会吉祥物冰墩墩近日引爆购买潮,导致一墩难求