返还金币业务?
发布时间:2025-06-24 18:59:38 作者:北方职教升学中心 阅读量:590
或是将消息丢弃。如图:
解读:
同步通讯:就如同打视频电话,双方的交互都是实时的。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
在广播模式下,消息发送流程是这样的:
1) 可以有多个队列
2) 每个队列都要绑定到Exchange(交换机)
3) 生产者发送的消息,只能发送到交换机
4) 交换机把消息发送给绑定过的所有队列
5) 订阅队列的消费者都能拿到消息
消息发送代码如下:
@Testpublic void testFanoutExchange() { // 交换机名称 String exchangeName = "test.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message);}
我们把消息发到了test.fanout这个交换机上,我们想要实现多个消费者通过不同的队列收到这个消息,那消费者的代码如下:
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】");}
小结
交换机的作用是什么?
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
不能缓存消息,路由失败,消息丢失
FanoutExchange的会将消息路由到每个绑定的队列
3.3.2 Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。因此同一时刻你只能跟一个人打视频电话。另一方面,知道如何处理消息,例如递交给某个特别队列、
因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。MQ简介
1.同步调用
2.异步调用
3.技术选型
二、不过队列一定要与交换机绑定。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
声明一个名为xxx
.direct
的交换机声明队列
direct.queue1
,绑定xxx.direct
,bindingKey
为blud
和red
声明队列
direct.queue2
,绑定xxx.direct
,bindingKey
为yellow
和red
在
consumer
服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2在publisher中编写测试方法,向
xxx.direct
发送消息
消费者的代码如下:
@RabbitListener(queues = "direct.queue1")public void listenDirectQueue1(String msg) { System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "direct.queue2")public void listenDirectQueue2(String msg) { System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");}
消息发送者的代码如下:
@Testpublic void testSendDirectExchange() { // 交换机名称 String exchangeName = "xxx.direct"; // 消息 String message = "这是一条红色紧急消息!!!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message);}
这时候消费者1和消费者2都能接收到这条消息
@Testpublic void testSendDirectExchange() { // 交换机名称 String exchangeName = "xxx.direct"; // 消息 String message = "这是一条蓝色级日常消息"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "blue", message);}
这时候就只要消费者1收到了这条消息
小结:
描述下Direct交换机与Fanout交换机的差异?
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
3.3.3 Topic交换机
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。在实际业务中添加MQ
不管是生产者还是消费者,都需要配置MQ的基本信息。
在入门案例中,我们就演示这样的简单模型,如图:
也就是只需要发送和接收即可
第一步就是引依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
第二步:写配置(在publisher
服务的application.yml
中添加配置:)
spring: rabbitmq: host: 192.168.1.1 # 你的虚拟机IP port: 5672 # 端口 virtual-host: /scau# 虚拟主机 username: zzh# 用户名 password: 123 # 密码
然后在publisher
服务中编写测试类SpringAmqpTest
,并利用RabbitTemplate
实现消息发送:
package com.scau.publisher.amqp;import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestpublic class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); }}
主要用到的api就是:
rabbitTemplate.convertAndSend(queueName, message);
接下来,我们再来实现消息接收。
还是一样,对接收者Service,也是引依赖,写配置,这里不过多赘述
然后在consumer
服务的中新建一个类SpringRabbitListener
,代码如下:
package com.scau.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class SpringRabbitListener { // 利用RabbitListener来声明要监听的队列信息 // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。Consumer:消费者,与以前一样,订阅队列,没有变化
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
Fanout:广播,将消息交给所有绑定到交换机的队列。
2.3绑定关系
点击Exchanges
选项卡,点击amq.fanout
交换机,进入交换机详情页,然后点击Bindings
菜单,在表单中填写要绑定的队列名称:

再次回到exchange页面,找到刚刚绑定的amq.fanout
,点击进入详情页,再次发送一条消息:
回到Queues
页面,可以发现hello.queue
中已经有一条消息了:

点击队列名称,进入详情页,查看队列详情,这次我们点击get message:
可以看到消息到达队列了:
这个时候如果有消费者监听了MQ的hello.queue1
或hello.queue2
队列,自然就能接收到消息了。Kafka
追求消息低延迟:RabbitMQ、因此你可以多线操作,同时跟多人聊天。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。
2.异步调用
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
消息发送者:投递消息的人,就是原来的调用方
消息Broker:管理、现在采用了异步调用,解除了耦合,他们即便执行过程中出现了故障,也不会影响到支付服务。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。
所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。
综上,同步调用的方式存在下列问题:
拓展性差
性能下降
级联失败
而要解决这些问题,我们就必须用异步调用的方式来代替同步调用。递交给所有队列、在实际业务中添加MQ
1.添加依赖
2.配置MQ地址
3.接收信息
4.发送消息
一、而相关的微服务都可以订阅消息通知,一旦消息到达Broker,则会分发给每一个订阅了的微服务,处理各自的业务。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。RabbitMQ
追求可靠性:RabbitMQ、也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通讯。。RocketMQ
追求吞吐能力:RocketMQ、交换机及其绑定关系
基于注解的监听器模式,异步接收消息
封装了RabbitTemplate工具,用于发送消息
3.1快速入门
在之前的案例中,我们都是经过交换机发送消息到队列,不过有时候为了测试方便,我们也可以直接向队列发送消息,跳过交换机。但是,在某些场景下,我们希望不同的消息被不同的队列消费。在这个过程中是很容易出现错误的。RabbitMQ
1.安装
安装说明参考以下博客:RabbitMQ安装教程(超详细)-CSDN博客
安装完成后,我们访问15672端口即可看到管理控制台。
我们现在希望消费者1消费更多消息(毕竟他比较快)
只需要开启一个配置即可,在消费者的yml文件下配置以下内容即可:
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
再次测试,结果如下:
消费者1接收到消息:【hello, message_0】21:12:51.659664200消费者2........接收到消息:【hello, message_1】21:12:51.680610消费者1接收到消息:【hello, message_2】21:12:51.703625消费者1接收到消息:【hello, message_3】21:12:51.724330100消费者1接收到消息:【hello, message_4】21:12:51.746651100消费者1接收到消息:【hello, message_5】21:12:51.768401400消费者1接收到消息:【hello, message_6】21:12:51.790511400消费者1接收到消息:【hello, message_7】21:12:51.812559800消费者1接收到消息:【hello, message_8】21:12:51.834500600消费者1接收到消息:【hello, message_9】21:12:51.857438800消费者1接收到消息:【hello, message_10】21:12:51.880379600消费者2........接收到消息:【hello, message_11】21:12:51.899327100消费者1接收到消息:【hello, message_12】21:12:51.922828400消费者1接收到消息:【hello, message_13】21:12:51.945617400消费者1接收到消息:【hello, message_14】21:12:51.968942500消费者1接收到消息:【hello, message_15】21:12:51.992215400消费者1接收到消息:【hello, message_16】21:12:52.013325600消费者1接收到消息:【hello, message_17】21:12:52.035687100消费者1接收到消息:【hello, message_18】21:12:52.058188消费者1接收到消息:【hello, message_19】21:12:52.081208400消费者2........接收到消息:【hello, message_20】21:12:52.103406200消费者1接收到消息:【hello, message_21】21:12:52.123827300消费者1接收到消息:【hello, message_22】21:12:52.146165100消费者1接收到消息:【hello, message_23】21:12:52.168828300消费者1接收到消息:【hello, message_24】21:12:52.191769500消费者1接收到消息:【hello, message_25】21:12:52.214839100消费者1接收到消息:【hello, message_26】21:12:52.238998700消费者1接收到消息:【hello, message_27】21:12:52.259772600消费者1接收到消息:【hello, message_28】21:12:52.284131800消费者2........接收到消息:【hello, message_29】21:12:52.306190600消费者1接收到消息:【hello, message_30】21:12:52.325315800消费者1接收到消息:【hello, message_31】21:12:52.347012500消费者1接收到消息:【hello, message_32】21:12:52.368508600消费者1接收到消息:【hello, message_33】21:12:52.391785100消费者1接收到消息:【hello, message_34】21:12:52.416383800消费者1接收到消息:【hello, message_35】21:12:52.439019消费者1接收到消息:【hello, message_36】21:12:52.461733900消费者1接收到消息:【hello, message_37】21:12:52.485990消费者1接收到消息:【hello, message_38】21:12:52.509219900消费者2........接收到消息:【hello, message_39】21:12:52.523683400消费者1接收到消息:【hello, message_40】21:12:52.547412100消费者1接收到消息:【hello, message_41】21:12:52.571191800消费者1接收到消息:【hello, message_42】21:12:52.593024600消费者1接收到消息:【hello, message_43】21:12:52.616731800消费者1接收到消息:【hello, message_44】21:12:52.640317消费者1接收到消息:【hello, message_45】21:12:52.663111100消费者1接收到消息:【hello, message_46】21:12:52.686727消费者1接收到消息:【hello, message_47】21:12:52.709266500消费者2........接收到消息:【hello, message_48】21:12:52.725884900消费者1接收到消息:【hello, message_49】21:12:52.746299900
小结:
Work模型的使用:
多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量
3.3交换机
在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。
3.AMQP
上述我们都是在控制台中去收发消息,操作起来也十分方便,但是我们以后再开发业务的时候肯定不会在上面操作的,都是基于编程实现的。
在大多数电商业务中,用户支付成功后都会以短信或者其它方式通知用户,告知支付成功。
由于RabbitMQ
采用了AMQP协议,因此它具备跨语言的特性。发微信可以同时与多个人收发微信,但是往往响应会有延迟。安全性和性能
架构复杂,后期维护和调试麻烦
3.技术选型
市面上已经有很多种MQ了,我们这里只需要学习RabbitMQ即可,以下是各个MQ的对比。通知服务。
3.3.1 Fanout交换机
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。这时就要用到Direct类型的Exchange。
综上,异步调用的优势包括:
耦合度更低
性能更好
业务拓展性强
故障隔离,避免级联失败
当然,异步通信也并非完美无缺,它存在下列缺点:
完全依赖于Broker的可靠性、会发现消息依然没有到达队列!!
怎么回事呢?(注意,交换机只负责转发消息,不存储,就类似外卖小哥一样,只负责送外卖,不不帮你保管,外卖柜就类似队列)
发送到交换机的消息,只会路由到与其绑定的队列,因此仅仅创建队列是不够的,我们还需要将其与交换机绑定。这看起来似乎没有问题,但是如果我们的整个业务链中还包含着其他业务,比如日志记录等不相干的业务,日志记录失败了,也会导致我们整条业务链全部失败回滚,这肯定是不允许的。并且RabbitMQ
官方也提供了各种不同语言的客户端。生产者投递的消息会暂存在消息队列中,等待消费者处理
exchange
:交换机,负责消息路由。
几种常见MQ的对比:
RabbitMQ ActiveMQ RocketMQ Kafka 公司/社区 Rabbit Apache 阿里 Apache 开发语言 Erlang Java Java Scala&Java 协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议 可用性 高 一般 高 高 单机吞吐量 一般 差 高 非常高 消息延迟 微秒级 毫秒级 毫秒级 毫秒以内 消息可靠性 高 一般 高 一般
追求可用性:Kafka、暂存、消费者是trade-service)
1.添加依赖
<!--消息发送--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.配置MQ地址
spring: rabbitmq: host: 192.168.1.1 # 你的虚拟机IP port: 5672 # 端口 virtual-host: /test# 虚拟主机 username: test# 用户名 password: 123 # 密码
3.接收信息
先创建一个单独的listener类出来

代码如下:
package com.test.trade.listener;import com.hmall.trade.service.IOrderService;import lombok.RequiredArgsConstructor;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component@RequiredArgsConstructorpublic class PayStatusListener { private final IOrderService orderService; @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "trade.pay.success.queue", durable = "true"), exchange = @Exchange(name = "pay.topic"), key = "pay.success" )) public void listenPaySuccess(Long orderId){ orderService.markOrderPaySuccess(orderId); }}
4.发送消息
修改pay-service
服务下的com.test.pay.service.impl.PayOrderServiceImpl
类中的tryPayOrderByBalance
方法:
private final RabbitTemplate rabbitTemplate;@Override@Transactionalpublic void tryPayOrderByBalance(PayOrderDTO payOrderDTO) { // 1.查询支付单 PayOrder po = getById(payOrderDTO.getId()); // 2.判断状态 if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){ // 订单不是未支付,状态异常 throw new BizIllegalException("交易已支付或关闭!"); } // 3.尝试扣减余额 userClient.deductMoney(payOrderDTO.getPw(), po.getAmount()); // 4.修改支付单状态 boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now()); if (!success) { throw new BizIllegalException("交易已支付或关闭!"); } // 5.修改订单状态 // tradeClient.markOrderPaySuccess(po.getBizOrderNo()); try { rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo()); } catch (Exception e) { log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);}}