不知道大家有没有遇到这样的场景,就是一个项目中要消费多个kafka消息,不同的消费者消费指定kafka消息。遇到这种场景,我们可以通过kafka的提供的api进行配置即可。但很多时候我们会使用spring-kafka来简化开发,可是spring-kafka原生的配置项并没提供多个kafka配置,因此本文就来聊聊如何将spring-kafka进行改造,使之能支持多个kafka配置,1.通过 @ConfigurationProperties指定KafkaProperties前缀;,如果有多个就配置多个,形如:,2.配置消费者工厂,消费者工厂绑定对应的KafkaProperties;,3.配置消费者监听器工厂,并绑定指定消费者工厂以及消费者配置;,完整的配置示例如下:,那个 @Primary要指定一下,不然启动会因为存在多个KafkaProperties,而导致kafka的自动装配不懂要选哪个而报错。,1.在项目的pom引入spring-kafka GAV;,2.在项目的yml中配置如下内容;,3.配置生产者;,这个KafkaTemplate绑定的就是@Primary配置的kafkaProperties。,4.配置消费者监听,并绑定containerFactory;,通过指定containerFactory ,来消费指定的kafka消息;,5.测试;,发送消息,观察控制台输出;,会出现这样,是因为数据库已经有这条记录了,刚好验证一下重复消费,本文实现的核心其实就是通过注入多个kafkaProperties来实现多配置 ,不知道大家有没有发现,就是改造后的配置,配置消费者后,生产者仍然也要配置。因为如果不配置,走的就是kafkaProperties默认的配置信息,即为localhost。还有细心的朋友也许会发现我示例中的消费者监听使用的注解是@LybGeekKafkaListener,这个和 @KafkaListener实现的功能基本一致。因为本示例和之前的文章聊聊如何实现一个带幂等模板的kafka消费者监听是同份代码,就直接复用了。
© 版权声明
文章版权归作者所有,未经允许请勿转载。