发布时间:2025-06-24 18:57:58  作者:北方职教升学中心  阅读量:724


本文将详细介绍 Kafka 在 Spring Boot 项目中的使用步骤及其背后的原理。consumerFactory方法创建了 ConsumerFactory,配置了 Kafka 连接、键与值的序列化类,序列化是为了把 Java 对象转化成能在 Kafka 网络传输的字节数组。

  1. 配置 Kafka 服务器:在 application.properties中配置 Kafka 连接信息。这里的 topic就是消息的分类标签,不同主题可用于区分不同业务的数据。

sendMessage方法借助注入的 Kafka 生产者,将给定的消息发送到指定的 Kafka 主题。
<dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>

spring-kafka是 Spring 官方提供的整合 Kafka 的库,它封装了 Kafka 客户端,让 Spring Boot 开发者可以用熟悉的 Spring 编程风格去和 Kafka 交互,免去直接处理复杂 Kafka 客户端 API 的麻烦。当 testTopic中有新消息时,receiveMessage方法就会被触发,在这里只是简单打印收到的消息。

定义 Kafka 消费者

  1. 配置类
importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.annotation.EnableKafka;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.kafka.core.DefaultKafkaConsumerFactory;importjava.util.HashMap;importjava.util.Map;@Configuration@EnableKafkapublicclassKafkaConsumerConfig{@BeanpublicConsumerFactory<String,String>consumerFactory(){Map<String,Object>configProps =newHashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);returnnewDefaultKafkaConsumerFactory<>(configProps);}@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,String>factory =newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());returnfactory;}}

@Configuration用于配置类,@EnableKafka开启 Spring Boot 项目对 Kafka 的监听功能。键值的反序列化类,反序列化把从 Kafka 接收的字节数组变回 Java 对象。

  1. 生产者服务类
importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaProducer;importorg.springframework.stereotype.Service;@ServicepublicclassKafkaMessageProducer{@AutowiredprivateKafkaProducer<String,String>kafkaProducer;publicvoidsendMessage(Stringtopic,Stringmessage){kafkaProducer.send(topic,message);}}

@Service表明这是一个业务服务类。

测试集成

可以在 Spring Boot 的控制器或者测试类中注入生产者服务类来测试:

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassTestController{@AutowiredprivateKafkaMessageProducerkafkaMessageProducer;@GetMapping("/send")publicStringsendMessage(){kafkaMessageProducer.sendMessage("testTopic","Hello, Kafka in Spring Boot!");return"Message sent";}}

运行 Spring Boot 项目,访问 /send接口,生产者会将消息发送到 Kafka 主题,随后消费者便能接收到并处理,完成了 Kafka 在 Spring Boot 项目中的基础集成流程。

在这里插入图片描述

环境搭建

  1. 引入依赖:在项目的 pom.xml文件里添加 Kafka 相关的依赖。@KafkaListener注解指定监听的主题是 testTopic,所属消费组是 testGroupkafkaListenerContainerFactory方法则构建了一个容器工厂,用来创建管理 Kafka 消费者的容器。kafkaProducer方法利用这个工厂创建出实际的 Kafka 生产者实例。
spring.kafka.bootstrap-servers=localhost:9092

spring.kafka.bootstrap-servers用来指定 Kafka 集群的初始连接点,也就是告诉 Spring Boot 项目该去哪里找到 Kafka 服务器。

定义 Kafka 生产者

  1. 配置类创建
importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringSerializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.core.DefaultKafkaProducerFactory;importorg.springframework.kafka.core.KafkaProducerFactory;importorg.springframework.kafka.core.ProducerFactory;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassKafkaProducerConfig{@BeanpublicProducerFactory<String,String>producerFactory(){Map<String,Object>configProps =newHashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);returnnewDefaultKafkaProducerFactory<>(configProps);}@Beanpublicorg.springframework.kafka.core.KafkaProducer<String,String>kafkaProducer(){returnneworg.springframework.kafka.core.KafkaProducer<>(producerFactory());}}

@Configuration标记这是一个配置类。Spring Boot 则以简化配置、producerFactory方法构建了一个 ProducerFactory,它是创建 Kafka 生产者的工厂,里面配置了 Kafka 服务器地址、这里配置为本地的 localhost:9092,如果是集群环境,可添加多个地址,用逗号分隔,方便后续客户端发现集群中的所有 Broker。

Kafka 在 Spring Boot 项目中的实战指南

引言

随着大数据与微服务架构的盛行,Kafka 作为一款高性能的分布式流处理平台,在企业级项目里愈发举足轻重。快速开发著称,将二者结合,能高效搭建起具备强大数据处理能力的应用程序。

  1. 消费者服务类
importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;@ComponentpublicclassKafkaMessageConsumer{@KafkaListener(topics ="testTopic",groupId ="testGroup")publicvoidreceiveMessage(Stringmessage){System.out.println("Received message: "+message);}}

@Component标识这是一个 Spring 组件。