最近在新项目开发中遇到一个有趣的问题,如何在 SpringBoot 项目中控制 RocketMQ 消费线程数量。如何设置单个 topic 消费线程的最小数量和最大数量,用来区分不同 topic 吞吐量不同。,我们先介绍一下 RocketMQ 消息监听再来说明 RocketMQ 消费线程。,设置消费者组为 my_consumer_group,监听 TopicTest 队列,并使用并发消息监听器MessageListenerConcurrently,接口:org.apache.rocketmq.client.consumer.listener.MessageListener,有两个子接口:,作用:consumer并发消费消息的监听器,比如,在 quick start 中,就是使用的并发消费消息监听器:,方法返回值,是个枚举:,画外音:,当前,我们在具体开发中,肯定不会直接使用这种方式来写consumer。,常用的Consumer实现是:基于 推 的consumer:DefaultMQPushConsumer,作用:consumer顺序消费消息的监听器,作用:基于 推 的consumer消费者,org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#registerMessageListener,当使用这个方法注册消息监听器时,实际上会把这个病发消息监听器设置到 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#messageListenerInner属性中。,可选有两种:,并发消费的service,顺序消费的service,当consumer在启动的时,会使用MessageListener具体实现类型进行判断:,MessageListener 就有并发和顺序两种,所以service也有两种。,如果使用的是并发消费的话,使用 ConsumeMessageConcurrentlyService :,在实例化的时候,会创建一个线程池:,consumer消费线程池参数:,注意:因为线程池使用的是无界队列,那么设置的最大线程数,其实没有什么意义。,上面我们已经知道了,设置线程池的最大线程数是没什么用的。,那我们其实可以设置线程池的最小线程数,来修改consumer消费消息时的线程池大小。,注意:consumeThreadMin 如果大于64,则也需要设置 consumeThreadMax 参数,因为有个校验:,-修改线程池线程数-SpringBoot版,如果consumer是使用spring boot进行集成的,则可以这样设置消费者线程数:,
© 版权声明
文章版权归作者所有,未经允许请勿转载。