如题,有两个单体项目,一个是管理后台,一个是接口服务,有一个子模块群组消息队列,管理后台应用和接口都引用了,现在是同一个消费组,同一个消费者,因为不能重复消费,消费的时候进行轮询,现在的问题是
大家是否会碰到这种问题,docker 多实例部署会考虑定时任务,消息队列,在多个容器运行的情况吗, 定时任务已经通过 xxl-job 去解决这个问题,但消息队列不知道怎么处理,望赐教
1
199808lanlan1111 2023-02-02 17:02:57 +08:00 via Android
分成两个项目就意味着每次要打开两个 idea ,分支要高俩,发板要搞俩等等,可以先搞在一起,但是模块分开,后面请求量上来可以单独拆分保证稳定性可靠性
|
2
wangxin3 2023-02-02 17:04:05 +08:00
具体什么消息队列呢,rabbitmq 绑定在同一个队列上的消费者组是不会重复消费的
kafka 也是同理,消费者配置为同一个消费者组也是不会重复消费的 |
3
mooyo 2023-02-02 17:05:21 +08:00
前司分开了,现司没分开。感觉没区别。
|
4
dolorain 2023-02-02 17:06:17 +08:00
看体量多大了,小打小闹肯定没必要了
|
5
simonlu9 OP @199808lanlan1111 现在就是同一个项目的,只是不同 application ,就消费队列这个问题不好处理,能不能中心化
|
6
199808lanlan1111 2023-02-02 17:08:41 +08:00 via Android
@199808lanlan1111 没审题 说错了,但 op 的问题我只看懂了最后一个问题,需不需要考虑多实例问题。
首先肯定要考虑的,这就是分布式系统的特性。消息队列你不需要考虑,消息会靠队列进行负载均衡,每个实例会会处理一个或者多个队列的消息 |
7
koloonps 2023-02-02 17:10:24 +08:00
“如果接口部署了多实例,同一个消费者会争夺同一个消息进行处理,浪费了大量线程资源,也不能提高消费效率” rabbitmq 在消息没有退回 /超时之前 mq 服务器不会重新推送
|
8
simonlu9 OP @wangxin3 同一个消费组,是用 redis stream,只是消费者名称都是写死在代码里面,所以多实例,最终还是同时消费一条消息,除非部署多实例的时候消费者名称动态配置
|
9
wangxin3 2023-02-02 17:18:52 +08:00
@simonlu9 #8 原文:“@wangxin3 同一个消费组,是用 redis stream,只是消费者名称都是写死在代码里面,所以多实例,最终还是同时消费一条消息,除非部署多实例的时候消费者名称动态配置”
====== 回复:不理解你说的 可以画个架构图? |
10
colincat 2023-02-02 17:22:05 +08:00
@simonlu9 该消息模块是否可以动态传入 groupName ,当管理后台引用是使用管理后台消费组-配置到配置文件中,当接口服务使用时使用 接口消费组
|
11
simonlu9 OP @wangxin3 代码逻辑大概是这样,考虑以下方法再多实例运行,消费组是同一个,消费者名称是写死
@Override public void run(ApplicationArguments args) throws Exception { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, ChatGroupUserDTO>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(10) .executor(executor) .pollTimeout(Duration.ofSeconds(5)) .targetType(ChatGroupUserDTO.class) .build(); StreamMessageListenerContainer<String, ObjectRecord<String, ChatGroupUserDTO>> container = StreamMessageListenerContainer.create(redisConnectionFactory, options); prepareChannelAndGroup(redisTemplate.opsForStream(), MESSAGE_STREAM, MESSAGE_GROUP); container.receive(Consumer.from(MESSAGE_GROUP, "consumer-1"), StreamOffset.create(MESSAGE_STREAM, ReadOffset.lastConsumed()), messageListener); this.container = container; // 启动监听 this.container.start(); logger.info("{}启动成功",MESSAGE_STREAM); } |
12
leeraya 2023-02-02 17:30:07 +08:00
现司就是专门搞了一个消息队列中转服务,根据不同的策略接收转发消息。
好处就是整个服务网格内几乎所有用到消息的服务都走中转,问题排查集中在中转站存的消息日志。 不过就是要专门有人维护这种基础设施服务,又是搞消息的,费人工。 |
13
wangxin3 2023-02-02 17:30:54 +08:00
@simonlu9 #11 原文:
回复:consumer-1 保证消费者名称不重复不就行了?你现在可以单实例,把这个代码在运行一份,两份不同的消费者名称,但在同一个消费者组,看看是否重复消费了。 |
14
simonlu9 OP @wangxin3 在同一个消费组,消费者名称一样的话,假设有三个实例,thread-1 thread-2 thread-3, 每条消息只会投递到某个实例,不会三个实例都投放,我的主要问题是如果像这种多个实例,好像会饿死线程,大量浪费
|
15
wangxin3 2023-02-02 17:44:21 +08:00
@simonlu9 #14
====== 回复:怎么会呢,发布订阅模式不就是 redis 有消息才会给消费者发消息吗,redis 只有一条消息,轮询到实例 A 了,就发给实例 A ,实例 B 和 C 该干啥干啥呀,怎么会饿死线程,大量浪费。实例 B 和 C 又不会因为消费者线程阻塞在等 redis 发消息,有消息才会处理呀。 |
16
nothingistrue 2023-02-02 17:54:46 +08:00 1
微服务是跟着业务走,不跟着技术实现方式走的。消息队列消费者,绝大多数情况下,都不对应一种业务(具体的说就是实体、实体表、限界上下文这些),当然不能单独拆成一个服务。
问题 1 ,可以通过配置消费者组进行解决,一个消费者组,同一个消费者只会按调度规则扔给唯一的消费者。RabbitMq 、Kafka 都这只,Spring Cloud Stream 还提供了超简单的实现方式(不过运维要麻烦点,后面说)。 问题 2 ,我没明白你说得是什么,看起来像是系统升级时候的配合问题,这个解决起来稍微麻烦但也不是不能解决。生产者消费者如果不能同时更新,那么消息协议上,就要考虑多版本同时兼容的问题了。 最后说说运维麻烦的地方。多实例的时候,对于接口调用,是不用区分具体哪个示例的,负载均衡机制随便选一个就行了,所以实例无需明确 ID ,随机生成都可以。但是对于消费者组,就不能那么随意了,通常都是要明确给出实例 ID 的,不能随机生成,这会增加部署的麻烦成都。。 |
17
simonlu9 OP @wangxin3 我算一下资源成本,一个业务一个主题,10 个业务就 10 线程在监听队列,如果再单台机器部署多实例,10*n 个线程在跑,没意义
|
18
simonlu9 OP @nothingistrue 但是对于消费者组,就不能那么随意了,通常都是要明确给出实例 ID 的,不能随机生成,可以说说这块运维一般怎么搞的吗,公司运维也是我
|
19
liyanggyang 2023-02-02 18:42:45 +08:00
消费过后,数据存储的在一个地方,那不就没任何问题了。
关于消费者放在哪儿的问题,我理解,看这个消息的类型: 1. 这个消息是用户业务相关的,那么放在接口服务。比如接口服务是账单服务,那么支付系统发送过来的支付消息,就在接口服务。 2. 这个消息只是后台管理相关的,那么就放在管理后台。比如消息队列是 xxx 公司报表系统发送过来的,需要做 xxx 管控,那么就在管理后台,因为它与用户业务无关。 |
20
simonlu9 OP @liyanggyang 举一个很简单例子,比如群吧,解散后会有后续动作,接口端可以解散, 管理后台也可以解散,都是共用一个逻辑
|
21
wolfie 2023-02-02 18:59:37 +08:00
> 如果接口部署了多实例,同一个消费者会争夺同一个消息进行处理,浪费了大量线程资源,也不能提高消费效率
广播? |
22
liyanggyang 2023-02-02 19:21:28 +08:00
@simonlu9 这样来说,业务功能是面向用户的,管理后台是面向软件运维者。
这个例子,那这个很好理解了,群创建或解散,属于业务的功能(当然你说后台也具备功能,这说法没问题,但是后台只是给软件的管理员提供一个集中管理的便捷),所以这个肯定是接口服务,一个群解散的接口服务,消息在接口服务里面消费。管理后台走内部通道去调用接口服务(即接口服务是 api1 ,可以专门写一个内部接口 api2 ,但是 api1 2 里面的实现 service.dell()是一个 ) |
23
kjstart 2023-02-03 05:21:07 +08:00
根据异步负载决定, 因为要分别做弹性伸缩
|
24
kjstart 2023-02-03 05:22:48 +08:00 1
可以用一套代码, 用配置文件决定是否拉取消息就可以了
|
25
nothingistrue 2023-02-03 09:22:09 +08:00
|
26
winglight2016 2023-02-03 09:34:41 +08:00
@nothingistrue #25 说得对,我司以前就是把监听消息和业务处理放在一起,会导致非常严重的问题。我现在正在做优化,完全分离 MQ 和业务,专门做一个单独的应用来负责映射转发。
另外,lz 担心的应用如何确定具体实例问题,一般业务后台如果是多实例部署,前面会放一个 LB ,由 LB 来决定具体哪个实例来响应请求 |
27
nothingistrue 2023-02-03 09:41:48 +08:00 1
关于实例 ID 这部分,这个重点是,消费者必须明确的告知消息中间件两个属性:一共有多少个实例,当前实例区分其他实例的标识(可以简单的只是个序号)。这是业务无关的,只跟运维实例的部署配置有关。配置的内容也不多,只需要消费者多加两个配置项,但是这俩配置项管理起来比较麻烦,因为它们的值不可预定义,只能在部署的时候动态决定,且每次部署都要重新决定。
单纯负载均衡或者随机分配的多实例消费者组的话,理论上是无需理会实例 ID 的。当消息不是自动均衡分配,而是按规则分区分配(比如说 1-5 给实例 1 ,6-10 给实例 2 )的时候,就必须明确给出实例 ID 了。 我这里参考的是 Spring Cloud Stream ( https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-partitioning )。请注意这些规则不是 Spring Cloud Stream 决定的,是消息中间件决定的,Spring Cloud Stream 只是做了上层抽象使其用起来更简单些。 |
28
nothingistrue 2023-02-03 09:49:08 +08:00
@winglight2016 #25
只需要把监听消息和业务处理在应用内部解开就行了,不要拆成不同应用。监听消息和业务处理放在一个服务中,就只是底层的一个线程和异步线程池,对上层业务逻辑没影响。你要分成两个应用,先不考虑资源浪费问题,业务逻辑就搞复杂了。通常来说,映射转发也就几条规则,处理几十个队列,不管是业务逻辑还是性能上,都不具备独立出去的需要。只有映射转发规则多到需要专门的管理界面的时候,才能考虑独立出去。 |
29
morty0 2023-02-03 10:16:56 +08:00
@nothingistrue 全部消息都异步处理, 监听服务收到消息就直接 ack 吗, 还是等异步处理完 ack 呢?
|
30
NoKey 2023-02-03 10:19:49 +08:00
没怎么看懂,大概给你出个主意:
1. 把 kakfa 拿来好好研究一下 2. 如果有很多服务器,然后更新可能不及时,消息连结构都改了,会导致旧的消费者拿到之后出错,那么,新增一个 topic ,新旧 topic 共存一段时间,等全部更新完,应该就 ok 了。 看看有没其他大神有好办法。 |
31
nothingistrue 2023-02-03 10:39:37 +08:00
@morty0 #28 如果是异步处理,那么收到即表示成功,自然是收到就 ack 。但不是所有消息都要异步处理,这个具体要看是啥消息。ack 或者 死信机制,只对同步处理的消息有作用,异步处理的消息,需要有其他机制做异常处理。
|
32
winglight2016 2023-02-03 11:38:35 +08:00
@nothingistrue #28 我们的场景不太一样,需要容器化部署在 k8s 上,而且基于 python ,必须在单独的容器中启动监听服务。然后,这个容器进程如果爆出异常又没有捕获会导致整个容器都不能正常启动,所以必须和业务代码分离。另外,虽然只有几条规则,但是不能写在代码里,不然规则变化也会导致重新打包和部署。
|
33
zeonll 2023-02-03 16:49:05 +08:00
要分开,方便隔灾和扩容
|
34
cnlinjie 2023-03-21 21:45:53 +08:00
spring cloud stream 官方好像没有支持 redis 的库,你是用 `https://github.com/spring-attic/spring-cloud-stream-binder-redis` 这套吗?
|