一文带你理解 RocketMQ 广播模式实现机制

网站建设5年前发布
15 0 0

2023030610280482ed61b050237bd034c2903a944161f6a59a40257,大家好,我是君哥。今天聊聊 RocketMQ 的广播消息实现机制。,RocketMQ 有两种消费模式,集群模式和广播模式。,集群模式是指 RocketMQ 中的一条消息只能被同一个消费者组中的一个消费者消费。如下图,Producer 向 TopicTest 这个 Topic 并发写入 3 条新消息,分别被分配到了 MessageQueue1~MessageQueue3 这 3 个队列,然后 Group 中的三个 Consumer 分别消费了一条消息:,202303061028055650b9231dc7d9e3e7d7535327efd56b7789c5989,广播模式是  RocketMQ 中的消息会被消费组中的每个消费者都消费一次,如下图:,20230306103341b5bea70568b9a051802560296940e299bbad69211,使用 RocketMQ 的广播模式时,需要在消费端进行定义,下面是一段官方示例:,从代码中可以看到,在定义 Consumer 时,通过 messageModel 这个属性指定消费模式,这里指定为 BROADCASTING,也就启动了广播模式的消费者。,以 RocketMQ 推模式为例,看一下消费者调用关系类图:,2023030610341069bb81c3213a7336f22793902e9ca737079f8f446,DefaultMQPushConsumer 作为启动入口类,它的 start 方法调用了 DefaultMQPushConsumerImpl 类的 start 方法,下面重点看一下这个方法。,start 方法中调用了 copySubscription 方法,代码如下:,这里的代码有一点需要注意:集群模式会创建一个重试 Topic 的订阅关系,而广播模式是不会创建这个订阅关系的。也就是说广播模式不考虑重试。,下面是初始化 offset 的代码:,从上面的代码可以看到,广播模式使用了 LocalFileOffsetStore,也就是说偏移量保存在客户端本地,除了在内存中会保存,在本地文件中也会保存。,ConsumeMessageService 是真正拉取消息的地方,消费者初始化时会初始化 ConsumeMessageService,并且这里会区分并发消息还是顺序消息。,在集群模式下,需要获取到 processQueue 的锁才会拉取消息,而在广播模式下,不用获取锁,直接就可以拉取消息。判断逻辑如下:,这里有个疑问,对于顺序消息,获取锁是必须的,这样才能保证一个 processQueue 只能由一个线程进行处理,从而保证消费的顺序性。那对于广播模式,为什么不用获取 processQueue 的锁呢?难道广播模式不支持顺序消息?,对于并发消息,广播模式不同的是,对消费结果的处理。集群模式消费失败后需要把消息发送回 Broker 等待再次被拉取,而广播模式则不需要重试。代码如下:,这再次说明,广播模式是不支持消息重试的。,在消费者启动过程中,会调用 RebalanceService 的 start 方法,进行重平衡。从重平衡的代码中可以看到,广播模式消费者会消费所有 MessageQueue,而集群模式下会根据负载均衡策略选择其中几个 MessageQueue。代码如下:,上面 updateProcessQueueTableInRebalance 这个方法调用前,要获取到需要消费的 MessageQueue 集合。广播模式下,直接取了订阅的 Topic 下的所有集合元素,而集群模式下,则需要通过负责均衡获取当前消费者自己要消费的 MessageQueue 集合。,本文主要讲解了 RocketMQ 广播消息的实现机制,理解广播消息,要把握下面几点:,1.偏移量保存在消费者本地内存和文件中。,2.广播消息不支持重试。,3.从源码上看,广播模式并不能支持顺序消息。,4.广播模式消费者订阅了 Topic 下的所有 MessageQueue,不会重平衡。

© 版权声明

相关文章