发布时间:2025-06-24 18:57:58 作者:北方职教升学中心 阅读量:724
本文将详细介绍 Kafka 在 Spring Boot 项目中的使用步骤及其背后的原理。consumerFactory
方法创建了 ConsumerFactory
,配置了 Kafka 连接、键与值的序列化类,序列化是为了把 Java 对象转化成能在 Kafka 网络传输的字节数组。
配置 Kafka 服务器 :在 application.properties
中配置 Kafka 连接信息。这里的 topic
就是消息的分类标签,不同主题可用于区分不同业务的数据。 sendMessage
方法借助注入的 Kafka 生产者,将给定的消息发送到指定的 Kafka 主题。
< dependencies> < dependency> < groupId> org.springframework.kafka</ groupId> < artifactId> spring-kafka</ artifactId> </ dependency> </ dependencies>
spring-kafka
是 Spring 官方提供的整合 Kafka 的库,它封装了 Kafka 客户端,让 Spring Boot 开发者可以用熟悉的 Spring 编程风格去和 Kafka 交互,免去直接处理复杂 Kafka 客户端 API 的麻烦。当 testTopic
中有新消息时,receiveMessage
方法就会被触发,在这里只是简单打印收到的消息。
定义 Kafka 消费者 配置类 :import org. apache. kafka. clients. consumer. ConsumerConfig ; import org. apache. kafka. common. serialization. StringDeserializer ; import org. springframework. context. annotation. Bean ; import org. springframework. context. annotation. Configuration ; import org. springframework. kafka. annotation. EnableKafka ; import org. springframework. kafka. config. ConcurrentKafkaListenerContainerFactory ; import org. springframework. kafka. core. ConsumerFactory ; import org. springframework. kafka. core. DefaultKafkaConsumerFactory ; import java. util. HashMap ; import java. util. Map ; @Configuration @EnableKafka public class KafkaConsumerConfig { @Bean public ConsumerFactory < String , String > consumerFactory ( ) { Map < String , Object > configProps = new HashMap < > ( ) ; configProps. put ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG , "localhost:9092" ) ; configProps. put ( ConsumerConfig . KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer . class ) ; configProps. put ( ConsumerConfig . VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer . class ) ; return new DefaultKafkaConsumerFactory < > ( configProps) ; } @Bean public ConcurrentKafkaListenerContainerFactory < String , String > kafkaListenerContainerFactory ( ) { ConcurrentKafkaListenerContainerFactory < String , String > factory = new ConcurrentKafkaListenerContainerFactory < > ( ) ; factory. setConsumerFactory ( consumerFactory ( ) ) ; return factory; } }
@Configuration
用于配置类,@EnableKafka
开启 Spring Boot 项目对 Kafka 的监听功能。键值的反序列化类,反序列化把从 Kafka 接收的字节数组变回 Java 对象。
生产者服务类 :import org. springframework. beans. factory. annotation. Autowired ; import org. springframework. kafka. core. KafkaProducer ; import org. springframework. stereotype. Service ; @Service public class KafkaMessageProducer { @Autowired private KafkaProducer < String , String > kafkaProducer; public void sendMessage ( String topic, String message) { kafkaProducer. send ( topic, message) ; } }
@Service
表明这是一个业务服务类。
测试集成 可以在 Spring Boot 的控制器或者测试类中注入生产者服务类来测试:
import org. springframework. beans. factory. annotation. Autowired ; import org. springframework. web. bind. annotation. GetMapping ; import org. springframework. web. bind. annotation. RestController ; @RestController public class TestController { @Autowired private KafkaMessageProducer kafkaMessageProducer; @GetMapping ( "/send" ) public String sendMessage ( ) { kafkaMessageProducer. sendMessage ( "testTopic" , "Hello, Kafka in Spring Boot!" ) ; return "Message sent" ; } }
运行 Spring Boot 项目,访问 /send
接口,生产者会将消息发送到 Kafka 主题,随后消费者便能接收到并处理,完成了 Kafka 在 Spring Boot 项目中的基础集成流程。
环境搭建 引入依赖 :在项目的 pom.xml
文件里添加 Kafka 相关的依赖。@KafkaListener
注解指定监听的主题是 testTopic
,所属消费组是 testGroup
。kafkaListenerContainerFactory
方法则构建了一个容器工厂,用来创建管理 Kafka 消费者的容器。kafkaProducer
方法利用这个工厂创建出实际的 Kafka 生产者实例。spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.bootstrap-servers
用来指定 Kafka 集群的初始连接点,也就是告诉 Spring Boot 项目该去哪里找到 Kafka 服务器。
定义 Kafka 生产者 配置类创建 :import org. apache. kafka. clients. producer. ProducerConfig ; import org. apache. kafka. common. serialization. StringSerializer ; import org. springframework. context. annotation. Bean ; import org. springframework. context. annotation. Configuration ; import org. springframework. kafka. core. DefaultKafkaProducerFactory ; import org. springframework. kafka. core. KafkaProducerFactory ; import org. springframework. kafka. core. ProducerFactory ; import java. util. HashMap ; import java. util. Map ; @Configuration public class KafkaProducerConfig { @Bean public ProducerFactory < String , String > producerFactory ( ) { Map < String , Object > configProps = new HashMap < > ( ) ; configProps. put ( ProducerConfig . BOOTSTRAP_SERVERS_CONFIG , "localhost:9092" ) ; configProps. put ( ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , StringSerializer . class ) ; configProps. put ( ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer . class ) ; return new DefaultKafkaProducerFactory < > ( configProps) ; } @Bean public org. springframework. kafka. core. KafkaProducer< String , String > kafkaProducer ( ) { return new org. springframework. kafka. core. KafkaProducer< > ( producerFactory ( ) ) ; } }
@Configuration
标记这是一个配置类。Spring Boot 则以简化配置、producerFactory
方法构建了一个 ProducerFactory
,它是创建 Kafka 生产者的工厂,里面配置了 Kafka 服务器地址、这里配置为本地的 localhost:9092
,如果是集群环境,可添加多个地址,用逗号分隔,方便后续客户端发现集群中的所有 Broker。
Kafka 在 Spring Boot 项目中的实战指南 引言 随着大数据与微服务架构的盛行,Kafka 作为一款高性能的分布式流处理平台,在企业级项目里愈发举足轻重。快速开发著称,将二者结合,能高效搭建起具备强大数据处理能力的应用程序。
消费者服务类 :import org. springframework. kafka. annotation. KafkaListener ; import org. springframework. stereotype. Component ; @Component public class KafkaMessageConsumer { @KafkaListener ( topics = "testTopic" , groupId = "testGroup" ) public void receiveMessage ( String message) { System . out. println ( "Received message: " + message) ; } }
@Component
标识这是一个 Spring 组件。