V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
chenfang
V2EX  ›  程序员

RocketMQ 消费写入 MySQL 问题

  •  
  •   chenfang · 2023-04-17 10:21:01 +08:00 · 3313 次点击
    这是一个创建于 621 天前的主题,其中的信息可能已经有所发展或是发生改变。

    公司需求是把项目中产生的数据,实时入库到 MySQL,这些数据分别属于不同的 table.

    现在的方案是把产生的数据通过 tomcat -> RocketMQ 的一个 topic 中,然后启用一个 MQ 消费者组合并消息(如果不合并入库次数太多也会慢),然后 batch 入库,这个方案单表没有任何问题.

    如果设计到多个表入库,因为 mq 消费是顺序读取 topic 中的消息,也就导致如果一个表需要入库的数据量大,那么入库时间就会长,这会导致整体消费变慢,从而导致实时表不实时的问题.

    这就是我现在遇到的问题

    1. 入库 MySQL 的时间长,解决时间长的问题(这个我目前想不到解决办法)
    2. 如果解决不了入库时间长的问题,那么就让他不影响其他表入库

    我自己想过解决方案,针对问题 2 的

    方案 1

    从一个消费者组 改为 按表名创建消费者组,但是可用性不高

    按表名创建消费者组,使用过一段时间,因为表比较多,然后在一个 jvm(如果多个程序分别启动不同的消费者组也不好管理)中启动那么多消费者组,首先内存会占用很多(应该是每个消费者组都会缓存一定数量的消息),也就是内存的大小跟实时表的数量是等比上涨的.其次我查了一下说是消费者组数量也不是可以无限加的,目前我们是 110 多张表,也比较多了.考虑到之后还会加表,这个方案试行了一段时间就废掉了.

    或者这个方案有没有优化空间?

    方案 2

    把入库时间长的表单独分一个消费者组,可用性比第一个还低

    经过实践,不晓得哪个时间点,表里数据生产就会增多,所以这样也会导致可能突然就延迟了....很不可控

    然后就是大家有什么思路么?万分感谢!

    28 条回复    2023-04-18 10:02:44 +08:00
    Seulgi
        1
    Seulgi  
       2023-04-17 10:30:57 +08:00
    一个队列,消费时按表丢缓存,异步任务定时 10 秒,30 秒自己看能接受的延迟,记得设置缓存大小触发强制写。
    举个例子,table_a ,table_b ,异步定时写任务 10 秒触发一次,表缓存强制 50mb 时触发写数据库操作。注意锁问题
    chenfang
        2
    chenfang  
    OP
       2023-04-17 10:36:12 +08:00
    @Seulgi 这就是现在正在跑的版本,也是用了 jvm 的内存做缓存这种机制,还有强制触发写操作,但是还是不成,首先缓存强制触发不可以无限触发,比如同时入一个表的线程最多是 3 个,那么最后还是需要等待入库完成,从 MQ 消费读取消息也会触发等待...
    Seulgi
        3
    Seulgi  
       2023-04-17 10:38:56 +08:00   ❤️ 1
    表过多,建议将表名设置为 tag ,按 tag 切分消费组,多部署消费者提高消费速度,实时性较高的表,可以单独一个 tag 一个消费组。
    举例:比如 table_a,table_b,table_c ,table_a 要求实时,部署 4 个 pod 只消费 tag:table_a 的数据,配置定时异步写任务为 1 秒或者为实时写。table_b,table_c 不要求实时,部署 4 个 pod 只消费 tag:table_b|table_c ,配置定时异步写任务为 30 秒
    Seulgi
        4
    Seulgi  
       2023-04-17 10:39:59 +08:00
    @chenfang 强制触发有等待,那说明你们的消费速度>写速度。要做的时将消费均摊。也就是 pod 要多部署。
    AS4694lAS4808
        5
    AS4694lAS4808  
       2023-04-17 11:02:57 +08:00   ❤️ 1
    有类似的场景,不过是 AWS 云上。
    目前是 API (tomcat) -> Kinesis data stream (RocketMQ) -> Kinesis Firehose (Flink/Fluentd) -> S3 (MinIO)
    -> OpenSearch (ES)
    每日定时把 S3 的数据 Load 到 Redshift (Mysql)里,删除 ES 7 日以上 index

    查询的时候 7 日以下 -> ES
    7 日以上 -> Redshift (MYSQL 分库分表)

    我们业务一开始也是读写一起走库的,但是显然只能支撑小数据量,而且后面查询也多了,就重构了一遍。
    8355
        6
    8355  
       2023-04-17 11:17:46 +08:00
    问题 1 为什么不直接同步入库而采用异步入库的形式?就算使用异步入库也不用很多表或者说达到 110 张表都异步去写吧。
    问题 2 现在每天写入量大概是多少?入库 MySQL 的时间长大概是多长 每条 sql 写入多少行?慢写入有多少个索引多少个字段?
    chenfang
        7
    chenfang  
    OP
       2023-04-17 11:19:17 +08:00
    @AS4694lAS4808 很多项目都是读数据库表里的表,改成 ES 很难...费时费力估计老板不会同意去搞
    chenfang
        8
    chenfang  
    OP
       2023-04-17 11:27:57 +08:00
    @8355
    答案 1 单个消息里存不了太多数据,单次入库的时间加起来,是比批量入库的时间长的,还是跟表数据量太大有关系
    答案 2 最大的表迁移到了 Doris 一次入库 50-80 万条左右,设置的是间隔 50s 一次强制写入,入库时间现在是 40-50s

    慢写入有多少个索引多少个字段? 这个我不晓得..
    8355
        9
    8355  
       2023-04-17 11:34:40 +08:00
    不管在方案 1 还是方案 2 都无法满足你的原始需求 项目中产生的数据,实时入库到 MySQL
    当执行 update 操作时 你前台返回操作成功 但后台并不一定能绝对执行成功
    当消息堆积时现开消费者时来不及的,会出现执行增删改操作后查询还是原来的值

    项目复杂度会指数型上升 你需要特别小心的处理缓存数据和刷新的时机 为了这个方案你的代码量起码要翻一倍
    所以你这两个方案都是及其糟糕的

    主力削峰业务进队列 99%的业务应该同步读写 最简单的就是最好的也是最不容易出问题的
    8355
        10
    8355  
       2023-04-17 11:40:51 +08:00
    @chenfang #8 那我的理解你的核心最大的问题是在写库时间 而不是为了解决这个问题再上面增加复杂度
    哪怕你的队列消费的再快你的写库时间还是会很长,还会牵扯到刷盘策略问题,失败异常数据全丢问题更大。
    pkoukk
        11
    pkoukk  
       2023-04-17 11:51:47 +08:00
    如果非要把数据直接写入 MySQL ,那你这个场景的瓶颈在 MySQL 上啊
    几十万的数据再 mysql 配置再高也得好几秒吧,就算你分 topic 了,其它写入也会被阻塞住
    分库吧,按消息的某个 ID 字段分 HASH ,提高下游处理速度
    liprais
        12
    liprais  
       2023-04-17 12:34:25 +08:00
    搞个 flink 完事
    dlmy
        13
    dlmy  
       2023-04-17 12:37:46 +08:00
    这个问题的核心是 “MQ 中消息消费速度远大于入库速度,并且需要实时入库到 MySQL”,如果一定要坚持 “实时入库”,那么不管你用什么方式解决,系统的复杂度相对都会变很高,也可能会带来新的问题,如果去掉 “实时” 这两个字,单就入库来说,会有很多解决方案。

    标记一下,蹲大佬的 trade-off 方案
    standchan
        14
    standchan  
       2023-04-17 13:22:37 +08:00
    这个实时性有点麻烦啊,一个是很快的消息队列,一个是必须要进行 io 的数据库。要不试试 clickhouse ?但是换数据库要大改更麻烦
    lolizeppelin
        15
    lolizeppelin  
       2023-04-17 13:30:07 +08:00
    搞笑啊 所有 mq 只要多个消费者都可能出现写入顺序问题 还要实时
    拍啥脑门写方案呀
    hhjswf
        16
    hhjswf  
       2023-04-17 14:03:59 +08:00   ❤️ 2
    谁做的选型啊,mq 本来就是拿来做异步,要求实时不是搞笑?
    fkdog
        17
    fkdog  
       2023-04-17 14:15:07 +08:00
    入库的数据除了 insert 是不是也包括 update ?
    如果是 update 是不是需要考虑并行消费顺序不一致脏写问题?
    kafka 开多个 partition 不同表写入不同的 partition ,或者干脆不同表设计不同 topic 不知道能不能满足你的需求。
    没用过 rocket ,不过应该有对应概念。
    Red998
        18
    Red998  
       2023-04-17 14:25:15 +08:00
    看业务对实时怎么看待了、技术角度都不是实时、只是延迟时间长短问题。 或者可以使用 binlog 方式去监听然后消费 MQ
    、canal 了解下。
    urnoob
        19
    urnoob  
       2023-04-17 15:03:46 +08:00
    OP 需要澄清下 实时 这个需求.是真的实时还是可以接受一定范围内的延迟.
    真实时,那为啥不直接入库,就想常见的 CRUD 那样,何必放个 MQ
    可延迟,那上面已经提供了一部分方案了.
    对于方案 1 的优化
    可采用高低搭配,量大的表部署更多消费者,硬件配置也更好.(OP 方案 2)
    还取决于具体业务.
    比如 tomcat 过来的数据进同一张表,数据之间是否有关联.
    有关联 比如同一个设备位置更新. 但是设备之间无关联,那就按设备唯一标识符区分,确保对于同一设备入库先后顺序.
    如果有关联可合并,那就在写入 MQ 前做一定的合并操作减少总量.
    没关联 那就直(批量)入库.

    很奇怪 OP 的方案 2
    不晓得哪个时间点,表里数据生产就会增多
    数据已经入库 但凡有个创建时间都应该知道什么时候变多,怎么会不知道呢.我觉得是需要具体深入挖掘的.


    另外还有一种优化,就是 tomcat 那作为生产者是可以知道某几个表数据量增长的 它可以
    临时增加消费者
    用 MQ 通知所有消费者,对消费策略做一定更改.不分表的情况下将量多的表做批量写入 或者搭配方案 1,对这部分数据再写入 MQ,然后再按表(Tag)进行消费 这都能减小其他表的写入延迟影响..

    但没有具体业务场景也只能说的泛泛

    如果大到 Mysql 性能跟不上,那就肯定要做扩容了.
    buddyy
        20
    buddyy  
       2023-04-17 15:31:15 +08:00
    建议你看一下 MySQL 所在机器的磁盘 IO 情况,是否出现了 IO 饱和的情况。如果 IO 饱和你必须得分库了。
    burymme11
        21
    burymme11  
       2023-04-17 15:32:22 +08:00 via Android
    要实时的去写入 mysql ???我猜下是不是有其他地方要实时的去读?
    如果是这样就有别的曲线救国方案。
    burymme11
        22
    burymme11  
       2023-04-17 15:33:25 +08:00 via Android
    在 mq 和 mysql 之间加一层
    lopssh
        23
    lopssh  
       2023-04-17 15:44:55 +08:00
    没看懂需求,要实时的数据,为什么还走 MQ 呢?
    DinnyXu
        24
    DinnyXu  
       2023-04-17 15:57:26 +08:00
    @burymme11 我觉得你说的对,如果不实时读,干嘛要实时写,实时写一般是为了应付统计之类的,50-80w 的数据要进行实时写,对 MySQL 来说有很大的压力。

    如果要实时读,可以根据读的业务来缓存数据。
    DinnyXu
        25
    DinnyXu  
       2023-04-17 15:58:03 +08:00
    @lopssh MQ 就是一个中转层,数据没法第一时间入库,只能通过 MQ 中转
    wqhui
        26
    wqhui  
       2023-04-17 16:23:54 +08:00
    问题是多表的变更都在一个 topic 上串行处理,不同表的数据也要串行吗?如果只是单表串行,是不是可以分开消费线程跟工作线程,每个表开一个工作线程,消费线程直接把接收到的消息扔给对应表的工作线程写库。不过有个风险就是某一个表的数据特别多,对应工作线程处理不过来缓存被挤爆了,而且因为消费线程接收完就 ack ,实际接收到的消息还在工作线程排队写入,会丢数据。另外你的 mysql 扛的住吗,扛不住就分库
    zcxey2911
        27
    zcxey2911  
       2023-04-17 18:47:59 +08:00
    和 mq 就没关系啊,OP 的问题瓶颈是 Mysql ,说白了就是数据量大,写库慢啊

    实时写入 Mysql 数据应该采用 Binlog Load 机制实现。Binlog Load 提供了一种使 Doris 增量同步用户对 Mysql 数据库的数据更新操作的 CDC(Change Data Capture)功能
    raysonlu
        28
    raysonlu  
       2023-04-18 10:02:44 +08:00
    尝试解读一下 OP 的需求。不考虑 mysql 写入性能,从 OP 视角来说,项目的问题通过 MQ 对数据库写入进行 batch 入库处理就能解决了,毕竟 OP 看到现在项目是“单表单队列进行 batch 入库”起到优化作用。这种情况下,如果是多表写入想 batch ,能有什么好的方案?
    队列的设计我比较喜欢用现实场景类比:景区入口可以设有 1 ~ 3 个入口队伍。站在门口的保安发现有旅行团来,就临时增设“旅行团通道”;如果了解到 xx 旅行团来了一大批人,临时增设“xx 旅行团专属通道”;如果收到通知景区与 xx 旅行团签订了长期合作协议,应该就可以长期保留一个“xx 旅行团专属 vip 通道”。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2587 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 15:25 · PVG 23:25 · LAX 07:25 · JFK 10:25
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.