RocketMQ 用法详解,你学会了吗?

网站建设4年前发布
31 0 0

大家好,我是指北君。,20230306103620446bda042eb76ce5194272517700ab3d1d015a600,消息中间件是我们工作中使用最频繁的一类中间件,它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。今天,指北君就来详细讲讲RocketMQ生产者和消费者在使用时的一些注意事项。,1)消息大小,建议消息大小不要超过512K。,2)异步发送,默认的发送为同步发送,send方法会一直阻塞,等待broker端的响应。如果你关注性能问题,可以通过send(msg, callback)来发起异步调用。,3)生产者组,正常情况下生产者组是没有作用的,但是在发送事务消息时,如果producer中途意外宕机,broker会主动回调producer group 内的任意一台机器来确认事务的状态。(目前开源版本还不支持事务消息)。,4)线程安全问题,生产者实例是线程安全的,在应用中只需要实例化一次即可。,5)性能问题,如果你希望在一个jvm进程内使用多个producer实例来提高发送能,我们建议:,使用异步发送,并且producer实例只需要3 ~ 5个即可 对每一个producer 调用 setInstanceName,区别不同的生产者。,6)发送超时时间,当客户端向broker发送请求超时时,客户端会抛出 RemotingTimeoutException,默认的超时时间是3秒。通过调用send(msg, timeout) 可以设置超时时间。超时时间建议不要设置过小,因为 broker 可能需要时间刷盘或向 slave 同步数据。,7)对于同一个应用最好只使用一个Topic,消息的子类型可以使用 tags 来标识,tags 可以由应用自由设置。当发送的消息设置了 tags 时,消费方在订阅消息时可以使用 tags 在 broker 做消息过滤。注意这里的命名虽然是复数,但是一条消息只能有一个tag。,8)消息在业务层面的唯一标识可以设置到 keys 字段,方便根据 keys 来定位消息。broker 会为每个消息创建索引(哈希索引),应用可以通过topic 、key 查询这条消息的内容(MessageExt),以及消息被谁消费(MessageTrack,精确到consumer group)。由于是哈希索引,请尽量保证key 的唯一,这样可以避免潜在的哈希冲突。,9)消息发送不管是成功还是失败都要打印消息日志,日志内容务必包含 sendResult 和 key 字段。,10)对于消息不可丢失的应用,务必要有消息重发机制。例如如果消息发送失败,可以将消息存储到数据库,然后通过定时程序或者人工的方式触发重发。,11)调用send 同步发送消息时,假定此时设置了 isWaitStoreMsgOK=true(default is true),只要不抛出异常就代表发送成功,但当 isWaitStoreMsgOK = false 时,发送永远返回 SEND_OK。但是对于发送“成功”会有多个状态,在 SendStatus 中定义如下:,FLUSH_DISK_TIMEOUT,如果 broker 设置的 FlushDiskType = SYNC_FLUSH,当 broker 的在刷盘超时时(MessageStoreConfig.syncFlushTimeout,默认5秒)会返回该状态。此时消息任然保存在内存中,只有broker 宕机时消息才会丢失。,FLUSH_SLAVE_TIMEOU,如果 broker 的 role 是 SYNC_MASTER,当 slave 同步数据的时间超过了 MessageStoreConfig.syncFlushTimeout (默认5秒) 时会返回此状态。此时只有主从都宕机,并且主也没有刷盘时,消息才会丢失。,SLAVE_NOT_AVAILABLE,如果 broker 的 role 是 SYNC_MASTER,并且此时 slave 不可用时会返回该状态。,SEND_OK,发送成功。为了保证消息不丢失还需要配置 SYNC_MASTER or SYNC_FLUSH。,12)消息重复,当发送消息时返回 FLUSH_DISK_TIMEOUT/FLUSH_SLAVE_TIMEOUT,若非常不幸的 broker 也宕机了,消息将会丢失。此时如果什么都不做,消息可能会丢失,如果重发消息,消息可能会出现重复。,通常我们建议发送端重发消息,由消费方来保证消息消费的幂等性。,Producer 的 send 方法本生支持内部重试,重试逻辑如下:,至多重试3次 如果发送失败,则轮转到下一个broker 这个方法的总耗时时间不超过 sendMsgTimeout,默认3秒 所以发送消息已经产生超时异常的话就不会再重试。以上策略仍不能保证消息发送一定成功,为保证消息发送一定成功,建议应用这么做:如果调用 send 同步发送失败,则尝试将消息存储到db,由后台线程定时重试,保证消息一定到达 Broker。,对于可靠性要求不高的应用,可以采用 oneway 的发送形式,oneway 形式不等待应答。,顺序消息分为分区有序和全局有序。,分区有序要求 producer 在send 时传入 MessageQueueSelector 的实现类,最终将某一类消息发送到同一队列。但是一旦发生通信异常、broker 重启等,由于队列总数发生变化,哈希取模后定位的队列会变化,会产生短暂的顺序不一致。如果业务能容忍在集群异常情况下(如某个 broker 宕机或者重启)消息短暂的乱序,使用分区有序比较合适。,全局严格有序的消息即便在异常情况下也能保证消息的有序性,但是却牺牲了分布式的 failover 特性,即 broker 集群中只有要一台机器不可用,则整个集群都不可用,服务可用性会大大降低。,顺序消息的缺点:,发送顺序消息无法利用集群的 FailOver 特性 消费顺序消息的并行度依赖于队列数量 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消费堆积问题 遇到消费失败的消息,无法跳过,当前队列需要暂停 5.发送事务消息 目前暂不支持。,不同的消费者组可以独立消费相同的topic,这点类似于ActiveMQ的虚拟 topic. 另外对于相同的消费者组,需要确保组内的消费者订阅消息的规则是一致的!,MQ 里的一个Consumer Group 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个 Consumer Group 下通常会挂载多个 Consumer 实例。订阅关系一致指的是同一个 Consumer Group 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。,由于 MQ 的订阅关系主要由 Topic+Tag 共同组成,因此,保持订阅关系一致意味着同一个 Consumer Group 下所有的实例需在以下两方面均保持一致:,订阅的 Topic 必须一致;订阅的 Topic 中的 Tag 必须一致。,技术架构 > Consumer 最佳实践 > image2017-11-15 15:50:13.png,1)顺序消费 MessageListenerOrderly,顺序消费时消费者会锁定队列,以确保消息被顺序消费,但是这样也会造成一定的性能损耗。当消费出现异常的时候,建议不要抛出异常,而是返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,让消费暂停一会,暂停时间由 context.setSuspendCurrentQueueTimeMillis 方法指定。,2)并发消费,并发消费是推荐的消费方式,在此种模式下,消息将被并发的消费。消费出现异常时不建议抛出异常,只需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 即可。为了保证消息肯定被至少消费一次,消息将会被重发回 broker (topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置,通过 delayLevelWhenNextConsume 和 MessageStoreConfig.messageDelayLevel 设置)后,再次投递到这个 ConsumerGroup,而如果一直这样重复消费都持续失败到一定次数(默认是16次,DefaultMQPushConsumer.maxReconsumeTimes),就会投递到DLQ队列。应用可以监控死信队列来做人工干预。,3)返回状态,在并行消费时可以通过返回 RECONSUME_LATER 来告诉 Consumer 当前无法消费该消息,等延时一段时间再重新消费,但是此时消费不会停止,你可以继续消费其他消息。但在顺序消费时,因为要保证消费的顺序性,所以你不能跳过失败的消息,此时你可以通过返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 来告诉 Consumer 先暂停一会。,4)阻塞,不建议阻塞Listener,因为这会阻塞住线程池,同时也有可能造成消费者线程终止。,consumer 内部通过一个 ThreadPoolExecutor 来消费消息,可以通过 setConsumeThreadMin 和 setConsumeThreadMax 来改变线程池的大小。,当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。,如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:,CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息。,CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍。,CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前 注意:这些配置只对全新的消费组有效,老的消费组都是按已经存储过的消费进度继续消费。,对于老消费组想跳过历史消息可以采用以下几种方法:,1)判断消息的发送时间,太老的消息直接返回 CONSUME_SUCCESS。,2)判断消息的 offset 和 MAX_OFFSET 的差距,如果落后太多,可以直接。返回 CONSUME_SUCCESS。,3)消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用命令 resetOffsetByTimeStamp,详见 ResetOffsetByTimeCommand.java。,由于 RocketMQ 无法避免消费重复,所以如果业务对消息重复非常敏感,务必在业务层面去重。,1)提高消费并行度,大部分消息消费行为都属于 IO 密集型业务,适当的提高并发度可以显著的改善消费的吞吐量。,2)批量方式消费,默认情况下 consumer 的 consumeMessageBatchMaxSize 为1,即一次只消费一个消息,如果应用可以批量消费消息,则可以很大程度上提高消费吞吐量。,3)跳过非重要消息,当消堆积严重时可以丢弃不重要的消息。,4)优化消息消费过程,建议在消费入口方法打印消息,方便后续排查问题,消费失败时也打印失败日志。,好了,RocketMQ生产者与消费者的使用事项就总结完毕了,相信大家对RocketMQ的使用应该会更有信心了。,

© 版权声明

相关文章