积分服务扛不住的情况
发布时间:2025-06-24 19:45:51 作者:北方职教升学中心 阅读量:581
积分服务可能扛不住
但如果采用异步调用的方式,就很少会出现交易服务、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类 QueueBuilder 构建
- Exchange:用于声明交换机,可以用工厂类 ExchangeBuilder 构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类 BindingBuilder 构建
14.1.2 快速上手
我们创建一个 Fanout 类型的交换机,并且创建队列与这个交换机绑定
在 consumer 服务中编写 FanoutConfiguration 配置类
importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfiguration{@BeanpublicFanoutExchangefanoutExchange3(){returnExchangeBuilder.fanoutExchange("blog.fanout3").build();}@BeanpublicFanoutExchangefanoutExchange4(){returnnewFanoutExchange("blog.fanout4");}@BeanpublicQueuefanoutQueue3(){returnnewQueue("fanout.queue3");}@BeanpublicQueuefanoutQueue4(){returnQueueBuilder.durable("fanout.queue4").build();}@BeanpublicBindingfanoutBinding3(QueuefanoutQueue3,FanoutExchangefanoutExchange3){returnBindingBuilder.bind(fanoutQueue3).to(fanoutExchange3);}@BeanpublicBindingfanoutBinding4(){returnBindingBuilder.bind(fanoutQueue4()).to(fanoutExchange4());}}
启动 consumer 的启动类之后,队列、测试环境、而且,不同的环境(开发环境、通知服务、在高并发的情况下,用户每成功支付一次,支付服务只需要发送一条消息给消息代理,这些像洪水一般的消息都会被消息代理拦住
消息代理会保存这些消息,后续服务可以根据自己的处理速度,从消息代理中一条一条地取出信息并处理。转发消息的人
改为异步调用之后,支付服务不再同步调用与支付业务关联度低的服务,而是发送消息通知于支付业务关联度低的服务
5. 异步调用的优点和缺点
5.1 异步调用的优点
5.1.1 解除耦合,拓展性强
即使以后有新业务拓充,支付服务只需要发送一条消息给消息代理,让消息代理通知新业务,拓展性强
5.1.2 无需等待,性能好
支付服务完成之后只需要发送消息给消息代理,让消息代理通知其它服务
5.1.3 故障隔离
即使交易服务出现了故障,也不会影响到支付服务
5.1.4 削峰填谷
假如支付服务正在面临着很大的压力,流量时高时低(呈波浪形)。积分服务就可以开始执行相应的操作了
然而,通知服务不依赖于交易服务,积分服务也不依赖于通知服务
在成功扣减用户余额并成功更新支付状态之后,支付业务就已经完成了
所以说,支付服务完成了之后,只需要通知交易服务、积分服务都依赖于支付服务的结果
当支付服务成功扣减用户余额并成功更新支付状态之后,交易服务、接收方在处理完消息后,可能会在未来的某个时间点给出回应
异步通讯的特点是:
- 非阻塞:发送方在发送消息后可以立即继续其他工作,不会因为等待回应而被阻塞
- 解耦:发送方和接收方在时间上解耦,可以独立处理各自的任务
- 灵活:异步通讯可以处理更复杂的通信模式,如消息队列、通知服务、生产环境)可能会有不同的队列和交换机,手动创建队列和交换机效率十分低下
接下来为大家介绍两种在 SpringBoot 项目中声明队列和交换机的方式
14.1 编程式声明
14.1.1 SpringAQMP提供的创建队列和交换机的类
SpringAMQP 提供了几个类,用来声明队列、由此可以看出,Direct 交换机的功能比 Fanout 交换机更强大
13.2.2 快速上手
我们做一个小案例来体验 Direct 交换机的效果,案例要求如下:
- 在 RabbitMQ 控制台中,声明队列 direct.queue1 和 direct.queue2
- 在 RabbitMQ 控制台中,声明交换机 blog.direct ,将上面创建的两个队列与其绑定
- 在 consumer 服务中,编写两个消费者方法,分别监听 direct.queue1 和 direct.queue2
- 在 publisher 服务中编写测试方法,利用不同的 RoutingKey 向 blog.direct 交换机发送消息
为 direct.queue1队列 和 direct.queue2 队列分别指定 bindingKey
在 consumer 服务的 RabbitMQListener 类中添加以下方法,分别监听 direct.queue1 和 direct.queue2 队列
@RabbitListener(queues ="direct.queue1")publicvoidlistenDirectQueue1(Stringmessage){System.out.println("消费者1 收到了 direct.queue1的消息:【"+message +"】");}@RabbitListener(queues ="direct.queue2")publicvoidlistenDirectQueue2(Stringmessage){System.err.println("消费者2 收到了 direct.queue2的消息...... :【"+message +"】");}
在 publisher 服务的 SpringAmqp 测试类中添加以下方法,向 blog.direct交换机发送消息
@TestvoidtestSendDirect(){StringexchangeName ="blog.direct";StringblueMessage ="蓝色通知,警报解除,哥斯拉放的是气球";rabbitTemplate.convertAndSend(exchangeName,"blue",blueMessage);StringredMessage ="红色警报,由于日本排放核污水,惊现哥斯拉!";rabbitTemplate.convertAndSend(exchangeName,"red",redMessage);StringyellowMessage ="黄色通知,哥斯拉来了,快跑!";rabbitTemplate.convertAndSend(exchangeName,"yellow",yellowMessage);}
13.3 Topic 交换机(推荐使用)
13.3.1 Topic 交换机的概念
Topic Exchange 与 Direct Exchange类似,区别在于 Topic Exchange 的 routingKey 可以是多个单词的列表(多个 routingKey 之间以
.
分割)Queue 与 Exchange 指定 bindingKey 时可以使用通配符
- #:代指 0 个或多个单词
- *:代指 1 个单词
- Topic 交换机能实现的功能 Direct 交换机也能实现,不过用 Topic 交换机实现起来更加方便
- 如果某条消息的 topic 符合多个 queue 的 bindingKey ,该条消息会发送给符合条件的所有 queue ,实现类似于 Fanout 交换机的效果
13.3.2 快速上手
我们做一个小案例来体验 Topic 交换机的效果,案例要求如下:
- 在 RabbitMQ 控制台中,声明队列 topic.queue1 和 topic.queue2
- 在 RabbitMQ 控制台中,声明交换机 blog.topic ,将两个队列与其绑定
- 在 consumer 服务中编写两个消费者方法,分别监听 topic.queue1 和 topic.queue2
- 在 publisher 服务中编写测试方法,利用不同的 routingKey 向 blog.topic 发送消息
为 topic.queue1 和 topic.queue2 队列分别指定 bindingKey
在 consumer 服务的 RabbitMQListener 类中添加以下方法,分别监听 direct.queue1 和 direct.queue2 队列
@RabbitListener(queues ="topic.queue1")publicvoidlistenTopicQueue1(Stringmessage){System.out.println("消费者1 收到了 topic.queue1的消息:【"+message +"】");}@RabbitListener(queues ="topic.queue2")publicvoidlistenTopicQueue2(Stringmessage){System.err.println("消费者2 收到了 topic.queue2的消息...... :【"+message +"】");}
在 publisher 服务的 SpringAmqp 测试类中添加以下方法,向 blog.direct交换机发送消息
@TestvoidtestSendTopic(){StringexchangeName ="blog.topic";StringweatherMessage ="今天天气挺不错,我的心情的挺好的";rabbitTemplate.convertAndSend(exchangeName,"china.weather",weatherMessage);StringnewsMessage ="蓝色通知,警报解除,哥斯拉放的是气球";rabbitTemplate.convertAndSend(exchangeName,"china.news",newsMessage);}
14. 在 SpringBoot 项目中声明队列和交换机的方式
我们之前创建队列和交换机都是在 RabbitMQ 的控制台页面中创建的,不仅十分繁琐,还有可能打错队列和交换机的名。这里的
rabbitmq-plugins
是一个卷的名称,而不是宿主机的路径 --name rabbitmq
: 指定容器的名称为rabbitmq
--hostname rabbitmq
: 设置容器的主机名为rabbitmq
-p 15672:15672
: 将宿主机的端口15672
映射到容器的端口15672
,这是RabbitMQ管理界面的默认端口-p 5672:5672
: 将宿主机的端口5672
映射到容器的端口5672
,这是RabbitMQ用于AMQP协议通信的默认端口-d
: 在后台运行容器(守护进程)rabbitmq:latest
: 使用最新的RabbitMQ官方镜像来创建容器- Publisher:消息发送者
- Consumer:消息的消费者
- Queue:消息队列,存储消息
- Exchange:交换机,负责路由消息
- VirtualHost:虚拟主机,用于数据隔离
- 交换机的 overview 页面没有折线图
- Queues 页面也没有与消息相关的信息
- 点击
channels
后出现Stats in management UI are disabled on this node
信息 - SpringBoot:3.0.2
- JDK:17.0.7
- 在 RabbitMQ 的控制台中创建名为 simple.queue 的队列(队列归属的 VirtualHost 为 /blog)
- 在 publisher 模块中,利用 SpringAMQP 直接向 simple.queue 队列发送消息
- 在 consumer 服务中,利用 SpringAMQP 编写消费者,监听 simple.queue 队列
- 在RabbitMQ的控制台创建一个队列,名为 work.queue
- 在 publisher 服务中定义测试方法,在 1 秒内产生 50 条消息,发送到work.queue
- 在 consumer 服务中定义两个消息监听者,都监听 work.queue 队列
- 消费者 1 每秒处理 50 条消息,消费者 2 每秒处理 5 条消息
- 接收 publisher 发送的消息
- 将消息按照规则路由到与交换机绑定的队列
- Fanout:广播
- Direct:定向
- Topic:话题
- 在 RabbitMQ 控制台中,声明队列 fanout.queue1 和 fanout.queue2
- 在 RabbitMQ 控制台中,声明交换机 blog.fanout,将两个队列与其绑定
- 在 consumer 服务中,编写两个消费者方法,分别监听 fanout.queue1 和 fanout.queue2 队列
- 在 publisher 服务中编写测试方法,向 blog.fanout 交换机发送消息
- 每一个 Queue 都与 Exchange 设置一个 bindingKey
- 发布者发送消息时,指定消息的 RoutingKey
- Exchange 将消息路由到 bindingKey 与消息 routingKey 一致的队列
- 在 RabbitMQ 控制台创建一个队列,名为 object.queue
- 编写单元测试,向该队列中直接发送一条消息,消息的内容为 Map
- 在控制台查看消息
- 使用 JDK 序列化有安全风险(如果序列化后的消息被恶意篡改,在反序列化的过程中可能会执行一些高危的代码)
- 经过 JDK 序列化的消息占用空间很大
- 经过 JDK 序列化的消息可读性很差
7.4 访问 RabbitMQ 的管理页面
接下来进入 RabbitMQ 的管理界面,在浏览器输入以下地址(将 IP 地址换成你的虚拟机的 IP 地址)
http://127.0.0.1:15672/
输入用户名和密码后进入到 RabbitMQ 的管理页面
7.5 可能遇到的问题
7.5.1 安全组和防火墙未开放端口
如果无法进入RabbitMQ的管理界面,记得先在安全组和防火墙中开放 15672 和 5672 端口
在 Ubuntu 中开放15672 和 5672 端口
sudoufw allow 15672sudoufw allow 5672sudoufw reload
在 CentOS 中开放15672 和 5672 端口
sudofirewall-cmd --zone=public --add-port=15672/tcp --permanentsudofirewall-cmd --zone=public --add-port=5672/tcp --permanentsudofirewall-cmd --reload
7.5.2 RabbitMQ 没有安装 Web 插件
如果开放防火墙端口后还是无法访问 RabbitMQ 的管理界面,可能是安装 RabbitMQ 没有安装 Web 插件
以下是 RabbitMQ 安装 Web 插件的方法
第一步:进入容器内部
sudodockerexec-itrabbitmq bash
第二步:安装 Web 插件
rabbitmq-plugins enable rabbitmq_management
安装插件后退出容器内部
exit
8. RabbitMQ 的整体架构和核心概念
RabbitMQ 有几个核心概念:
RabbitMQ 的整体架构
9. RabbitMQ 快速入门
注意事项:交换机只能路由和转发消息,不能存储消息
9.1 新建队列
创建一个名为 hello.queue 的队列
9.2 绑定队列与交换机
我们将刚才新创建的 hello.queue 队列与 amq.fanout 交换机绑定(fanout意为扇出)
绑定成功后的界面
9.3 发送消息
我们在 amq.fanout 交换机中发送一条消息,消息的内容为 Hello, RabbitMQ!
发送消息后,查看交换机的总览信息
查看队列中的消息数
查看消息的具体内容
9.4 可能遇到的问题
如果你发现
需要先修改 RabbitMQ的 配置
第一步:进入容器内部
sudodockerexec-itrabbitmq bash
第二步:修改配置
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
第三步:重启容器
先退出容器内部
exit
再重启容器
sudodockerrestart rabbitmq
10. 数据隔离
10.1 新建用户
新建一个名为 CaiXuKun 的用户,密码为 T1rhFXMGXIOYCoyi ,角色指定为 admin
可以看到,新用户对任意一个 VirtualHost 都是没有访问权限的
用新用户的账号登录管理台,虽然能看到所有 VirtualHost 的信息,但是无法对任意一个 VirtualHost 进行操作
10.2 为新用户创建一个 VirtualHost
用新用户的账号登录管理台,创建一个名为 /blog 的 VirtualHost
10.3 测试不同 VirtualHost 之间是否有数据隔离
可以看到,不同的 VirtualHost 之间有不同的交换机
对某一个 VirtualHost 操作不会影响到另一个 VirtualHost
11. 在 SpringBoot 项目中集成 RabbitMQ
后端环境:
11.1 AMQP 和 SpringAMQP
SpringAMQP 的官网:Spring AMQP
11.2 快速入门
新建一个 SpringBoot 项目,并创建 consumer 和 publisher 两个子模块,项目的整体结构如下
11.2.1 引入 Maven 依赖
在父工程中引入 SpringAMQP 的依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
11.2.2 编写与 RabbitMQ 有关的配置信息
在 consumer 和 publisher 模块的 application.yml 中分别编写与 RabbitMQ 有关的配置信息
spring:rabbitmq:host:127.0.0.1 port:5672virtual-host:/blog username:CaiXuKun password:T1rhFXMGXIOYCoyi
11.3 完成一个简单的案例
案例要求如下:
11.3.1 创建队列
11.3.2 发送消息
在 publisher 模块中编写测试类,用户向队列发送消息
importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTest(classes =PublisherApplication.class)publicclassSpringAmqpTest{@AutowiredprivateRabbitTemplaterabbitTemplate;@TestvoidtestSendMessageToQueue(){StringqueueName ="simple.queue";Stringmsg ="Hello, SpringAMQP!";rabbitTemplate.convertAndSend(queueName,msg);}}
在 RabbitMQ 的控制台可以看到,消息成功发送
11.3.3 接收消息
SpringAMQP 提供了声明式的消息监听,我们只需要通过@RabbitListener
注解在方法上声明要监听的队列名称,将来 SpringAMQP 就会把消息传递给使用了@RabbitListener
注解的方法
importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassRabbitMQListener{@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueue(Stringmessage){System.out.println("消费者收到了 simple.queue 的消息:【"+message +"】");}}
启动 consumer 模块的启动类之后,就可以看到消息的内容
12. Work Queues 模型
12.1 Work Queues 的概念
Work Queues,简单地来说,就是让多个消费者绑定到一个队列,共同消费队列中的消息
虽然有多个消费者绑定同一个队列,但是队列中的某一条消息只会被一个消费者消费
我们实现一个小案例,模拟 Work Queues,实现一个队列绑定多个消费者
案例要求如下:
在 publisher服务的 SpringAmqp 测试类中添加以下方法,该方法可以在 1 秒内产生 50 条消息
@TestvoidtestWorkQueue()throwsInterruptedException{StringqueueName ="work.queue";for(inti =1;i <=50;i++){Stringmessage ="Hello, work queues_"+i;rabbitTemplate.convertAndSend(queueName,message);Thread.sleep(20);}}
在 consumer 服务的 RabbitMQListener 类中添加以下方法,监听 work.queue 队列
@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue1(Stringmessage){System.out.println("消费者1 收到了 work.queue的消息:【"+message +"】");}@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue2(Stringmessage){System.err.println("消费者2 收到了 work.queue的消息...... :【"+message +"】");}
12.2 Work Queues 模型的消息推送机制
如果有两个或两个以上的消费者监听同一个队列,默认情况下 RabbitMQ 会采用轮询的方法将消息分配给每个队列
但每个消费者的消费能力可能是不一样的,我们给两个消费者中的代码设置不同的休眠时间,模拟消费能力的不同
@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue1(Stringmessage)throwsInterruptedException{System.out.println("消费者1 收到了 work.queue的消息:【"+message +"】");Thread.sleep(20);}@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue2(Stringmessage)throwsInterruptedException{System.err.println("消费者2 收到了 work.queue的消息...... :【"+message +"】");Thread.sleep(100);}
经过测试可以发现,即使两个队列的消费能力不一样,默认情况下 RabbitMQ 还是会采用轮询的方法将消息分配给每个队列,也就是平均分配
但这不是我们想要的效果,我们想要的效果是消费能力强的消费者处理更多的消息,甚至能够帮助消费能力弱的消费者
怎么样才能达到这样的效果呢,只需要在配置文件中添加以下信息
spring:rabbitmq:listener:simple:prefetch:1
这个配置信息相当于告诉消费者要一条一条地从队列中取出消息,只有处理完一条消息才能取出下一条
这样一来,就可以充分利用每一台机器的性能,让消费能力强的消费者处理更多的消息,同时还可以避免消息在消费能力较弱的消费者上发生堆积的情况
13. 交换机
真正的生产环境都会经过交换机来发送消息,而不是直接发送到队列
交换机的作用:
交换机的类型有以下三种:
注意事项:交换机只能路由和转发消息,不能存储消息
13.1 Fanout 交换机
13.1.1 Fanout 交换机的概念
Fanout 交换机会将接收到的消息广播到每一个跟其绑定的 queue ,所以也叫广播模式
13.1.2 快速上手
我们做一个小案例来体验 Fanout 交换机的效果,案例要求如下:
声明交换机
在 consumer 服务的 RabbitMQListener 类中添加以下方法,分别监听 fanout.queue1 和 fanout.queue2 队列
@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(Stringmessage){System.out.println("消费者1 收到了 fanout.queue1的消息:【"+message +"】");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(Stringmessage){System.err.println("消费者2 收到了 fanout.queue2的消息...... :【"+message +"】");}
在 publisher 服务的 SpringAmqp 测试类中添加以下方法,向 blog.fanout 交换机发送消息
@TestvoidtestSendFanout(){StringexchangeName ="blog.fanout";Stringmessage ="Hello, fanout exchange";rabbitTemplate.convertAndSend(exchangeName,null,message);}
13.2 Direct 交换机
13.2.1 Direct 交换机的概念
Direct 交换机会将接收到的消息根据规则路由到指定的队列,被称为定向路由
需要注意的是:同一个队列可以绑定多个 bindingKey ,如果有多个队列绑定了同一个 bindingKey ,就可以实现类似于 Fanout 交换机的效果。通知服务、 direct.queue1 队列和 direct.queue2 队列
再改造 consumer 服务的 RabbitMQListener 类的监听 direct.queue1 队列和 direct.queue2 队列的方法
@RabbitListener(bindings =@QueueBinding(value =@Queue(name ="direct.queue1"),exchange =@Exchange(name ="blog.direct",type =ExchangeTypes.DIRECT),key ={"red","blue"}))publicvoidlistenDirectQueue1(Stringmessage){System.out.println("消费者1 收到了 direct.queue1的消息:【"+message +"】");}@RabbitListener(bindings =@QueueBinding(value =@Queue(name ="direct.queue2"),exchange =@Exchange(name ="blog.direct",type =ExchangeTypes.DIRECT),key ={"red","yellow"}))publicvoidlistenDirectQueue2(Stringmessage){System.out.println("消费者2 收到了 direct.queue2的消息:【"+message +"】");}
15. 消息转换器
在了解消息转换器之前,我们先来做一个小案例,案例的内容是利用 SpringAMQP 发送一条消息,消息的内容为一个 Java 对象
案例要求如下:
在 publisher 服务的 SpringAmqpTests 测试类中新增 testSendObject 方法
@TestvoidtestSendObject(){Map<String,Object>hashMap =newHashMap<>(2);hashMap.put("name","Tom");hashMap.put("age",21);rabbitTemplate.convertAndSend("object.queue",hashMap);}
成功发送消息后,我们在 RabbitMQ 的控制台查看消息的具体内容
可以发现,消息的内容类型为 application/x-java-serialized-object
,并且消息的内容也变成一堆乱码
我们本来是想发送一个简单的仅含有姓名和年龄两个字段的简短信息,但是消息却变成了一堆乱码,不仅可读性大大下降,而且占用的空间也大大地增加了,这显然不是我们想要的效果
15.1 默认的消息转换器
Spring 处理对象类型的消息是由 org.springframework.amap.support.converter.MessageConverter
接口来处理的,该接口默认实现是 SimpleMessageConverter
SimpleMessageConverter
类是基于 JDK 提供的 ObjectOutputStream
来类完成序列化的,这种序列化方式存在以下问题:
15.2 自定义消息转换器
一般建议采用 JSON 序列化代替默认的 JDK 序列化
要使用 JSON 序列化,需要先引入 jackson 依赖(在项目的父工程中引入)
如果是 Web 项目,无需引入该依赖,因为 spring-boot-starter-web 依赖中已包含该依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>
接着在 consumer 服务和 publisher 服务中配置 MessageConverter
@BeanpublicMessageConverterjacksonMessageConvertor(){returnnewJackson2JsonMessageConverter();}
再次发送对象类型的消息,可以看到消息已经成功转换成 JSON 类型的字符串
我们也可以在 consumer 服务的 RabbitMQListener 类中添加对 object.queue 队列的监听(用什么类型发,就用什么类型接收)
@RabbitListener(queues ="object.queue")publicvoidlistenObjectQueue(Map<String,Object>hashMap){System.out.println("消费者收到了 object.queue的消息:【"+hashMap +"】");}
启动 consumer 服务的启动类之后,在控制台中可以看到被转换成 JSON 格式的消息
在控制台中会看到报错信息,因为之前有一条用 JDK 序列化的消息,现在改用了 jackson 序列化,序列化和反序列化用的序列化器不一样,肯定会报错
报错后,消息就没了,出现了消息丢失的现象,该怎么解决呢,可以参考我的另一篇博文:RabbitMQ 高级篇