V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
BBCCBB
V2EX  ›  Kafka

[Kafka] 求助, 同一个服务如何组播消费 kafka 某个 topic 的消息呢?

  •  
  •   BBCCBB · 2020-08-21 09:20:30 +08:00 · 3128 次点击
    这是一个创建于 1554 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我现在用的是启动的时候动态生成 groupId, 比如 name + uuid 的方式

    但是这样重启后就会导致原来的 consumerGroup 对应的实例都被销毁了.但 kafka 里依然存在原来的 consumerGroup, 监控上看已经被销毁的 consumerGroup 也会发现堆积越来越严重, 有谁知道正确的使用姿势吗??

    不胜感激

    😢

    31 条回复    2020-08-21 20:48:13 +08:00
    useben
        1
    useben  
       2020-08-21 09:43:14 +08:00
    生成唯一 groupId, 存到文件, 启动时读文件, 有就用原来的, 没有再生成写到文件...
    BBCCBB
        2
    BBCCBB  
    OP
       2020-08-21 09:53:11 +08:00
    @useben 这个在固定机器上是可以的, 但我们这里 docker 镜像每次都不知道到哪个机器上了. 😢
    SingeeKing
        3
    SingeeKing  
       2020-08-21 09:54:10 +08:00 via iPhone
    环境变量?
    hustmisa
        4
    hustmisa  
       2020-08-21 09:55:02 +08:00
    首先 consumer 的使用业务需求是什么,新启动的 groupId 对启动期间的数据是否可以丢弃?
    如果可以丢弃 kafka 配置 retention 很短就可以了,这样不会堆积;如果不能丢弃,配置 retention 长一些这样重启换 groupId 也能续接数据(大概是你现在的方式),但是就不要频繁换 groupId 了啊。我是这么理解的不知道对不对
    BBCCBB
        5
    BBCCBB  
    OP
       2020-08-21 10:08:29 +08:00
    @SingeeKing 都是同一个服务, 不同实例,这个不方便给每个实例加环境变量, 哈哈

    @hustmisa 可以丢弃, 因为应用上有 ack 超时重试机制, 要命的是重启后老的 groupId 不会自动心跳超时消失, 会在监控上看到消息不断堆积. 其实我想实现的就是 rocketmq 的广播的功能.. 但我们使用 kafka.
    mosesyou
        6
    mosesyou  
       2020-08-21 10:11:18 +08:00
    为什么用唯一或者固定的 groupId 不行
    BBCCBB
        7
    BBCCBB  
    OP
       2020-08-21 10:19:32 +08:00
    @mosesyou 这个业务场景需要同一个服务里的不同实例全都消费到每个 MQ
    zardly666
        8
    zardly666  
       2020-08-21 10:28:17 +08:00
    用 redis 做一个类似选 ID 的东西,服务启动份数等于 ID 份数。

    for (int i = 0; i < 启动份数; i++) {
    if (redisUtil.setnx( + i, “lockthing”,time )) {
    bucketConfig.setConsumeZsetBucketNum(i);
    log.info("此实例的消费者为" + i);
    break;
    }
    }

    服务启动的时候,第一个服务拿到 consumerId+1 ;
    第二个服务拿到 consumerId+2 ;
    这样,就复用几个了吧。
    zardly666
        9
    zardly666  
       2020-08-21 10:30:00 +08:00
    代码没删干净,大概意思就是服务启动动态去拿自己所属的 consumer
    wisej
        10
    wisej  
       2020-08-21 10:30:58 +08:00 via Android
    另一种思路,服务同一个 groupid,分发由服务自己来做(拿到服务其它实例的 ip )

    另外旧 cg 堆积会有什么负面影响么?除了消息会冗余地保存,直到 retention 设置的时间被清除
    sonice
        11
    sonice  
       2020-08-21 10:40:54 +08:00
    想多了,consumerGroup 堆积能有多少,起停一次多一个,也不会有很多啊。这也不会导致 zk 性能降低啊
    amwyyyy
        12
    amwyyyy  
       2020-08-21 11:04:23 +08:00
    原来 consumerGroup 的堆积只是个数字,消息数据只有一份,不管你有几个 consumerGroup 。过期的 consumerGroup 会被清理掉。
    kifile
        13
    kifile  
       2020-08-21 11:05:34 +08:00
    我的理解,题主的意思是因为 ConsumerGroup 的 GroupId 每次重启会重新生成一个新的,导致监控面板上出现了废弃的 groupId 的 Lag 不断增大的现象。

    如果重启时 Consumer 的 offset 没有什么意义,那就在重启新应用前,删除老的 ConsumerGroup,做一个这种策略不就好了?
    BBCCBB
        14
    BBCCBB  
    OP
       2020-08-21 11:13:58 +08:00
    @zardly666 这个倒是可以做, 类似 snowflake 算法 workid 的生成. 但相对较麻烦, 老哥但还有没有简单点的解决办法啊 😿
    @wisej consumergroup 堆积就是监控上看着有点慌. 磁盘会占用.
    @sonice 额, 我们多个实例发布的时候重启, 那就会一次性有多个 old consumerGroup, 监控上看着蛇皮的很, 比较难搞.
    @kifile 是, 这是一种方案, 但开发没有 KafkaAdmin 的权限, 所以代码里删除不掉, 只能手动了.... 哈哈


    谢谢各位, 期待更好的方案 🐶
    mosesyou
        15
    mosesyou  
       2020-08-21 11:23:21 +08:00
    纯 docker 么,如果是 k8s 的话,用 statefulset,可以实现每个实例有固定递增编码 0,1,2....
    BBCCBB
        16
    BBCCBB  
    OP
       2020-08-21 11:32:54 +08:00
    @mosesyou 我们将 docker 镜像上传到云上, 然后后续的流程我得研究一下, 问一下我们负责这一块的同事, 如果可行的话这得确是一个好办法. 多谢.
    yangbonis
        17
    yangbonis  
       2020-08-21 11:35:21 +08:00 via iPhone
    mq 不是本来就组播工作的?所有订阅都会收到。
    BBCCBB
        18
    BBCCBB  
    OP
       2020-08-21 11:37:57 +08:00
    @yangbonis 是需要同一个服务不同实例都收到, 你说的这个大概是不同服务.
    j2gg0s
        19
    j2gg0s  
       2020-08-21 12:39:55 +08:00
    @BBCCBB 瞎逼设计,每个实例根据消息在自己的内存里面做些什么工作吗?不能搞个 redis 或者 db ?

    然后,kafka 的监控看到堆积是没有什么大影响的,因为消息只存一份。
    如果你觉得不爽,可以在实例 shutdown 的时候了,把 consumergroup 注销掉?
    j2gg0s
        20
    j2gg0s  
       2020-08-21 12:40:34 +08:00
    @j2gg0s 每次重启,重头还是重新开始消费呢?
    sampeng
        21
    sampeng  
       2020-08-21 12:42:26 +08:00 via iPhone
    @j2gg0s 嗯。然后磁盘就爆了…
    BBCCBB
        22
    BBCCBB  
    OP
       2020-08-21 12:42:59 +08:00
    @j2gg0s 业务场景你都不清楚你瞎 bb 个毛.
    BBCCBB
        23
    BBCCBB  
    OP
       2020-08-21 12:43:55 +08:00
    im 推消息, 量小, 所以还不想做路由中心. 所以采用广播.
    j2gg0s
        24
    j2gg0s  
       2020-08-21 12:44:09 +08:00
    @sampeng kafka 的磁盘被爆炸了?
    rockyou12
        25
    rockyou12  
       2020-08-21 13:24:00 +08:00
    我觉得最好用其它 mq,kafka 本来就不适合这种场景,你这业务看起来也不需要持久化,redis 的 sub/pub 可能都够了
    lwldcr
        26
    lwldcr  
       2020-08-21 14:38:37 +08:00
    你这个问题 加一个预处理步骤就可以了吧。

    比如你一组应用有 10 个实例,那你提前分配好 groupId 名字,如 cg_1, cg_2,..., 然后存到一个地方:DB 、Redis 等

    然后每个应用实例启动时 去存储的地方请求分配一个 groupId,用这个 groupId 启动 kafka 消费服务不就完事了
    JKeita
        27
    JKeita  
       2020-08-21 15:07:03 +08:00
    固定 group id 每次启动清除 offset 怎样?
    yty2012g
        28
    yty2012g  
       2020-08-21 16:56:06 +08:00
    固定 group id,每次设置 offset 到最新应该就可以满足
    IamNotShady
        29
    IamNotShady  
       2020-08-21 19:45:51 +08:00 via iPhone
    redis 的 pub/sub 不香吗?
    timonwong
        30
    timonwong  
       2020-08-21 20:39:39 +08:00
    不要用 High Level Consumer API 就完了,之前用 go 写了一个,也用到了线上一年,不过不保证无 bug

    https://github.com/imperfectgo/kafkasub
    timonwong
        31
    timonwong  
       2020-08-21 20:48:13 +08:00
    原理是手动维护 offset,如果程序不死 retry 的时候保持 offset,程序死了从最新的来,可以按照自己的需求来调整。

    不过有一点要注意的真的是 IM 的话,因为 kafka 的 partition reblance IO 相当大,可能造成非常大的 E2E 的 latency,这点要注意(虽然可以通过配置限制 IO 来绕过)。 总的来说,其实不适合 IM 这个场景
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1481 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 17:17 · PVG 01:17 · LAX 09:17 · JFK 12:17
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.