五张图带你理解 RocketMQ 消费者启动过程

网站建设4年前发布
15 0 0

大家好,我是君哥。,今天来分享 RocketMQ 中一个关键的知识点,消费者的启动过程。,多数消息队列中,消费者和 Broker 通信的方式有两种,PUSH 模式和 PULL 模式:,首先看下面这张图:,图中可以看出,消费者需要注册到 Name Server,拉取消息的时候可以从 Broker 主节点拉取,也可以从 Broker 从节点拉取。,在 RocketMQ 的源码中,拉模式有两个消费者相关的类,其中 DefaultMQPullCons umer 类已经被废弃,官方推荐使用 Defau ltLitePullConsumer 类。下面代码来自官方示例:,上面代码中消费者属于消费组 lite_pull _consumer_test,订阅了【TopicTest 】这个 Topic 下的所有 tag。下面一起看一下启动方法。下图是消费者启动过程中类调用关系图,图中心的 pullRequestQueu e 是核心,pull 请求会先发送到这个队列,然后循环地拉取处理。,消费者启动时首先会检查配置,检查的配置项如下:,这部分源代码见 DefaultLitePullConsum erImpl#checkConfig。,如果是集群模式,实例名称改为【进程 ID + “ #” + 系统时间(纳秒 )】,代码如下:,创建一个 MQClientInstance 实例,然后把消费者注册到 MQClientInstance。,对 RebalanceLitePullImpl 实例初始化,给下面的参数赋值:,负载均衡线程启动后,默认每 20s 做一次负载均衡,见如下代码:,PullAPIWrapper 这个 Wrapper 类是 MQ-ClientInstance 类的 Wrapper 类,类中 pullKernelImpl 方法对 MQClientInstance 类中的 pullMessage 方法进行了装饰,这个装饰类主要增加了下面功能:,代码如下 :,offset 存储器的 UML 类图如下:,有两个实现类分别对应集群模式和广播模式,本文讨论的集群模式的实现类是 RemoteBrokerOffsetStore。offset 可以存储在本地或者远端服务器。,启动 MQ 客户端主要包括如下步骤:,从下面的代码可以看出,PULL 拉取消息最终使用了 DefaultMQPushConsumer Impl,所以 PULL 模式和 PUSH 模式拉取消息的逻辑是一样的。,5.启动 MessageQueue 负载均衡线程。,6.启动生产者线程;7.把 serviceState 改为 Running。,7.源码参考 MQClientInstance#start。,这个定时任务默认每 30s 执行一次,用于监听每个 Topic 下的 MessageQueue 是否发生变化。代码见 startScheduleTask 方法。,轨迹消息主要用于跟踪消息发送、消息消费的轨迹,用于记录详细日志。代码如下:,这里不详细展开了,后面再单独讨论。,本文通过源码分析讲解了 RocketMQ 中 PULL 模式下的消费者启动过程,在生产上使用比较多的还是 PUSH 模式,PULL 模式拉取消息的方法跟 PUSH 模式一样,不同的是 PULL 模式需要应用程序进行拉取动作,可以通过 PULL 模式的学习更容易的理解 PUSH 模式。最后,分析一个 PULL 模式启动过程涉及的 UML 类图:,

© 版权声明

相关文章