,RocketMQ集群架构图,如图所示,RocketMQ集群由4部分组成:Producer会根据业务需要发送消息;Broker负责接收、存储和分发消息;Consumer负责按需消费消息;Name Server负责通过长连接、Topic路由、心跳检测等手段保证集群的高可用。,其中的Broker和Name Server都是由运维和架构部同学负责管理,业务开发接触较少。业务开发同学接触比较多的就是Producer、Consumer部分,这两部分都有RocketMQ提供的Java客户端,但需要嵌入到具体的业务代码里,我们的组件就是针对Java客户端易用性的扩展。,Consumer一般用法如下:,Producer的一般用法:,从代码中我们看到两个组件的生命周期都是3个阶段:创建&初始化&启动、业务处理、销毁,且在这3个阶段里其实只有业务处理阶段是与具体的业务开发紧密相关的。当我们要定义多个Topic、Group、Tag去发送或者消费消息的时候,发现其他2个阶段都属于重复代码,且在初始化阶段中的参数有些许不同,但还是要在业务开发中进行重复开发或配置。,由于现有RcoketMQ客户端存在上述问题,我希望有一种新组件,能够在业务开发中,将这两个客户端代码中重复出现的2个阶段抽离出来,并允许业务侧保留个性化定制的部分。,于是想到可以基于AOP的思想,使用注解来定义Consumer和Producer实例的创建和初始化参数,利用Spring容器加载和管理实例化后的bean对象的生命周期,来设计和使用该组件,这样即满足了业务定制化的诉求,又达到了减少重复开发的目的。,1,新的Consumer组件(HunterConsumer)的一般用法:,即,将@HunterConsumer用在被注解的方法上,将注解上的topic、tags、group等作为个性化的初始化参数,在消费消息时将消息内容通过参数名称和类型确定,作为参数传递给消费者客户端,再根据返回值true/false决定消费成功或者失败。,2,新的Producer组件(HunterProducer)的一般用法:,即,将@HunterProducer用在被注解的字段上,将注解上的topic、tags、group等作为个性化的初始化参数,创建IHunterProducer客户端实例,提供发送消息的接口。,如上述代码所示,对于Consumer、Producer客户端生命周期中的创建&初始化&启动、销毁2个阶段,由工程在首次引入组件时,一次性简单配置即可,整个业务开发过程中只需要使用@HunterConsumer和@HunterProducer两个注解就行了。,HunterConsumer组件依托于Spring容器来管理bean的生命周期,所以在业务侧在引入该组件时要使用Spring的ApplicationContext来进行初始化,具体如下:,HunterConsumerStarter.startConsumers内部在初始化时总体会进行如下4步处理:读取Spring容器中的全部配置、加载Spring容器中的全部HunterConsumer注解、启动前检查、根据配置创建并启动Consmer客户端,从Spring容器中加载组件所需要的基本配置信息,如:是否启动HunterConsumer组件、name server地址、默认是否打印详细日志、默认核心线程池大小 核心代码如下:,读取Spring中的配置信息:,扫描Spring容器中的全部bean获取各个方法上的@HunterConsmuer注解,并检查注解使用方式是否正确,组件为防止一些错误用法,将做限定检查(具体检查代码不再展示):,1,同一个group下不能有两个不同的topic,2,同一个group和topic下不能出现在多个@HunterConsumer注解中(防止在本地直连线上mq服务器时修改线上group的订阅tag,进而导致mq消息丢失),3,广播消费模式下,tags必须是空或者{}或者{*},4,无法获取到被@HunterConsumer注解的方法参数名称,第四点需要重点说明下,这里@HunterConsumer使用到了Java8开始有的一个特性,即可以获取到源码中方法的参数名,所以引入@HunterConsumer组件的工程需要保证在编译是使用-parameters参数。 以Maven为例,需要引入配置:,创建并启动客户端:,创建并启动客户端的具体过程:,消费并监控失败信息:,RocketMQ客户端要在Spring容器关闭时销毁,业务侧无感知。,HunterProducer组件也是依托于Spring容器来管理bean的生命周期,所以在业务侧在引入该组件时也需要使用Spring的ApplicationContext来进行初始化,具体如下:,HunterProducerStarter.startProducers内部在初始化时总体步骤与HunterConsumer组件相同:读取Spring容器中的全部配置、加载Spring容器中的全部HunterProducer注解、启动前检查、根据配置创建并启动Producer客户端。 以下四个阶段不再单独介绍:,1,读取Spring容器中的全部配置阶段与HunterConsumer完全相同,2,加载Spring容器中的全部HunterProducer注解阶段是通过扫描Spring容器中bean的全部字段上是否有@HunterProducer注解来实现的,可以参照@HunterConsmuer扫描bean的全部方法,3,启动前检查阶段是检查注解参数是否为空、是否重复等,较为简单,4,销毁阶段与HunterConsumer完全相同,HunterProducer与HunterConsmuer的最大的不同点在于根据配置创建并启动Producer客户端阶段,这一阶段要给调用方提供操作接口,而发送延迟消息与普通消息在操作接口上又有略微区别,所以这里提取出来IProducer接口作为公共的父接口来定义公共方法,IHunterProduer和IHunterDelayProducer作为子接口,来提供个性化方法,具体如下:,当我们要发送不同的消息时可以使用不同字段类型来获取具体的客户端实例:,组件内部再通过实现IHunterProduer和IHunterDelayProducer接口的实例,转调RocketMQ原生客户端DefaultMQProducer来做消息发送。,HunterConsumer和HunterProducer组件主要是利用AOP思想实现,它使开发人员在编写业务逻辑时可以专心于核心业务,而不用过多的关注于一些非业务的重复代码,这不但提高了开发效率,而且增强了代码的可维护性。
© 版权声明
文章版权归作者所有,未经允许请勿转载。