1.消息中间件简介 消息中间件(Message Middleware)是一种在分布式系统中用于解耦不同服务或组件的软件,它通过异步消息传递的方式来实现服务之间的通信 。
send(String topic, K key, V value)
: 发送一个键值对到指定主题。副本分为领导者副本(Leader)和跟随者副本(Follower),生产者和消费者只与领导者副本交互,跟随着副本只是被动跟随。send(String topic, K key, V value, Map<String, Object> headers)
: 发送一个键值对到指定主题,并附带消息头。5.Kafka消息队列的使用 SpringBoot整合Demo 创建两个Maven项目: 一个是生产者KafkaProducerDemo
,用于发布消息,另一个是消费者KafkaConsumerDemo
,用于接受消息,两项目目录结构如下:
引入依赖: 两个项目都在pom.xml
中添加Spring Kafka的依赖,整个pom.xml如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.thkl</groupId> <artifactId>KafkaDemo</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.5.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.3.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.3</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.10</version> </dependency> </dependencies> <build> </build></project>
配置Kafka: 两个项目都在application.yml
中配置Kafka的属性,注意端口号别一样
server: # 端口号 port: 8080spring: kafka: # Kafka服务器的地址和端口 bootstrap-servers: localhost:9092 consumer: # 消费者组的ID group-id: thkl-group
在KafkaProducerDemo中创建Kafka生产者:
@Service @AllArgsConstructor public class KafkaProducerService { private KafkaTemplate kafkaTemplate; public void sendMessage ( String message) { kafkaTemplate. send ( "thkl-topic" , message) ; } }
@RestController @AllArgsConstructor public class KafkaProducerController { private KafkaProducerService kafkaProducerService; @GetMapping ( "/send" ) public String sendMessage ( String message) { kafkaProducerService. sendMessage ( message) ; return "Message sent successfully" ; } }
在KafkaConsumerDemo中创建Kafka消费者:
@Servicepublic class KafkaConsumerService { // 使用@KafkaListener注解来创建Kafka消费者 @KafkaListener(topics = "thkl-topic", groupId = "thkl-group") public void receiveMessage(String message) { System.out.println("Received message: " + message); }}
启动类: 两项目都添加
@SpringBootApplicationpublic class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); }}
启动两个项目,端口号不可以一样,不然启动不起来
在KafkaProducerDemo
发布消息
就可以在KafkaConsumerDemo
接收到消息了
KafkaTemplate 主要API KafkaTemplate
是Spring Framework提供的一个用于简化Kafka消息发送的抽象类。
send(String topic, K key, V value, Map<String, Object> headers)
: 发送一个键值对到指定主题,并附带消息头。send(String topic, K key, V value)
: 发送一个键值对到指定主题。发送消息(可选键和消息头) send(String topic, Object message, String key, Map<String, Object> headers)
: 发送一个对象到指定主题,并指定消息的键和消息头。常见使用场景:
应用解耦 :在不同的服务或应用之间传递消息,降低它们之间的直接依赖;事件驱动架构 :在事件驱动的系统中,消息中间件作为事件总线,传递事件消息;分布式系统 :在分布式环境中,消息中间件用于服务间的通信;大数据处理 :在数据分析和处理系统中,消息中间件用于收集和分发大量数据;微服务架构 :微服务之间通过消息中间件进行通信,实现服务的独立性和动态扩展。 发送消息(消息头) send(String topic, Object message, Map<String, Object> headers)
: 发送一个对象到指定主题,并附带消息头。用户名和密码替换为你自己的数据库信息。 发送消息(键和消息头) send(String topic, Object message, String key, Map<String, Object> headers)
: 发送一个对象到指定主题,并指定消息的键和消息头。 注: 这些方法的具体实现可能会根据Kafka客户端版本和Spring Kafka版本有所不同。
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
: 设置环境变量,指定offsets topic的副本因子。可持久化、发送对象 send(String topic, Object message)
: 发送一个对象到指定主题。发布/订阅模型:
主题(Topics) :在Kafka中,消息被发布到称为“主题”的类别中。发送消息和对象等。