V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
jiobanma
V2EX  ›  程序员

mysql 数据同步 elasticsearch 方案

  •  
  •   jiobanma ·
    banmajio · 202 天前 · 4984 次点击
    这是一个创建于 202 天前的主题,其中的信息可能已经有所发展或是发生改变。

    目前项目想把 mysql 中的数据同步到 elasticsearch 中,接触了几种方案,想咨询一下大家工作中都使用什么方案。

    1. mysql 中的全量数据使用 logstash 导入 es ,然后增量的数据通过 canal 监听 binlog 转发到消息队列,然后数据服务监听详细队列,维护 es 中的数据。 [优点] :代码侵入小,轻量级使用。 [缺点] :a. logstash 导入大量数据效率低,每次任务最多只能同步 100000 条,并且最少要 1 分钟执行一次。b. logstash 先全量,canal 再接着增量,两个时间点可能会出现数据一致性问题。c. 貌似 canal 的坑不少,并且 github 好像也不怎么维护了。

    2. flink cdc 同步 mysql 到 elasticsearch 。看了一下 demo 的演示,感觉挺强大的,完全不用写代码,基本上通过 flink sql 就可以搞定。但是尝试按照官方的教程在本地模拟一下,不知道是不是目前还不支持 es8 ,导致我的数据刚开始可以同步到 es ,但是之后的 CUD 操作都无法同步到 es ,flink 的 webui 也有错误信息。在官方的 github 上咨询了此事,目前还没有回复 https://github.com/ververica/flink-cdc-connectors/discussions/1968 。 [缺点] :网上的资料较少。

    大家工作中都使用什么方案呢

    66 条回复    2023-03-09 14:51:07 +08:00
    voidmnwzp
        1
    voidmnwzp  
       202 天前
    不要求实时性就根据 updatetime 每天跑个定时任务刷一次,要求强一致性就用 flink cdc
    GunsRose
        2
    GunsRose  
       202 天前
    肯定是 flink-cdc 好用,不过这玩意搞起来也有很多的坑
    jiobanma
        3
    jiobanma  
    OP
       202 天前
    @GunsRose 能举个例子吗大佬
    jiobanma
        4
    jiobanma  
    OP
       202 天前
    @voidmnwzp 还是要求强一致性的
    pubby
        5
    pubby  
       202 天前 via iPhone
    现在用的方案是
    记录 binlog 位置后先全量导入
    然后从那个位置开始 canal 同步数据
    NoString
        6
    NoString  
       202 天前
    阿里云全家桶之 DTS...
    577322753
        7
    577322753  
       202 天前
    如果是云服务上的 db 的话,es 索引与表是一对一的话可以直接用 DTS 数据同步来做,表与 es 索引不是一对一的话,可以用 DTS 数据订阅,将 binlog 投递到 kafka 中,自己订阅 kafka 消息解析 binlog ,写到 es 中。
    我们目前是用 DTS 数据订阅作为生产端 + 基于 canal Client 改造的客户端实现的 es 同步
    577322753
        8
    577322753  
       202 天前
    有些 DTS 工具也是支持多表写入到一个索引里的,京东云上的数据同步就是这样的,跟他们沟通过,底层也是用的 flink 那一套
    j1132888093
        9
    j1132888093  
       202 天前
    我们公司操作数据库的时候同步操作 es........
    likeme
        10
    likeme  
       202 天前
    @j1132888093 我也是这么做的,难道不行?(自我怀疑...)
    binge921
        11
    binge921  
       202 天前
    别用 canal 了 我上次用 不小心把生产的 cpu 占用干满了 哭死
    jiobanma
        12
    jiobanma  
    OP
       202 天前
    @j1132888093
    @likeme 这样是可以的,但是涉及到一个历史数据的导入,还是比较麻烦的,当然可以写一个洗数据的接口,但是效率肯定很低。然后要在业务代码里在同步操作 es 还得梳理旧的代码和接口,太费人了。
    gitxuzan
        13
    gitxuzan  
       202 天前
    cancal 提供了第三方中间件语言,对接到自己语言里面,监听解析 binlog ,然后实时批量同步
    hhjswf
        14
    hhjswf  
       202 天前
    @likeme 入侵性太强了
    godleon
        15
    godleon  
       202 天前
    bboss
    matrix1010
        16
    matrix1010  
       202 天前
    可以参考 elastic 官方的这篇文章,简单来说就是通过时间戳增量批量刷新: https://www.elastic.co/cn/blog/how-to-keep-elasticsearch-synchronized-with-a-relational-database-using-logstash 。 但其实可以完全不用 Logstash, 自己写个 cron 脚本按相同的逻辑更新
    vagusss
        17
    vagusss  
       202 天前
    maxwell 好像坑比 canal 少一点
    Morriaty
        18
    Morriaty  
       202 天前
    你是 online service 还只是内部搜索需求?如果是 online service ,同步工具倒是其次,这玩意最恶心的是,你要设计好一个机制,就是每次由于字段变更、分词词库更新等问题需要重新全量索引( full index )时,你的实时索引( realtime index )不能停,因为线上还在服务,用户要一直能看到最新的。可以看看 https://engineering.carsguide.com.au/elasticsearch-zero-downtime-reindexing-e3a53000f0ac
    SoulSleep
        19
    SoulSleep  
       202 天前
    我们用的 canal ,你说的这俩方案我们曾经都试过....
    logstash...慢,不灵活
    flink cdc 还不错,不需要写代码...但是...有上手难度(对我们团队来说

    canal ,我们现在线上运行了几十个实例,有 1-1 ,N-1 ,多种场景,性能,没太多资源,大概做到了 5w/s 的速度,完全满足我们的需要
    我们的链路:
    1-1:mysql--canal--adapter--es

    n-1: mysql--canal--处理程序--es
    terranboy
        20
    terranboy  
       202 天前
    现在的 ORM 的 EVENT 都很完善的 在业务里面同步了
    SoulSleep
        21
    SoulSleep  
       202 天前   ❤️ 1
    @Morriaty #18 对于你这个补充一下
    我们在 es 里只存搜索条件+数据库主键,会有一个单独的服务用搜索条件得到各个表的数据库主键,然后在程序里聚合,查询效果对比:
    1.关系型数据库,多表 join ,几亿数据,大概要 5~10 秒
    2.现有方案,es+单表聚合,几亿数据,大概 0.x~2 秒
    weofuh
        22
    weofuh  
       202 天前
    可不可以历史数据用 datax ,增量数据用 canal 呢?
    Aresxue
        23
    Aresxue  
       202 天前
    flink 没玩过,但是 canal 绝对是个坑,一个是 cpu 老是莫名其妙跑满,还有对主从模式很多地方都没有支持非常坑。
    barbery
        24
    barbery  
       202 天前
    我们也是用 canal 同步,暂时没发现有什么坑,可能是我们的数据量不大😂
    jiobanma
        25
    jiobanma  
    OP
       202 天前
    @SoulSleep 感谢大佬
    Seulgi
        26
    Seulgi  
       202 天前
    dts 贼贵啊。
    hotcool100
        27
    hotcool100  
       202 天前
    flink cdc 太吃内存和资源了,用 canal 好多了
    changdy
        28
    changdy  
       202 天前
    2333 看来只有我一个人用的是 debezium 吗?
    CRUD
        29
    CRUD  
       202 天前
    我们是用 canal ,mysql -> canal -> rabbitmq -> 处理服务 -> es
    jiobanma
        30
    jiobanma  
    OP
       202 天前
    @SoulSleep 看您的描述,整个链路都是用 canal 实现的,那历史数据方便怎么同步到 es 呢?我了解的是 canal 对全量数据的同步支持不是很好
    brader
        31
    brader  
       202 天前
    可以了解下这个项目 https://github.com/brokercap/Bifrost
    用 go 写的,也很节省服务器资源
    potatowish
        32
    potatowish  
       202 天前   ❤️ 1
    debezium 支持全量和增量同步,flink cdc 也是基于它
    FawkesV
        33
    FawkesV  
       202 天前
    学习了
    Desdemor
        34
    Desdemor  
       202 天前
    canal -> kafka -> 处理服务 -> es
    awalkingman
        35
    awalkingman  
       202 天前
    @j1132888093
    @likeme
    那要是写 mysql 成功了,写 es 失败了,,,数据的一致性咋个维护
    v2e0xAdmin2
        36
    v2e0xAdmin2  
       202 天前
    @jiobanma https://github.com/colosobo/drc

    我们公司用的这个项目,可以全量同步,也可以增量同步。写到 kafka 或者 mq 里,然后 flink 消费写到 es 里。
    v2e0xAdmin2
        37
    v2e0xAdmin2  
       202 天前
    @SoulSleep canal 运维太麻烦了。
    QunLeLZ
        38
    QunLeLZ  
       202 天前
    我们现在使用的是从 pg 同步数据到 es ,中间还要做一定逻辑处理,还是比较方便的,直接写 SQL 就行了。不过我们用的是阿里的 Flink 服务,自建 Flink 的话会稍微麻烦点,但我理解应该问题也不会太大。
    wmwgijol28
        39
    wmwgijol28  
       202 天前
    CloudCanal 社区版
    wxw752
        40
    wxw752  
       202 天前
    flink-cdc
    j1132888093
        41
    j1132888093  
       202 天前
    @newskillsget 先写 mysql ,再写 es ,失败就回滚 mysql 。如果是写完 es 再抛出异常的话,就要手动回滚 es 了,但是我们公司这种场景目前很少,还能 cover 住
    potatowish
        42
    potatowish  
       202 天前
    @changdy 第二个
    fengpan567
        43
    fengpan567  
       201 天前
    canal 监听 binlog ,kafka 消费写入 es
    noparking188
        44
    noparking188  
       201 天前
    CDC 到队列,自己写程序或者 lambda 写 ES ,可行吗,是不是更简单一点?
    我之前的公司是双写+重试,错了人肉纠正
    BQsummer
        45
    BQsummer  
       201 天前
    前段时间调研过 doris 的数据同步:
    canal 文档一坨屎, 很多参数得从源代码里找, 数据量大的话会有坑
    flink cdc 开发最省事, 但是集群运维费事
    debezium 中文文档为 0, flink cdc 最早的同步方式是通过它, 后来自己实现了一套
    CloudCanal 是 starrocks 文档里推荐的, 国产, 还不开源, 不敢用
    其实上云是最省力的方式
    SoulSleep
        46
    SoulSleep  
       201 天前
    @jiobanma #30 历史数据 canal 支持一次性迁移,类似 etl
    @v2e0xAdmin2 #37 麻烦在哪儿呢?我们用 canal-admin 管理配置,指标暂时没输出到 grafana...监控主要关注业务异常,主要是稳定,基本上没出过问题
    dabai0806
        47
    dabai0806  
       201 天前
    logstash 其实是可以配置秒级同步的(这也是我碰巧试出来的 文档和网上文章都没看到有人提过)

    线上几万条数据, 我配置的是 10 秒同步一次 就是同步的时候 cpu 会上飙百分之 30 左右

    配置参数
    schedule => "*/10 * * * * *"
    dabai0806
        48
    dabai0806  
       201 天前
    logstash 维护方便些同步数据的自由度也高 写条查询 sql 就完事了, canal 就是个半成品, 难用的要死
    lueluev
        49
    lueluev  
       201 天前
    好熟悉的问题 flume 还有人用吗
    fridaycatye
        50
    fridaycatye  
       201 天前
    datax 可用,基于查询做同步,灵活
    fishtocat
        51
    fishtocat  
       201 天前
    学习了,我们现在是基于 canal 自己实现的 golang 客户端消费更新信息
    liprais
        52
    liprais  
       201 天前
    flink-cdc 开 ckpt 完事
    ufo5260987423
        53
    ufo5260987423  
       201 天前
    我不知道您为啥说 logstash 效率低,不论是全量更新还是增量更新,效率都还不错。我处理过的专利数据大概几百个 G ,都能在可接受时间内完成任务。

    如果需要实时更新,那么在事物里面同步操作 es 就好了。
    可能我了解的还不够多,请您指正。
    ffkjjj
        54
    ffkjjj  
       201 天前
    debezium
    awalkingman
        55
    awalkingman  
       201 天前
    @j1132888093 人工补偿兜底,很真实了。巧妙的人工介入能极大地降低系统复杂度,谓之四两拨千斤。
    v2e0xAdmin2
        56
    v2e0xAdmin2  
       201 天前
    @SoulSleep 加机器麻烦
    zhangxudong
        57
    zhangxudong  
       201 天前
    可以试试阿里云 DTS 原班人马搞的 cloudcanal
    HunterPan
        58
    HunterPan  
       201 天前
    datax
    vgbhfive
        59
    vgbhfive  
       201 天前
    datax 可以考虑下
    GopherDaily
        60
    GopherDaily  
       201 天前
    kafka connect, 如果你们已经有 kafka 的话,
    debezium + es sink 吧
    tairan2006
        61
    tairan2006  
       201 天前
    我记得 datax 是专门搞这个的,不过我也没用过
    leeton
        62
    leeton  
       201 天前
    我们就是用 canal 监听 binlog😅 发 kafka 消息一条一条刷增量数据
    ldh756034624
        63
    ldh756034624  
       201 天前
    cirton
        64
    cirton  
       201 天前
    ogg 可以实时同步到 es 。之前用 ogg 同步 mysql 和 oracle ,效率非常够用,效果也很好,但是出了问题运维是个麻烦事儿。
    刚看到你的 es 版本是 8 的,那就用不了了。
    Morriaty
        65
    Morriaty  
       201 天前
    @SoulSleep 我们说的应该不是一个问题。举个例子,

    es 里有个 title 字段,类似「高启强大规模犯罪集团终落网」,分词有问题,分成了「高启 / 强大 / 规模 / 犯罪 / 集团 / 终 / 落网」,于是我们更新了人名库,因为可能有很多历史 title 都包含这个词,需要把整个索引 reindex 一遍。

    这个过程基于历史数据量不同,耗时不同,可能需要几个小时的时间。但你线上正在提供 online search service 不能停,不能等这几个小时的 reindex 。

    所以就需要两个并行的 full indexing 和 realtime indexing ,以及两个链路的切换逻辑
    SoulSleep
        66
    SoulSleep  
       200 天前
    @Morriaty #64 不错,我们的场景还没有这么复杂,只是加速分库的 mysql 查询效率,这么精细的需求对我们现在的场景来说,就直接刷 db 的 binlog 重新同步去做了,按照最大的效率,估计要 2 个小时左右....确实无论怎么做代价都很大....
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   5869 人在线   最高记录 6067   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 43ms · UTC 06:14 · PVG 14:14 · LAX 23:14 · JFK 02:14
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.