发布时间:2025-06-24 18:51:57  作者:北方职教升学中心  阅读量:844


交换机、这也就是为什么第二个延时2秒,却后执行

2.6. RabbitMQ 插件实现延迟队列

路由键(routingKey) channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常队列绑定死信队列信息的其他参数 Map<String, Object> params = new HashMap<>(); // 过期时间一般设置在生产者,过期时间为 10s // params.put("x-message-ttl", 10000) // 正常队列绑定死信交换机,参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 正常队列设置死信 routing-key,参数 key 是固定值 params.put("x-dead-letter-routing-key", "lisi"); //正常队列 String normalQueue = "NORMAL-QUEUE"; channel.queueDeclare(normalQueue, false, false, false, params); // 正常队列绑定正常交换机 channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接收消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Consumer01 接收到消息" + message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); }}
public class Producer01 {    private static final String NORMAL_EXCHANGE = "normal_exchange";    public static void main(String[] argv) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);        // 设置消息的 TTL 时间 10s        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();        // 该信息是用作演示队列个数限制        for (int i = 1; i < 11; i++) {            String message = "info" + i;            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());            System.out.println("生产者发送消息:" + message);        }    }}
public class Consumer02 {    //死信交换机名称    private static final String DEAD_EXCHANGE = "dead_exchange";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        // 声明死信交换机        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);        // 声明队列        String deadQueue = "DEAD-QUEUE";        channel.queueDeclare(deadQueue, false, false, false, null);        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");        System.out.println("等待接收死信消息........... ");        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);            System.out.println("Consumer02 接收到消息" + message);        };        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {        });    }}

启动 消费者C1 ,之后关闭消费者,模拟其接收不到消息;再启动 Producer

生产者未发送消息

生产者发送10条消息,关闭消费者C1,此时正常的消息队列有10条未消费的信息

时间过去 10s,正常队列里面的消息由于没有被消费,消息进入死信队列

以上步骤完成后,启动 C2 消费者,它消费死信队列里面的消息

1.3.2. 死信之 最大长度

  1. 消息生产者代码去掉 TTL 属性

  1. 消费者 C1 修改以下代码(启动之后关闭该消费者C1,模拟其接收不到消息)
// 设置正常队列的长度限制,例如发10个,4个则为死信params.put("x-max-length",6);

注意:此时需要把原先队列删除 因为参数改变了

  1. 消费者C2 代码不变(关闭C1,启动生产者)

  1. 启动消费者 C1 和 C2

消费者 C1 消费 6 个消息,设置队列中最多是 6 个消息

消费者 C2 从死信队列中消费 4 个消息

1.3.3. 死信之 消息被拒绝

  1. 消息生产者代码保持不变
  2. C1 消费者代码(启动之后关闭该消费者C1,模拟其接收不到消息),拒收消息"info5"
  3. 开启手动应答
  4. C2 消费者代码保持不变

关闭消息者 C1,然后生产者发送消息之后

启动消费者 C1,然后再启动消费者 C2

消费者 C2 从死信队列中获取消息 info5,也就是消费者 C1 拒绝的消息 info5

2. 延迟队列

2.1. 延迟队列介绍

延迟队列的内部是有序的,特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列

使用场景:

  1. 订单在十分钟之内未支付则自动取消
  2. 新创建的店铺如果十天内没有上传过商品,则会自动发消息提醒;
  3. 账号注册成功,如果三天内没登陆则进行短信提醒
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

2.2. RabbitMQ 中的 TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是 ms;换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"

如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL

  1. 队列设置 TTL:创建队列时设置“x-message-ttl”属性

  1. 消息设置 TTL:针对每条消息设置 TTL

  1. 两者的区别

如果设置队列设置 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而消息设置 TTL 属性,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;

注意:如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃

2.3. 整合 SpringBoot

