flinkcdc笔记
创始人
2025-05-28 12:39:34

       在之前的数据同步中,好比咱们想实时获取数据库的数据,通常采用的架构就是采用第三方工具,好比canal、debezium等,实时采集数据库的变动日志,而后将数据发送到kafka等消息队列。而后再经过其余的组件,好比flink、spark等等来消费kafka的数据,计算以后发送到下游系统。

       在国内,用的比较多的是阿里巴巴开源的canal,咱们可使用canal订阅mysql的binlog日志,canal会将mysql库的变动数据组织成它固定的JSON或protobuf 格式发到kafka,以供下游使用。

canal解析后的json数据格式以下:

{"data": [{"id": "111","name": "scooter","description": "Big 2-wheel scooter","weight": "5.18"}],"database": "inventory","es": 1589373560000,"id": 9,"isDdl": false,"mysqlType": {"id": "INTEGER","name": "VARCHAR(255)","description": "VARCHAR(512)","weight": "FLOAT"},"old": [{"weight": "5.15"}],"pkNames": ["id"],"sql": "","sqlType": {"id": 4,"name": 12,"description": 12,"weight": 7},"table": "products","ts": 1589373560798,"type": "UPDATE"
}

简单讲下几个核心的字段(属性):

  • type : 描述操做的类型,包括‘UPDATE’, 'INSERT', 'DELETE'。

  • data : 表明操做的数据。若是为'INSERT',则表示行的内容;若是为'UPDATE',则表示行的更新后的状态;若是为'DELETE',则表示删除前的状态。

  • old :可选字段,若是存在,则表示更新以前的内容,若是不是update操做,则为 null。

在国外,比较有名的相似canal的开源工具备debezium,它的功能较canal更增强大一些,不只仅支持mysql。还支持其余的数据库的同步,好比 PostgreSQL、Oracle等,目前debezium支持的序列化格式为 JSON 和 Apache Avro 。

debezium format:

{"before": {"id": 111,"name": "scooter","description": "Big 2-wheel scooter","weight": 5.18},"after": {"id": 111,"name": "scooter","description": "Big 2-wheel scooter","weight": 5.15},"source": {...},"op": "u","ts_ms": 1589362330904,"transaction": null
}

flink cdc connector

对于上面的架构,咱们须要部署canal(debezium)+ kafka,而后flink再从kafka消费数据,这种架构下咱们须要部署多个组件,而且数据也须要落地到kafka,有没有更好的方案来精简下这个流程呢?咱们接下来说讲flink提供的cdc connector。

这个connector并无包含在flink的代码里,具体的地址是在https://github.com/ververica/flink-cdc-connectors里,详情你们能够看下这里面的内容。

这种架构下,flink直接消费数据库的增量日志,替代了原来做为数据采集层的canal(debezium),而后直接进行计算,通过计算以后,将计算结果发送到下游。

使用这种架构是好处有:

  • 减小canal和kafka的维护成本,链路更短,延迟更低

  • flink提供了exactly once语义

  • 能够从指定position读取

  • 去掉了kafka,减小了消息的存储成本

MySQLTableSource

在MySQLTableSource#getScanRuntimeProvider方法里,咱们看到,首先构造了一个用于序列化的对象RowDataDebeziumDeserializeSchema,这个对象主要是用于将Debezium获取的SourceRecord格式的数据转化为flink认识的RowData对象。 咱们看下RowDataDebeziumDeserializeSchem#deserialize方法,这里的操做主要就是先判断下进来的数据类型(insert 、update、delete),而后针对不一样的类型(short、int等)分别进行转换,

最后咱们看到用于flink用于获取数据库变动日志的Source函数是DebeziumSourceFunction,且最终返回的类型是RowData。

也就是说flink底层是采用了Debezium工具从mysql、postgres等数据库中获取的变动数据。

changelog format

当咱们从mysql-cdc获取数据库的变动数据,或者写了一个group by的查询的时候,这种结果数据都是不断变化的,咱们如何将这些变化的数据发到只支持append mode的kafka队列呢?

