,今天我们就来带大家如何玩转MQ的消息。,消息中间件,英文Message Queue,简称MQ。它没有标准定义,一般认为:消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。,高效: 对于消息的处理处理速度快,RocketMQ可以达到单机10万+的并发。,可靠: 一般消息中间件都会有消息持久化机制和其他的机制确保消息不丢失。,异步: 指发送完一个请求,不需要等待返回,随时可以再发送下一个请求,既不需要等待。,消息中间件不生产消息,只是消息的搬运工。,
,首先Message包含的内容主要有几个方面组成:id(MQ自动生成)、Topic、tag、proerties、内容。,消息的发送分为:,普通消息的发送方式主要有三种:发送同步消息、发送异步消息、单向发送。,我们可以先使用 RocketMQ 提供的原生客户端的API,在 SpringBoot、SpringCloudStream 也进行了集成,但本质上这些也是基于原生API的封装,所以我们只需要掌握原生API的时候,其他的也就无师自通了。,想要使用 RocketMQ中的API,就需要先导入对应的客户端依赖。,发送同步消息是说消息发送方发出数据后,同步等待,一直等收到接收方发回响应之后才发下一个请求。这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。,流程如下所示:,
,响应结果如下所示:,
,
,在上面代表的是四个queue,而maxOffset代表我们发送消息的数量,之前发送过消息,所以大家现在看到的数量是17、18...这种,当你在运行一次发送消息时,就会看到十条消息会分布在不同机器上。,
,异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。,流程如下:,
,发送成功报文:,
,我们在dashbord下看到已经成功拿到消息了。,
,这种方式不需要我们特别关心发送结果的场景,比如日志发送、单向发送特点是发送方只需要负责发送消息,不需要等待服务器回应且没有回调函数触发,发送请求不需要等待应答,只管发,这种放松方式过程耗时很短,一般在微妙级别。,流程如下:,
,返回报文:,
,这种发送方式,我们客户端不会感受到发送结果,发送完成之后,我们并不知道到底有没有发送成功,我们只能在 top status 中去查看。,
,普通消息的消费方式主要有三种:集群消费、广播消费。,集群消费方式下,一个分组(Group) 下的多个消费者共同消费队列消息,每一个消费者出来处理的消息不一样,一个Consumer Group 中的各个Consumer 实例分摊去消费消息,一条消息只会投递到一个Consumer Group 下的一个实例,如果一个Topic有三个队列,其中一个 Consumer Group 有三个实例,那么每个实例只会消费其中一个队列,集群消费模式是消费者默认的消费方式。,
,实例代码:,我们启动两个实例对象,分别为BalanceConsumer2和BalanceConsumer,我们再去生产者生产十条消息后,我们再去看consumer,分别均摊了这十条消息。,
,广播消费模式中消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些 Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。因为一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。每一个消费者下面的消费实例,都会去拿到我们Topic下的每一条消息,但是这种消费进度的保存,不会放在broker里面,而是持久化到我们的本地实例。,流程图如下:,
,具体代码:,我们先启动 BroadcastConsumer和BroadcastConsumer2,生产十条消息以后,我们会看到不管是哪个消费者,都会接收到十条消息,这个就是广播消费模式。,
,消息消费的权衡。,负载均衡模式: 消费端集群化部署,每条消息只需要被处理一次,由于消费进度在服务端维护,可靠性更高。,集群消费模式下,不能保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。每一条消息都只会被分发到一台机器上处理,如果需要被集群下的每一台机器都处理,只能使用广播模式。,广播模式: 每条消息都需要被相同逻辑的多台机器处理,消费进度在客户端维护,出现重复的概率稍大于集群模式。,广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此需要关注消费失败的情况,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息会被自动跳过,这一点是需要注意的地方,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。目前仅 Java 客户端支持广播模式,不支持顺序消息且服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。,顺序消息指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ 可以严格的保证消息有序,可以分为 分区有序 或者 全局有序。,生产消息时在默认的情况下消息发送会采取 Round Robin 轮询方式把消息发送到不同的 queue ( 分区队列);而消费消息的时候从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue ,消息都是有序的。,
,全局有序主要控制在于创建Topic指定只有一个队列,同步确保生产者与消费者都只有一个实例进行即可,
,在电商业务场景中,订单的流程是:创建、付款、推送、完成。 在加入 RocketMQ 后,一个订单会分别产生对于这个订单的创建、付款、推送、完成等消息,如果我们把所有消息全部送入到 RocketMQ 中的一个主题中,如何实现针对一个订单的消息顺序性呢!如下图:,
,要完成分区有序性,在生产者环节使用自定义的消息队列选择策略,确保订单号尾数相同的消息会被先后发送到同一个队列中(案例中主题有3个队列,生产环境中可设定成10个满足全部尾数的需求),然后再消费端开启负载均衡模式,最终确保一个消费者拿到的消息对于一个订单来说是有序的。,订单消费者:,消息生产者:,
,消息消费者:,
,我们可以看到消息按照顺序进行了消费。使用顺序消息:首先要保证消息是有序进入MQ的,消息放入MQ之前,对id等关键字进行取模,放入指定 messageQueue ,同时 consumer 消费消息失败时,不能返回 reconsume——later ,这样会导致乱序,所以应该返回 suspend_current_queue_a_moment ,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。,Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递(被消费者消费),而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。,
,消息生产和消费有时间窗口要求的场景下,比如在电商交易中超时未支付关闭订单的场景,在订单创建时向 RocketMQ 发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则取消订单、释放库存。如已完成支付则忽略。,Apache RocketMQ 目前只支持固定精度(MQ自己规定的时间段)的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,消息排序不可避免的产生巨大性能开销。(RocketMQ 的商业版本 Aliware MQ 提供了任意时刻的定时消息功能,Apache的 RocketMQ 并没有,阿里并没有开源),Apache RocketMQ 发送延时消息是设置在每一个消息体上的,在创建消息时设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。,RocketMQ 延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。默认支持18个等级的延迟消息,延时等级定义在 RocketMQ 服务端的 MessageStoreConfig 类中。,具体如下所示:,延时消息生产者:,延时消息消费者:,当我们生产消息后,查看消费者信息,延时10秒后,消息才发送完成后,之后进行了消息的消费。,
,批量消息发送: 能显著提高传递小消息的性能。限制是这些批量消息有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB。批量消息是一个 Collection集合,所以送入消息只要是集合就行。,批量接收消息: 能提高传递小消息的性能,同时与顺序消息配合的情况下,还能根据业务主键对顺序消息进行去重(是否可去重,需要业务来决定),减少消费者对消息的处理。,如果我们需要发送10万元素的数组,怎么快速发送完?这里可以使用批量发送,同时每一批控制在1M左右确保不超过消息大小限制。批量切分发送.,批量消息生产者:,批量消息消费者:,这样我们就实现了批量消息的发送,如果我们消息超过了,4M的时候,这个时候可以考虑消息的分割,具体代码如下:,效率的过滤主要分为两种:Tag过滤和SQL语法过滤。,在实际的开发应用中,对于一类消息尽可能使用一个Topic进行存储,但在消费时需要选择想要的消息,这时可以使用 RocketMQ 的消息过滤功能,具体实现是利用消息的Tag和Key。,Key :一般用于消息在业务层面的唯一标识。对发送的消息设置好 Key,根据这个 Key 来查找消息。比如消息异常,消息丢失,进行查找会很方便。RocketMQ 会创建专门的索引文件,用来存储 Key与消息的映射,由于底层实现是 Hash 索引,应尽量使 Key唯一,避免潜在的哈希冲突。,Tag: 可以理解为是二级分类。以电商交易平台为例,订单消息和支付消息属于不同业务类型的消息,分别创建 OrderTopic 和PayTopic ,其中订单消息根据不同的商品品类以不同的 Tag 再进行细分,如手机类、家电类、男装类、女装类、化妆品类,最后它们都被各个不同的系统所接收。通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。,Key和Tag的主要差别是使用场景不同,Key主要用于通过命令行命令查询消息,而Tag用于在消息端的代码中,用来进行服务端消息过滤。,使用Tag过滤的方式是在消息生产时传入感兴趣的Tag标签,然后在消费端就可以根据Tag来选择您想要的消息。具体的操作是在创建Message的时候添加,一个Message只能有一个Tag。,使用案例:,消费者:,我们生成了 TagA\b\c 三条消息,但是消费者只想接收 TagA或B, 那么我们可以在消费者端进行消息过滤。,
,Tag过滤的形式非常简单。,|| 代表或 * 代表所有。,因此Tag过滤对于复杂的场景可能不能进行覆盖。在这种情况下,可以使用SQL表达式筛选消息。,SQL基本语法:,Sql过滤需要 Broker 开启这项功能,需要修改Broker.conf配置文件。加入enablePropertyFilter=true 然后重启Broker服务。,消息生产者,发送消息时加入消息属性,通过 putUserProperty 来设置消息的属性,生产者发送10条消息,,生产者:,消费者:,消费结果:按照Tag和SQL过滤消费3条消息。,第一个消息是TagA ,消息的属性(a)是3。,第一个消息是TagB ,消息的属性(a)是1。,第一个消息是TagA ,消息的属性(a)是0。,
© 版权声明
文章版权归作者所有,未经允许请勿转载。