如题,在 mysql 通过 canal 发送数据到 kafka 后,进行增量同步数据到 es ,现在的问题是如何检验增量同步过程中有没有问题呢?这个工作可能要周期运行!举例如下:此刻比对昨天以前的增量数据的总数,及抽样某个表的一些行来对比 es 中的数据。查 mysql 之前,数据无变化,查 es 期间,mysql 中某些数据已经被更新或者删除了,但数据更新还没写入 es ,这样查出来的数据比对结果不太一致。各位有什么好的方案吗?
1
aoxg2019 OP ps:表有 update_time 时间。
|
2
aoxg2019 OP 还是说增量同步过程中不需要检验?
|
3
q474818917 2022-12-19 10:41:33 +08:00
参考 mysql 主从同步的校验机制
|
4
EXChen 2022-12-19 10:58:26 +08:00
我们以前是数据同步异常会有告警,然后人工排查,有问题重新回放一下 binlog 。
|
5
skymei 2022-12-19 11:04:11 +08:00
我们项目也是这种架构,Mysql canal ES,短时间内的差异是一定会存在的,要求不高的话就定时全量更新下,数据结构不复杂全量一次也很快
|
8
limbo0 2022-12-19 12:22:54 +08:00
实时同步这方面确实有风险, es 因为可读延迟并不能那么实时同步也不好验证, 最好是做一些关键指标的对比, 总量, sum, 有一定误差也算正常
|
9
aoxg2019 OP @limbo0 您看这样行不行呢,比如我今天要校验昨天增量同步的数据,我先在 mysql select update_time=昨天的 ,如果今天有数据更新 /删除,并且在 select 的结果集中,在比对两边数据的时候后剔除这部分数据可以吗
|
10
canbingzt 2022-12-19 13:24:32 +08:00
最近刚好看到相关的技术
https://debezium.io/ |
11
liprais 2022-12-19 13:33:17 +08:00
你得先想明白误差从何而来
|
13
7911364440 2022-12-19 13:44:08 +08:00
这不就是 CAP 吗,非要强一致性的话就只能实时同步了,并且在 mysql 写入之后,同步到 es 之前不允许查询
|
14
liprais 2022-12-19 13:52:11 +08:00
@aoxg2019
https://flyingice.github.io/2019/04/21/mysql_redolog.html 这个讲的很清楚了 基本上如果 mysql 没挂过就不用担心一致性的问题 |
15
limbo0 2022-12-19 14:07:38 +08:00
|
16
rekulas 2022-12-19 14:40:09 +08:00
我现在只在源头校验,实现 canal 消费者的时候尽量考虑鲁棒性,单进程插入,在读取 sql ,拼装 sql ,进出队列等逻辑处都需确认成功,一旦不成功就丢入 redis 待处理队列(包括丢队列都是循环重试的不成功一直重试)并通知人工介入,这样理论上来说不会有丢失的情况只是有延迟
目前来看运行很好,因为可能出错的地方都被考虑了,只是有时候手动改了数据库结构时 canal 会失败需要人工维护下基本几个月一次 当然也可能存在意外,但我觉得这种概率很小(几年都未必能遇到一次)暂时不在考虑范围 |
17
miniliuke 2022-12-19 14:50:17 +08:00
我是根据《 DBLog: A Watermark Based Change-Data-Capture Framework 》的原理做了,增量过程中的全量数据校验,论文本身是讲增量过程中的全量传输的。举一反三,我整了个增量过程中的数据全量校验,还水了一篇论文。但是我的目标端也是关系型的,es 不清楚支不支持(主键排序+OFFSET )的取数据方式
|
18
miniliuke 2022-12-19 14:56:55 +08:00
|
19
aoxg2019 OP @rekulas 这种也是一种比较好的方式,在源头把控。现在 leader 要求必须做检验.哈哈
|
21
miniliuke 2022-12-19 15:02:51 +08:00
@aoxg2019 不用看框架,直接看《 DBLog: A Watermark Based Change-Data-Capture Framework 》论文和解读就可以了
|
22
lmshl 2022-12-19 16:24:39 +08:00
我觉得业务层还是需要按照业务层的思路去解决,这和基础设施取舍不同。
比如按照做基础设施的思路来搞,那应该每一步都不可以出错,如果出错了就应该停在当前位置无限重试下去,以保证数据最终一致性。 但按照业务思路来做,因为一条数据出错而导致整个系统数据同步停机是不可接受的。 所以必然是以业务行( row )为单位,多次重试后记录错误并跳过,ES 也仅供搜索,业务事务依然由 RDBMS 保证,如此则需要引入就死信队列( DLQ )与纠错机制。 DLQ 能弥补一部分错误,但无法处理某些内部错误被当作正确处理跳过的场景。例如上游有 BUG ,请求账户积分失败时返回了 0 ,虽然锅是上游的,但修数据依然是下游要处理的。 所以还是需要有一种纠错机制来保证数据的最终一致性。我最近在考虑哈希树( merkle tree )不错,它是区块链用于校准的数据结构,可以快速对比不一致的数据块。 比如我们可以定时在闲暇对数据库做全量或部份 merkel tree 计算并对比两侧结果,最近数据多算,历史数据少算。这样对比出的不一致结果再通知给开发,找一下是哪里出的问题,以及手工对数据做补偿等等。 |
25
EXChen 2022-12-19 17:55:51 +08:00
@EXChen 我们以前 binlog 往 ES 里面写数据是自己写代码实现的(其实是移植的 canal ),能够 cache 异常并告警处理。
|
26
gengzi 2022-12-20 17:16:40 +08:00
@miniliuke 大佬,想请教下,对于 Chunk 的选择,从主键 id 最小开始嘛?下一次选择上次主键最大后的下一个 Chunk ,直到所有的 chunk 选择完?关于全量数据校验,这个如何校验,查源表 Chunk 和 目标表 chunk 数据是否一致吗?
|
27
potatowish 2022-12-20 18:59:57 +08:00
Debezium 的增量临时快照可以看一下,基于《 DBLog: A Watermark Based Change-Data-Capture Framework 》论文的一个解决方案。
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-incremental-snapshots https://github.com/debezium/debezium-design-documents/blob/main/DDD-3.md |