因而flink提供了一种changelog format,其实咱们很是简单的理解为,flink对进来的RowData数据进行了一层包装,而后加了一个数据的操做类型,包括如下几种 INSERT,DELETE, UPDATE_BEFORE,UPDATE_AFTER。这样当下游获取到这个数据的时候,就能够根据数据的类型来判断下如何对数据进行操做了。

好比咱们的原始数据格式是这样的。

{"day":"2020-06-18","gmv":100}

通过changelog格式的加工以后,成为了下面的格式:

{"data":{"day":"2020-06-18","gmv":100},"op":"+I"}

也就是说changelog format对原生的格式进行了包装,添加了一个op字段,表示数据的操做类型,目前有如下几种:

  • +I:插入操做。

  • -U :更新以前的数据内容:

  • +U :更新以后的数据内容。

  • -D :删除操做。

kafka connector和kafka-upsert connector的区别:

先创建一个kafka connector(sql)

CREATE TABLE KafkaTable (`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset', // 消费的offset位置,也可以是latest-offset'format' = 'csv'
)

kafka connector就是kafka中数据是怎么样的,KafkaTable表中的数据就是什么样(kafka中的数据就是表数据导进去的,比如新建一个表连接器为kafka,然后往这个表insert数据就是往kafka中导入)

kafka中表的数据是这种格式:

{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-23 11:32:24","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1201","access_time":"2021-01-23 11:32:55","dt":"2021-01-08"}

kafka-upsert

首先我们要知道,kafka是只能往里面append数据的,不存在删改里面的数据情况

再解释下什么是changelog类型kafka,就是kafka topic中的消息(每一个来的数据)是带有一个属性的,这个属性标记数据是Insert、update before、update after、delete的,再根据主键来进行对sink kafka topic中的数据进行具体的操作,是插入,还是更新,还是删除。

当作为数据源时,upsert-kafka Connector会生产一个changelog流,其中每条数据记录都表示一个更新或删除事件。更准确地说,如果不存在对应的key,则视为INSERT操作。
如果已经存在了相对应的key,则该key对应的value值为最后一次更新的值。

用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。

当作为数据接收器时,upsert-kafka Connector会消费一个changelog流。它将INSERT / UPDATE_AFTER数据作为正常的Kafka消息值写入(即INSERT和UPDATE操作,都会进行正常写入,
如果是更新,则同一个key会存储多条数据,但在读取该表数据时,只保留最后一次更新的值),并将 DELETE 数据以 value 为空的 Kafka 消息写入(key被打上墓碑标记,表示对应 key 的消息被删除)。
Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

上面的key指的是你建表时指定的primary key,不是kafka的key

相关内容

热门资讯

爱的创可贴大结局 ,爱的创可贴... 爱的创可贴大结局 目录爱的创可贴大结局 爱的创可贴第几集在一起爱的创可贴结局是什么?电视剧爱的创可贴...
爱在春天结局是什么 ,爱在春天... 爱在春天结局是什么 目录爱在春天结局是什么 爱在春天结局是什么爱在春天大结局爱在春天大结局爱在春天结...
怎么做水印 ,身份证水印怎么做... 怎么做水印 目录怎么做水印 身份证水印怎么做?ps怎么做水印怎么做水印 2. 设计水印:在设计水印时...
夫妻那些事的演员 ,夫妻那些事... 夫妻那些事的演员 目录夫妻那些事的演员 夫妻那些事主演是谁夫妻那些事演员表夫妻那些事演员表介绍夫妻那...
钉钉,下沉进农田 在这个古老的产业里,数字化没有被放到更高的位置,但难点依旧存在。钉钉恰是...
2023年全国最新二级建造师精... 百分百题库提供二级建造师考试试题、二建考试预测题、二级建造师考试真题、二建证考试题库等,...
点金胜手结局 ,点金胜手结局 ... 点金胜手结局 目录点金胜手结局 点金胜手结局点金胜手30黄宗泽最后跟谁在一起点金胜手大结局点金胜手结...
天苍苍野茫茫是什么歌 ,天苍苍... 天苍苍野茫茫是什么歌 目录天苍苍,野茫茫这句歌词的歌名是什么?天苍苍野茫茫风吹草低见牛羊是哪里的民歌...
未来日记漫画结局,《未来日记》... 未来日记漫画结局目录未来日记漫画结局《未来日记》结局是什么?漫画《未来日记》最后的结局是什么样的?未...
投名状讲的是什么 ,投名状说的... 投名状讲的是什么 目录投名状讲的是什么 投名状说的是什么事情投名状这个电影到底表达了个什么意思投名状...
《Spring Boot 趣味... 牛刀小试——五分钟入门 Spring Boot 万物皆可 Hello World 创建一个 Web ...
硬核~ 阿里人都在内卷的Spr... 前言 这份SpringBoot实战文档,结合典型业务场景,全面介绍基于S...
新娘印度电视剧大结局 ,印度电... 新娘印度电视剧大结局 目录新娘印度电视剧大结局 印度电视剧新娘的最后结局是什么?大结局的印度剧《新娘...
当婆婆遇上妈结局 ,《婆婆遇上... 当婆婆遇上妈结局 目录当婆婆遇上妈结局 《婆婆遇上的妈》电视剧结局是什么?《当婆婆遇上妈》的结局是啥...
洪水来临时正确的做法是什么,发... 洪水来临时正确的做法是什么目录洪水来临时正确的做法是什么发生洪水时的正确做法是什么遇到洪水的正确做法...
baci是什么意思 ,baci... baci是什么意思 目录baci是什么意思 baci是什么意思面基是什么意思啊?baci是什么意思 ...
经典卷积模型回顾26—基于知识... ResNet-152 是由微软亚洲研究院 (Microsoft Research Asia) 发布的...
HTTPS 之fiddler抓... Jmeter接口测试和接口自动化测试从入门到精通,全套项目实战!...
Vmware Ubuntu虚拟... 一、背景 先来说一下我的需求背景,我是在VMware中安装的Ubuntu虚拟机...
友谊的小船说翻就翻是什么意思 ... 友谊的小船说翻就翻是什么意思 目录友谊的小船说翻就翻是什么意思 网络语友谊的小船说翻就翻出自哪里,是...
爱迪奥特曼大结局,关于EVA大... 爱迪奥特曼大结局目录爱迪奥特曼大结局关于EVA大结局eva的大结局是什么?迪迦奥特曼 的结局是什么?...
废青是什么意思 ,自称自己是“... 废青是什么意思 目录00后到底是怎样一代人?自称自己是“废柴”,“废柴”啥意思董遇的三余是什么意思 ...
关于蛇的电影 极速百科网 极速... 关于蛇的电影目录关于蛇的电影关于蛇的电影有关蛇的电影有哪些?关于蛇的电影关于蛇的电影说到关于蛇的电影...
智能自动搬运设备四向穿梭车AG... 四向穿梭车现已成为穿梭车货架系统中的重要核心,作为高新科技先进的自动化物料搬运设备&#...
大数据分析工具Power BI... 导入数据操作介绍进入PowBI,弹出的如下页面也可以直接关闭,在Powe...
不是所有电脑换了固态硬盘=秒开...        即使是做简单的拷贝,其速度可能还不如机械硬盘。而3A游戏大作的文件体积也...
一笑倾人城再笑倾人国出自 ,“... 一笑倾人城再笑倾人国出自 目录一笑倾人城再笑倾人国出自 “一笑倾人城,再笑倾人国。”出自……?一笑倾...
小爸爸插曲是什么,《小爸爸》的... 小爸爸插曲是什么目录小爸爸插曲是什么《小爸爸》的插曲是什么名字小爸爸里面的插曲《小爸爸》里面的所有插...
斗罗大陆大师的结局,斗罗大陆3... 斗罗大陆大师的结局目录斗罗大陆大师的结局斗罗大陆3最后大师和谁结婚了史莱克七怪成神后大师去哪了斗罗大...
TextView用Spanna... textContent.getViewTreeObserver().addOnPreDrawList...