  1. 添加依赖
<dependencies>  <dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter</artifactId>  </dependency>  <!--RabbitMQ 依赖-->  <dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId>  </dependency>  <dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-web</artifactId>  </dependency>  <dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-test</artifactId>    <scope>test</scope>  </dependency>  <dependency>    <groupId>com.alibaba</groupId>    <artifactId>fastjson</artifactId>    <version>1.2.47</version>  </dependency>  <dependency>    <groupId>org.projectlombok</groupId>    <artifactId>lombok</artifactId>  </dependency>  <!--swagger-->  <dependency>    <groupId>io.springfox</groupId>    <artifactId>springfox-swagger2</artifactId>    <version>3.0.0</version>  </dependency>  <dependency>    <groupId>io.springfox</groupId>    <artifactId>springfox-swagger-ui</artifactId>    <version>3.0.0</version>  </dependency>  <!--RabbitMQ 测试依赖-->  <dependency>    <groupId>org.springframework.amqp</groupId>    <artifactId>spring-rabbit-test</artifactId>    <scope>test</scope>  </dependency></dependencies>
  1. 修改配置文件
spring.rabbitmq.host=42.192.149.71spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456
  1. 添加 Swagger 配置类(可以去Swagger查看笔记)

2.4. 队列 TTL

@Configurationpublic class TtlQueueConfig {    public static final String X_EXCHANGE = "X";    public static final String QUEUE_A = "QA";    public static final String QUEUE_B = "QB";    //死信交换机    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";    //死信队列    public static final String DEAD_LETTER_QUEUE = "QD";    // 声明正常交换机    @Bean("xExchange")    public DirectExchange getXExchange() {        return new DirectExchange(X_EXCHANGE);    }    // 声明死信交换机    @Bean("yExchange")    public DirectExchange getYExchange() {        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);    }    // 声明队列 QA , 过期时间为 10s 并绑定到对应的死信交换机    @Bean("QA")    public Queue getQAQueue() {        Map<String, Object> args = new HashMap<>(3);        //声明当前队列绑定的死信交换机        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);        //声明当前队列的死信路由 key        args.put("x-dead-letter-routing-key", "YD");        //声明队列的 TTL        args.put("x-message-ttl", 10000);        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();    }    // 声明队列 QB , 过期时间为 40s 并绑定到对应的死信交换机    @Bean("QB")    public Queue getQBQueue() {        Map<String, Object> args = new HashMap<>(3);        //声明当前队列绑定的死信交换机        args.put("y-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);        //声明当前队列的死信路由 key        args.put("y-dead-letter-routing-key", "YD");        //声明队列的 TTL        args.put("x-message-ttl", 10000 * 40);        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();    }    // 队列 QA 绑定正常交换机 X    @Bean    public Binding queueQABindingX(@Qualifier(value = "QA") Queue queue,                                   @Qualifier(value = "xExchange") DirectExchange xExchange) {        return BindingBuilder.bind(queue).to(xExchange).with("XA");    }    // 队列 QB 绑定正常交换机 X    @Bean    public Binding queueQBBindingX(@Qualifier(value = "QB") Queue queue,                                   @Qualifier(value = "xExchange") DirectExchange xExchange) {        return BindingBuilder.bind(queue).to(xExchange).with("XB");    }    // 声明死信队列 QD    @Bean(value = "QD")    public Queue getQDQueue() {        return new Queue(DEAD_LETTER_QUEUE);    }    // 死信队列 QD 绑定死信交换机 Y    @Bean    public Binding deadLetterBindingQAD(@Qualifier(value = "QD") Queue queue,                                        @Qualifier("yExchange") DirectExchange yExchange) {        return BindingBuilder.bind(queue).to(yExchange).with("YD");    }}

访问:http://localhost:8080/ttl/sendMsg/范智诚

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了

如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

2.5. 延迟队列 TTL 优化

新增了一个队列 QC,绑定关系如下:该队列不设置TTL 时间

新增的配置类代码:

@Configurationpublic class MsgTtlQueueConfig {    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";    public static final String QUEUE_C = "QC";    //声明正常队列 QC 并绑定死信交换机 Y    @Bean("QC")    public Queue queueB() {        Map<String, Object> args = new HashMap<>(3);        //声明当前队列绑定的死信交换机        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);        //声明当前队列的死信路由 key        args.put("x-dead-letter-routing-key", "YD");        //没有声明 TTL 属性        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();    }    //声明队列 QC 绑定 X 交换机    @Bean    public Binding queuecBindingX(@Qualifier("QC") Queue queue,                                  @Qualifier("xExchange") DirectExchange xExchange) {        return BindingBuilder.bind(queue).to(xExchange).with("XC");    }}
/** * 延时队列优化 * * @param message 消息 * @param ttlTime 延时的毫秒 */@GetMapping("sendExpirationMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {    rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {        correlationData.getMessageProperties().setExpiration(ttlTime);        return correlationData;    });    log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);}

发送请求访问:http://localhost:8080/ttl/sendExpirationMsg/范智诚/10000

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“;因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

1. 死信队列

1.1. 死信的概念

死信也就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或 queue 里,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有死信队列

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中;还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

1.2. 死信的来源

  1. 消息TTL过期(time to live,生存时间)
  2. 队列达到最大长度(队列满了,无法再添加数据到MQ中)
  3. 消息被拒绝(basic.reject 或 basic.nack 并且 requeue = false)

1.3. 死信实战

1.3.1. 死信之 TTL

public class Consumer01 {    //普通交换机名称    private static final String NORMAL_EXCHANGE = "normal_exchange";    //死信交换机名称    private static final String DEAD_EXCHANGE = "dead_exchange";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        // 声明死信和普通交换机,类型为 direct        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);        // 声明死信队列        String deadQueue = "DEAD-QUEUE";        channel.queueDeclare(deadQueue, false, false, false, null);        // 死信队列绑定:队列、