返还金币业务?

发布时间:2025-06-24 18:59:38  作者:北方职教升学中心  阅读量:590


或是将消息丢弃。如图:

解读:

  • 同步通讯:就如同打视频电话,双方的交互都是实时的。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

    在广播模式下,消息发送流程是这样的:

    • 1) 可以有多个队列

    • 2) 每个队列都要绑定到Exchange(交换机)

    • 3) 生产者发送的消息,只能发送到交换机

    • 4) 交换机把消息发送给绑定过的所有队列

    • 5) 订阅队列的消费者都能拿到消息

     消息发送代码如下:

    @Testpublic void testFanoutExchange() {    // 交换机名称    String exchangeName = "test.fanout";    // 消息    String message = "hello, everyone!";    rabbitTemplate.convertAndSend(exchangeName, "", message);}

    我们把消息发到了test.fanout这个交换机上,我们想要实现多个消费者通过不同的队列收到这个消息,那消费者的代码如下:

    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");}

    小结

    交换机的作用是什么?

    • 接收publisher发送的消息

    • 将消息按照规则路由到与之绑定的队列

    • 不能缓存消息,路由失败,消息丢失

    • FanoutExchange的会将消息路由到每个绑定的队列

    3.3.2 Direct交换机

    在Fanout模式中,一条消息,会被所有订阅的队列都消费。因此同一时刻你只能跟一个人打视频电话。另一方面,知道如何处理消息,例如递交给某个特别队列、

    因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。MQ简介

    1.同步调用

    2.异步调用

    3.技术选型

    二、不过队列一定要与交换机绑定。

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

 

  1. 声明一个名为xxx.direct的交换机

  2. 声明队列direct.queue1,绑定xxx.directbindingKeybludred

  3. 声明队列direct.queue2,绑定xxx.directbindingKeyyellowred

  4. consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

  5. 在publisher中编写测试方法,向xxx.direct发送消息

消费者的代码如下:

@RabbitListener(queues = "direct.queue1")public void listenDirectQueue1(String msg) {    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "direct.queue2")public void listenDirectQueue2(String msg) {    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");}

 消息发送者的代码如下:

@Testpublic void testSendDirectExchange() {    // 交换机名称    String exchangeName = "xxx.direct";    // 消息    String message = "这是一条红色紧急消息!!!";    // 发送消息    rabbitTemplate.convertAndSend(exchangeName, "red", message);}

这时候消费者1和消费者2都能接收到这条消息

@Testpublic void testSendDirectExchange() {    // 交换机名称    String exchangeName = "xxx.direct";    // 消息    String message = "这是一条蓝色级日常消息";    // 发送消息    rabbitTemplate.convertAndSend(exchangeName, "blue", message);}

这时候就只要消费者1收到了这条消息

小结:

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列

  • Direct交换机根据RoutingKey判断路由给哪个队列

  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3.3.3 Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。在实际业务中添加MQ

不管是生产者还是消费者,都需要配置MQ的基本信息。

在入门案例中,我们就演示这样的简单模型,如图:

也就是只需要发送和接收即可

 第一步就是引依赖:

<dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId></dependency>

第二步:写配置(在publisher服务的application.yml中添加配置:)

spring:  rabbitmq:    host: 192.168.1.1 # 你的虚拟机IP    port: 5672 # 端口    virtual-host: /scau# 虚拟主机    username: zzh# 用户名    password: 123 # 密码

然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.scau.publisher.amqp;import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestpublic class SpringAmqpTest {    @Autowired    private RabbitTemplate rabbitTemplate;    @Test    public void testSimpleQueue() {        // 队列名称        String queueName = "simple.queue";        // 消息        String message = "hello, spring amqp!";        // 发送消息        rabbitTemplate.convertAndSend(queueName, message);    }}

主要用到的api就是:

rabbitTemplate.convertAndSend(queueName, message);

接下来,我们再来实现消息接收。

还是一样,对接收者Service,也是引依赖,写配置,这里不过多赘述

然后在consumer服务的中新建一个类SpringRabbitListener,代码如下:

package com.scau.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class SpringRabbitListener {    // 利用RabbitListener来声明要监听的队列信息    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。

  • Consumer:消费者,与以前一样,订阅队列,没有变化

  • Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

    交换机的类型有四种:

    • Fanout:广播,将消息交给所有绑定到交换机的队列。

       2.3绑定关系

      点击Exchanges选项卡,点击amq.fanout交换机,进入交换机详情页,然后点击Bindings菜单,在表单中填写要绑定的队列名称:

      再次回到exchange页面,找到刚刚绑定的amq.fanout,点击进入详情页,再次发送一条消息:

       回到Queues页面,可以发现hello.queue中已经有一条消息了:

      点击队列名称,进入详情页,查看队列详情,这次我们点击get message:

       

      可以看到消息到达队列了: 

       

      这个时候如果有消费者监听了MQ的hello.queue1hello.queue2队列,自然就能接收到消息了。Kafka

      追求消息低延迟:RabbitMQ、因此你可以多线操作,同时跟多人聊天。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。

      2.异步调用

      异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

      • 消息发送者:投递消息的人,就是原来的调用方

      • 消息Broker:管理、现在采用了异步调用,解除了耦合,他们即便执行过程中出现了故障,也不会影响到支付服务。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。

        所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。

        综上,同步调用的方式存在下列问题:

        • 拓展性差

        • 性能下降

        • 级联失败

        而要解决这些问题,我们就必须用异步调用的方式来代替同步调用。递交给所有队列、在实际业务中添加MQ

        1.添加依赖

        2.配置MQ地址

        3.接收信息

        4.发送消息


        一、而相关的微服务都可以订阅消息通知,一旦消息到达Broker,则会分发给每一个订阅了的微服务,处理各自的业务。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。RabbitMQ

        追求可靠性:RabbitMQ、也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通。。RocketMQ

        追求吞吐能力:RocketMQ、交换机及其绑定关系

      • 基于注解的监听器模式,异步接收消息

      • 封装了RabbitTemplate工具,用于发送消息

      3.1快速入门

      在之前的案例中,我们都是经过交换机发送消息到队列,不过有时候为了测试方便,我们也可以直接向队列发送消息,跳过交换机。但是,在某些场景下,我们希望不同的消息被不同的队列消费。在这个过程中是很容易出现错误的。RabbitMQ

      1.安装

      安装说明参考以下博客:RabbitMQ安装教程(超详细)-CSDN博客

      安装完成后,我们访问15672端口即可看到管理控制台。

      我们现在希望消费者1消费更多消息(毕竟他比较快)

      只需要开启一个配置即可,在消费者的yml文件下配置以下内容即可:

      spring:  rabbitmq:    listener:      simple:        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

      再次测试,结果如下:

      消费者1接收到消息:【hello, message_0】21:12:51.659664200消费者2........接收到消息:【hello, message_1】21:12:51.680610消费者1接收到消息:【hello, message_2】21:12:51.703625消费者1接收到消息:【hello, message_3】21:12:51.724330100消费者1接收到消息:【hello, message_4】21:12:51.746651100消费者1接收到消息:【hello, message_5】21:12:51.768401400消费者1接收到消息:【hello, message_6】21:12:51.790511400消费者1接收到消息:【hello, message_7】21:12:51.812559800消费者1接收到消息:【hello, message_8】21:12:51.834500600消费者1接收到消息:【hello, message_9】21:12:51.857438800消费者1接收到消息:【hello, message_10】21:12:51.880379600消费者2........接收到消息:【hello, message_11】21:12:51.899327100消费者1接收到消息:【hello, message_12】21:12:51.922828400消费者1接收到消息:【hello, message_13】21:12:51.945617400消费者1接收到消息:【hello, message_14】21:12:51.968942500消费者1接收到消息:【hello, message_15】21:12:51.992215400消费者1接收到消息:【hello, message_16】21:12:52.013325600消费者1接收到消息:【hello, message_17】21:12:52.035687100消费者1接收到消息:【hello, message_18】21:12:52.058188消费者1接收到消息:【hello, message_19】21:12:52.081208400消费者2........接收到消息:【hello, message_20】21:12:52.103406200消费者1接收到消息:【hello, message_21】21:12:52.123827300消费者1接收到消息:【hello, message_22】21:12:52.146165100消费者1接收到消息:【hello, message_23】21:12:52.168828300消费者1接收到消息:【hello, message_24】21:12:52.191769500消费者1接收到消息:【hello, message_25】21:12:52.214839100消费者1接收到消息:【hello, message_26】21:12:52.238998700消费者1接收到消息:【hello, message_27】21:12:52.259772600消费者1接收到消息:【hello, message_28】21:12:52.284131800消费者2........接收到消息:【hello, message_29】21:12:52.306190600消费者1接收到消息:【hello, message_30】21:12:52.325315800消费者1接收到消息:【hello, message_31】21:12:52.347012500消费者1接收到消息:【hello, message_32】21:12:52.368508600消费者1接收到消息:【hello, message_33】21:12:52.391785100消费者1接收到消息:【hello, message_34】21:12:52.416383800消费者1接收到消息:【hello, message_35】21:12:52.439019消费者1接收到消息:【hello, message_36】21:12:52.461733900消费者1接收到消息:【hello, message_37】21:12:52.485990消费者1接收到消息:【hello, message_38】21:12:52.509219900消费者2........接收到消息:【hello, message_39】21:12:52.523683400消费者1接收到消息:【hello, message_40】21:12:52.547412100消费者1接收到消息:【hello, message_41】21:12:52.571191800消费者1接收到消息:【hello, message_42】21:12:52.593024600消费者1接收到消息:【hello, message_43】21:12:52.616731800消费者1接收到消息:【hello, message_44】21:12:52.640317消费者1接收到消息:【hello, message_45】21:12:52.663111100消费者1接收到消息:【hello, message_46】21:12:52.686727消费者1接收到消息:【hello, message_47】21:12:52.709266500消费者2........接收到消息:【hello, message_48】21:12:52.725884900消费者1接收到消息:【hello, message_49】21:12:52.746299900

      小结:

      Work模型的使用:

      • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理

      • 通过设置prefetch来控制消费者预取的消息数量

      3.3交换机

      在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。

       3.AMQP

      上述我们都是在控制台中去收发消息,操作起来也十分方便,但是我们以后再开发业务的时候肯定不会在上面操作的,都是基于编程实现的。

      在大多数电商业务中,用户支付成功后都会以短信或者其它方式通知用户,告知支付成功。

      由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。发微信可以同时与多个人收发微信,但是往往响应会有延迟。安全性和性能

    • 架构复杂,后期维护和调试麻烦

    3.技术选型

    市面上已经有很多种MQ了,我们这里只需要学习RabbitMQ即可,以下是各个MQ的对比。通知服务。

    3.3.1 Fanout交换机 

    Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。这时就要用到Direct类型的Exchange。

    综上,异步调用的优势包括:

    • 耦合度更低

    • 性能更好

    • 业务拓展性强

    • 故障隔离,避免级联失败

    当然,异步通信也并非完美无缺,它存在下列缺点:

    • 完全依赖于Broker的可靠性、会发现消息依然没有到达队列!!

      怎么回事呢?(注意,交换机只负责转发消息,不存储,就类似外卖小哥一样,只负责送外卖,不不帮你保管,外卖柜就类似队列)

      发送到交换机的消息,只会路由到与其绑定的队列,因此仅仅创建队列是不够的,我们还需要将其与交换机绑定。这看起来似乎没有问题,但是如果我们的整个业务链中还包含着其他业务,比如日志记录等不相干的业务,日志记录失败了,也会导致我们整条业务链全部失败回滚,这肯定是不允许的。并且RabbitMQ官方也提供了各种不同语言的客户端。生产者投递的消息会暂存在消息队列中,等待消费者处理

    • exchange:交换机,负责消息路由。

      几种常见MQ的对比:

      RabbitMQActiveMQRocketMQKafka
      公司/社区RabbitApache阿里Apache
      开发语言ErlangJavaJavaScala&Java
      协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
      可用性一般
      单机吞吐量一般非常高
      消息延迟微秒级毫秒级毫秒级毫秒以内
      消息可靠性一般一般

      追求可用性:Kafka、暂存、消费者是trade-service)

      1.添加依赖

      <!--消息发送-->  <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-amqp</artifactId>  </dependency>

      2.配置MQ地址

      spring:  rabbitmq:    host: 192.168.1.1 # 你的虚拟机IP    port: 5672 # 端口    virtual-host: /test# 虚拟主机    username: test# 用户名    password: 123 # 密码

      3.接收信息

      先创建一个单独的listener类出来

       代码如下:

      package com.test.trade.listener;import com.hmall.trade.service.IOrderService;import lombok.RequiredArgsConstructor;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component@RequiredArgsConstructorpublic class PayStatusListener {    private final IOrderService orderService;    @RabbitListener(bindings = @QueueBinding(            value = @Queue(name = "trade.pay.success.queue", durable = "true"),            exchange = @Exchange(name = "pay.topic"),            key = "pay.success"    ))    public void listenPaySuccess(Long orderId){        orderService.markOrderPaySuccess(orderId);    }}

      4.发送消息

      修改pay-service服务下的com.test.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法:

      private final RabbitTemplate rabbitTemplate;@Override@Transactionalpublic void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {    // 1.查询支付单    PayOrder po = getById(payOrderDTO.getId());    // 2.判断状态    if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){        // 订单不是未支付,状态异常        throw new BizIllegalException("交易已支付或关闭!");    }    // 3.尝试扣减余额    userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());    // 4.修改支付单状态    boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());    if (!success) {        throw new BizIllegalException("交易已支付或关闭!");    }    // 5.修改订单状态    // tradeClient.markOrderPaySuccess(po.getBizOrderNo());    try {        rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());    } catch (Exception e) {        log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);}}

    当交易服务、

    RabbitMQ对应的架构如图:

    其中包含几个概念:

    • publisher:生产者,也就是发送消息的一方

    • consumer:消费者,也就是消费消息的一方

    • queue:队列,存储消息。 RocketMQ 、

    • 异步通讯:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。

      二、RabbitMQ

      1.安装

      2.控制台的使用说明

      2.1交换机

      2.2队列​编辑

       2.3绑定关系

       3.AMQP

      3.1快速入门

      3.2WorkQueues模型

      3.3交换机

      3.3.1 Fanout交换机 

      3.3.2 Direct交换机

      3.3.3 Topic交换机

       3.4 声明交换机和队列

      3.4.1 Fanout示例

      3.4.2  Direct示例

       3.4.3 基于注解声明

      三、

      但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。包括:

      • china.news

      • japan.news

     接下来我们用代码来演示topic交换机的用法:

    消息发送方代码如下:

    /** * topicExchange */@Testpublic void testSendTopicExchange() {    // 交换机名称    String exchangeName = "test.topic";    // 消息    String message = "这是一条国内新闻...";    // 发送消息    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);}

    消息接收方代码如下:

    @RabbitListener(queues = "topic.queue1")public void listenTopicQueue1(String msg){    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "topic.queue2")public void listenTopicQueue2(String msg){    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");}

    同时两个队列的Routing Key如下:

    那么显而易见是两个队列都能收到消息

    小结:

    描述下Direct交换机与Topic交换机的差异?

    • Topic交换机接收的消息RoutingKey必须是多个单词,以 .分割

    • Topic交换机与队列绑定时的bindingKey可以指定通配符

    • #:代表0个或多个词

    • *:代表1个词

     3.4 声明交换机和队列

    在之前我们都是基于RabbitMQ控制台来创建队列、。

    另外,不管是交易服务、

    以下是AMQP的官网:

    Spring AMQPLevel up your Java code and explore what Spring can do for you.icon-default.png?t=O83Ahttps://spring.io/projects/spring-amqp

    SpringAMQP提供了三个功能:

    • 自动声明队列、分为两步:(生产者是pay-service。支付代码完全不用变更,而仅仅是让积分服务也订阅消息即可:

       

      不管后期增加了多少消息订阅者,作为支付服务来讲,执行问扣减余额、

      3.4.1 Fanout示例
      package com.scau.consumer.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class FanoutConfig {    /**     * 声明交换机     * @return Fanout类型交换机     */    @Bean    public FanoutExchange fanoutExchange(){        return new FanoutExchange("test.fanout");    }    /**     * 第1个队列     */    @Bean    public Queue fanoutQueue1(){        return new Queue("fanout.queue1");    }    /**     * 绑定队列和交换机     */    @Bean    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);    }    /**     * 第2个队列     */    @Bean    public Queue fanoutQueue2(){        return new Queue("fanout.queue2");    }    /**     * 绑定队列和交换机     */    @Bean    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);    }}
      3.4.2  Direct示例
      package com.scau.consumer.config;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class DirectConfig {    /**     * 声明交换机     * @return Direct类型交换机     */    @Bean    public DirectExchange directExchange(){        return ExchangeBuilder.directExchange("test.direct").build();    }    /**     * 第1个队列     */    @Bean    public Queue directQueue1(){        return new Queue("direct.queue1");    }    /**     * 绑定队列和交换机     */    @Bean    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");    }    /**     * 绑定队列和交换机     */    @Bean    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");    }    /**     * 第2个队列     */    @Bean    public Queue directQueue2(){        return new Queue("direct.queue2");    }    /**     * 绑定队列和交换机     */    @Bean    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");    }    /**     * 绑定队列和交换机     */    @Bean    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");    }}
       3.4.3 基于注解声明

      我们从前文可以知道要想声明一个交换机和队列要编写的代码还是很繁琐的,因此AMQP还支持基于注解的形式声明

      @RabbitListener(bindings = @QueueBinding(    value = @Queue(name = "direct.queue1"),    exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),    key = {"red", "blue"}))public void listenDirectQueue1(String msg){    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(    value = @Queue(name = "direct.queue2"),    exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),    key = {"red", "yellow"}))public void listenDirectQueue2(String msg){    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");}

      topic交换机也是如此:

      @RabbitListener(bindings = @QueueBinding(    value = @Queue(name = "topic.queue1"),    exchange = @Exchange(name = "test.topic", type = ExchangeTypes.TOPIC),    key = "china.#"))public void listenTopicQueue1(String msg){    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(    value = @Queue(name = "topic.queue2"),    exchange = @Exchange(name = "test.topic", type = ExchangeTypes.TOPIC),    key = "#.news"))public void listenTopicQueue2(String msg){    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");}

      三、

      我们在生产者模拟大量消息堆积现象。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。 // 可以看到方法体中接收的就是消息体的内容 @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); }}

      主要是用到@RabbitListener这个注解来接受指定队列的消息

      3.2WorkQueues模型

      我们在快速入门中所展示的是一对一的队列模型,但如果生产者发送的消息过多,只有一个消费者的话消费不过来,就容易导致消息堆积,这时候就可以用到WorkQueues模型,多个消费者消费消息。(开闭原则:拓展开放,修改关闭)

      第二性能下降

      由于我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。但是:

      • 异步调用又该如何实现?

      • 哪些业务适合用异步调用来实现呢?

       简而言之就是:之前我们学习的内容OpenFeign是属于同步调用,使用于实时响应的场景;而MQ是属于异步调用,适用于高效率且不需要实时响应的场景。业务耗时仅仅是这三部分业务耗时,仅仅100ms,大大提高了业务性能。

      这其中就存在3个问题:

      第一拓展性差

      我们目前的业务相对简单,但是随着业务规模扩大,产品的功能也在不断完善。我们最早在控制台使用的正是Fanout交换机

    • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

    • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

    • Headers:头匹配,基于MQ的消息头匹配,用的较少。每个虚拟主机相互独立,有各自的exchange、毕竟收到手里的钱没道理再退回去吧。

      同步调用的方式我们已经学过了,之前的OpenFeign调用就是。Kafka

      虽然说Kafka的吞吐量很高,但是其应用场景在于大数据分析,不适用于我们的日常项目,就类似大炮打蚊子。

    • Queue:消息队列也与以前一样,接收消息、最终整个业务的响应时长就是每次远程调用的执行时长之和:

       假如每个微服务的执行时长都是50ms,则最终整个业务的耗时可能高达300ms,性能太差了。而一旦引入交换机,消息发送的模式会有很大变化:

       

      可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

      • Publisher:生产者,不再发送消息到队列中,而是发给交换机

      • Exchange:交换机,一方面,接收生产者发送的消息。假如产品经理提出需求,用户支付成功后,给用户以积分奖励或者返还金币,你怎么办?是不是要在上述业务中再加入积分业务、

        只不过Topic类型Exchange可以让队列在绑定BindingKey的时候使用通配符!

        BindingKey一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

        通配符规则:

        • #:匹配一个或多个词

        • *:匹配不多不少恰好1个词

        举例:

        • item.#:能够匹配item.spu.insert或者 item.spu

        • item.*:只能匹配item.spu

        假如此时publisher发送的消息使用的RoutingKey共有四种:

        • china.news 代表有中国的新闻消息;

        • china.weather代表中国的天气消息;

        • japan.news则代表日本新闻

        • japan.weather代表日本的天气消息;

        解释:

        • topic.queue1:绑定的是china.#,凡是以 china.开头的routing key都会被匹配到,包括:

          • china.news

          • china.weather

        • topic.queue2:绑定的是#.news,凡是以 .news结尾的 routing key都会被匹配。假如后期产品经理提出这样新的需求,你怎么办?是不是要在上述业务中再加入通知用户的业务?

          某些电商项目中,还会有积分或金币的概念。返还金币业务?

          第三,级联失败

          由于我们是基于OpenFeign调用交易服务、

           还是以余额支付业务为例:

           

          除了扣减余额、

           

          在Direct模型下:

          • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

          • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。转发消息,你可以把它理解成微信服务器

          • 消息接收者:接收和处理消息的人,就是原来的服务提供方

           

          在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。缓存消息。然后接收者根据自己的需求从消息Broker那里订阅消息。

          在publisher服务中的SpringAmqpTest类中添加一个测试方法:

           

          /**     * workQueue     * 向队列中不停发送消息,模拟消息堆积。

          但是大家思考一下,我们假设用户余额充足,扣款已经成功,此时我们应该确保支付流水单更新为已支付,确保交易成功。

          1.同步调用

          之前说过,我们现在基于OpenFeign的调用都属于是同步调用,那么这种方式存在哪些问题呢?

          举个例子,我们以余额支付功能为例来分析,首先看下整个流程:

           

          目前我们采用的是基于OpenFeign的同步调用,也就是说业务执行流程是这样的:

          • 支付服务需要先调用用户服务完成余额扣减

          • 然后支付服务自己要更新支付流水单的状态

          • 然后支付服务调用交易服务,更新业务订单状态为已支付

          三个步骤依次执行。queue

        2.控制台的使用说明

        2.1交换机

         

        此时如果没有队列绑定在交换机上,那发送出去的消息就会丢失 

        2.2队列

         

         

        时,我们再次向amq.fanout交换机发送一条消息。

        最终你的支付业务会越来越臃肿:

         也就是说每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则,拓展性不好。

        假如产品经理提出了新的需求,比如要在支付成功后更新用户积分。更新支付流水状态后,发送消息即可。

        这其实就是同步调用的级联失败问题。生产者发送的消息由交换机决定投递到哪个队列。

        目录

        一、通知服务,还是积分服务,他们的业务与支付关联度低。 */@Testpublic void testWorkQueue() throws InterruptedException { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, message_"; for (int i = 0; i < 50; i++) { // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息 rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); }}

        要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

        @RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException {    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());    Thread.sleep(20);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException {    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());    Thread.sleep(200);}

        注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

        • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息

        • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

         测试结果如下:

        消费者1接收到消息:【hello, message_0】21:06:00.869555300消费者2........接收到消息:【hello, message_1】21:06:00.884518消费者1接收到消息:【hello, message_2】21:06:00.907454400消费者1接收到消息:【hello, message_4】21:06:00.953332100消费者1接收到消息:【hello, message_6】21:06:00.997867300消费者1接收到消息:【hello, message_8】21:06:01.042178700消费者2........接收到消息:【hello, message_3】21:06:01.086478800消费者1接收到消息:【hello, message_10】21:06:01.087476600消费者1接收到消息:【hello, message_12】21:06:01.132578300消费者1接收到消息:【hello, message_14】21:06:01.175851200消费者1接收到消息:【hello, message_16】21:06:01.218533400消费者1接收到消息:【hello, message_18】21:06:01.261322900消费者2........接收到消息:【hello, message_5】21:06:01.287003700消费者1接收到消息:【hello, message_20】21:06:01.304412400消费者1接收到消息:【hello, message_22】21:06:01.349950100消费者1接收到消息:【hello, message_24】21:06:01.394533900消费者1接收到消息:【hello, message_26】21:06:01.439876500消费者1接收到消息:【hello, message_28】21:06:01.482937800消费者2........接收到消息:【hello, message_7】21:06:01.488977100消费者1接收到消息:【hello, message_30】21:06:01.526409300消费者1接收到消息:【hello, message_32】21:06:01.572148消费者1接收到消息:【hello, message_34】21:06:01.618264800消费者1接收到消息:【hello, message_36】21:06:01.660780600消费者2........接收到消息:【hello, message_9】21:06:01.689189300消费者1接收到消息:【hello, message_38】21:06:01.705261消费者1接收到消息:【hello, message_40】21:06:01.746927300消费者1接收到消息:【hello, message_42】21:06:01.789835消费者1接收到消息:【hello, message_44】21:06:01.834393100消费者1接收到消息:【hello, message_46】21:06:01.875312100消费者2........接收到消息:【hello, message_11】21:06:01.889969500消费者1接收到消息:【hello, message_48】21:06:01.920702500消费者2........接收到消息:【hello, message_13】21:06:02.090725900消费者2........接收到消息:【hello, message_15】21:06:02.293060600消费者2........接收到消息:【hello, message_17】21:06:02.493748消费者2........接收到消息:【hello, message_19】21:06:02.696635100消费者2........接收到消息:【hello, message_21】21:06:02.896809700消费者2........接收到消息:【hello, message_23】21:06:03.099533400消费者2........接收到消息:【hello, message_25】21:06:03.301446400消费者2........接收到消息:【hello, message_27】21:06:03.504999100消费者2........接收到消息:【hello, message_29】21:06:03.705702500消费者2........接收到消息:【hello, message_31】21:06:03.906601200消费者2........接收到消息:【hello, message_33】21:06:04.108118500消费者2........接收到消息:【hello, message_35】21:06:04.308945400消费者2........接收到消息:【hello, message_37】21:06:04.511547700消费者2........接收到消息:【hello, message_39】21:06:04.714038400消费者2........接收到消息:【hello, message_41】21:06:04.916192700消费者2........接收到消息:【hello, message_43】21:06:05.116286400消费者2........接收到消息:【hello, message_45】21:06:05.318055100消费者2........接收到消息:【hello, message_47】21:06:05.520656400消费者2........接收到消息:【hello, message_49】21:06:05.723106700

        我们发现由于消费者1收消息的速度比较快,于是消费者1收完了消息(每个消费者均分消息)之后就让消费者2慢慢执行,这效率肯定会低很多。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。

        这样,发送消息的人和接收消息的人就完全解耦了。通知服务出现故障时,整个事务都会回滚,交易失败。到底如何操作,取决于Exchange的类型。每当发送方发送消息后,接受者都能获取消息并处理。

        因此,这里不能因为短信通知、

      • virtual host:虚拟主机,起到数据隔离的作用。

      两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。交换机。而是改为发送一条消息到Broker。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢?

      我们先来看看什么是同步通讯和异步通讯。更新订单状态失败而回滚整个事务。更新支付流水单状态以外,其它调用逻辑全部取消。MQ简介

      微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。