弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!

网站建设4年前发布
28 0 0

​大家好,我是君哥。,在 RocketMQ 4.x 版本,使用延时消息来实现消息的定时消费。延时消息可以一定程度上实现定时发送,但是有一些局限。,RocketMQ 新版本基于时间轮算法引入了定时消息,目前,精确到秒级的定时消息实现的 pr 已经提交到社区,今天来介绍一下。,RocketMQ 的延时消息是指 Producer 发送消息后,Consumer 不会立即消费,而是需要等待固定的时间才能消费。在一些场景下,延时消息是很有用的,比如电商场景下关闭 30 分钟内未支付的订单。,使用延时消息非常简单,只需要给消息的 delayTimeLevel 属性赋值就可以。参考下面代码:,延时消息有 18 个级别,如下:,延时消息的实现原理如下图:,20230306134402a18805974ae0da456dd906a905acb8dc7c26cc402,Producer 把消息发送到 Broker 后,Broker 判断到是延时消息,首先会把消息投递到延时队列(Topic = SCHEDULE_TOPIC_XXXX,queueId = delayTimeLevel - 1)。定时任务线程池会有 18 个线程来对延时队列进行调度,每个线程调度一个延时级别,调度任务把延时消息再投递到原始队列,这样 Consumer 就可以拉取到了。,延时消息存在着一些不足:,1.延时级别只有 18 个,并不能满足所有场景;,2.如果通过修改 messageDelayLevel 配置来自定义延时级别,并不灵活,比如一个在大规模的平台上,延时级别成百上千,而且随时可能增加新的延时时间;,3.延时时间不准确,后台的定时线程可能会因为处理消息量大导致延时误差大。,为了弥补延时消息的不足,RocketMQ 5.0 引入了定时消息。,为了解决定时任务队列遍历任务导致的性能开销,RocketMQ 定时消息引入了秒级的时间轮算法。如下图:,20230306134518216b21f31137e5ef4304130082d73cf52009ba374,图中是一个 60s 的时间轮,时间轮上会有一个指向当前时间的指针定时地移动到下一个时间(秒级)。,时间轮算法的优势是不用去遍历所有的任务,每一个时间节点上的任务用链表串起来,当时间轮上的指针移动到当前的时间时,这个时间节点上的全部任务都执行。,虽然上面只是一个 60s 的时间轮,但是对于所有的时间延时,都是支持的。可以在每个时间节点增加一个 round 字段,记录时间轮转动的圈数,比如对于延时 130s 的任务,round 就是 2,放在第 10 个时间刻度的链表中。这样当时间轮转到一个节点,执行节点上的任务时,首先判断 round 是否等于 0,如果等于 0,则把这个任务从任务链表中移出交给异步线程执行,否则将 round 减 1 继续检查后面的任务。,基于时间轮算法的思想,RocketMQ 实现了精准的定时消息。使用 RocketMQ 定时消息时,客户端定义消息的示例代码如下:,2.3.1 消息投递,上面的代码构中,Producer 创建消息时给消息传了一个系统属性 deliveryTimestamp,这个属性指定了消息投递的时间,并且封装到消息的 TIMER_DELIVER_MS 属性,代码如下:,Broker 收到这个消息后,如果判断到 TIMER_DELIVER_MS 这个属性有值,就会把这个消息投递到 Topic 是 rmq_sys_wheel_timer 的队列中,queueId 是 0,同时会保存原始消息的 Topic、queueId、投递时间(TIMER_OUT_MS)。,TimerMessageStore 中有个定时任务 TimerEnqueueGetService 会从 rmq_sys_wheel_timer 这个 Topic 中读取消息,然后封装 TimerRequest 请求并放到队列 enqueuePutQueue。,2.3.2 绑定时间轮,RocketMQ 使用 TimerLog 来保存消息的原始数据绑定到时间轮上。首先看一下 TimerLog 保存的数据结构,如下图:,2023030613440374fdb91116c8b59a731281548a0484ca00db31370,参考下面代码:,TimerEnqueuePutService 这个定时任务从上面的 enqueuePutQueue(2.3.1节) 取出 TimerRequest 然后封装成  TimerLog。,那时间轮是怎么跟 TimerLog 关联起来的呢?RocketMQ 使用 TimerWheel 来描述时间轮,TimerWheel 中每一个时间节点是一个 Slot,Slot 保存了这个延时时间的 TimerLog 信息。数据结构如下图:,20230306134404d79ca7b264a6b2f99d4965b3fad88eb324a727963,参考下面代码:,这样时间轮跟 TimerLog 就关联起来了,见下图:,202303061344049855fe05732496e6ff8961191b94c09a3d7e8a286,如果时间轮的一个时间节点(Slot)上有一条新的消息到来,那只要新建一个 TimerLog,然后把它的指针指向该时间节点的最后一个 TimerLog,然后把 Slot 的 lastPos 属性指向新建的这个 TimerLog,如下图:,20230306134405e5c003771289b88f2f8587d7d038f552a93613842,从源码上看,RocketMQ 定义了一个 7 天的以秒为单位的时间轮。,2.3.3 时间轮转动,转动时间轮时,TimerDequeueGetService 这个定时任务从当前时间节点(Slot)对应的 TimerLog 中取出数据,封装成 TimerRequest 放入 dequeueGetQueue 队列。,2.3.4 CommitLog 中读取消息,定时任务 TimerDequeueGetMessageService 从队列 dequeueGetQueue 中拉取 TimerRequest 请求,然后根据 TimerRequest 中的参数去 CommitLog(MessageExt) 中查找消息,查出后把消息封装到 TimerRequest 中,然后把 TimerRequest 写入 dequeuePutQueue 这个队列。,2.3.5 写入原队列,定时任务 TimerDequeuePutMessageService 从 dequeuePutQueue 队列中获取消息,把消息转换成原始消息,投入到原始队列中,这样消费者就可以拉取到了。,RocketMQ 4.x 版本只支持延时消息,有一些局限性。而 RocketMQ 新版本引入了定时消息,弥补了延时消息的不足。定时消息的处理流程如下图:,2023030613451909f5c9f677a92f8ad17453b0f8a57590e1ece2905,可以看到,RocketMQ 的定时消息的实现还是有一定复杂度的,这里用到 5 个定时任务和 3 个队列来实现。,最后,对于定时时间的定义,客户端、Broker 和时间轮的默认最大延时时间定义是不同的,使用的时候需要注意。,

© 版权声明

相关文章