发布时间:2025-06-24 18:51:57 作者:北方职教升学中心 阅读量:844
交换机、这也就是为什么第二个延时2秒,却后执行
2.6. RabbitMQ 插件实现延迟队列
略
发布时间:2025-06-24 18:51:57 作者:北方职教升学中心 阅读量:844
交换机、这也就是为什么第二个延时2秒,却后执行
略
public class Producer01 { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); // 设置消息的 TTL 时间 10s AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); // 该信息是用作演示队列个数限制 for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes()); System.out.println("生产者发送消息:" + message); } }}
public class Consumer02 { //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明死信交换机 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明队列 String deadQueue = "DEAD-QUEUE"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println("等待接收死信消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Consumer02 接收到消息" + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); }}
启动 消费者C1 ,之后关闭消费者,模拟其接收不到消息;再启动 Producer
生产者未发送消息
生产者发送10条消息,关闭消费者C1,此时正常的消息队列有10条未消费的信息
时间过去 10s,正常队列里面的消息由于没有被消费,消息进入死信队列
以上步骤完成后,启动 C2 消费者,它消费死信队列里面的消息
// 设置正常队列的长度限制,例如发10个,4个则为死信params.put("x-max-length",6);
注意:此时需要把原先队列删除 因为参数改变了
消费者 C1 消费 6 个消息,设置队列中最多是 6 个消息
消费者 C2 从死信队列中消费 4 个消息
关闭消息者 C1,然后生产者发送消息之后
启动消费者 C1,然后再启动消费者 C2
消费者 C2 从死信队列中获取消息 info5,也就是消费者 C1 拒绝的消息 info5
延迟队列的内部是有序的,特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列
使用场景:
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是 ms;换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信";
如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL
如果设置队列设置 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而消息设置 TTL 属性,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;
注意:如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!--swagger--> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>3.0.0</version> </dependency> <!--RabbitMQ 测试依赖--> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency></dependencies>
spring.rabbitmq.host=42.192.149.71spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456
@Configurationpublic class TtlQueueConfig { public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; //死信交换机 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //死信队列 public static final String DEAD_LETTER_QUEUE = "QD"; // 声明正常交换机 @Bean("xExchange") public DirectExchange getXExchange() { return new DirectExchange(X_EXCHANGE); } // 声明死信交换机 @Bean("yExchange") public DirectExchange getYExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } // 声明队列 QA , 过期时间为 10s 并绑定到对应的死信交换机 @Bean("QA") public Queue getQAQueue() { Map<String, Object> args = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(args).build(); } // 声明队列 QB , 过期时间为 40s 并绑定到对应的死信交换机 @Bean("QB") public Queue getQBQueue() { Map<String, Object> args = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("y-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("y-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 10000 * 40); return QueueBuilder.durable(QUEUE_B).withArguments(args).build(); } // 队列 QA 绑定正常交换机 X @Bean public Binding queueQABindingX(@Qualifier(value = "QA") Queue queue, @Qualifier(value = "xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queue).to(xExchange).with("XA"); } // 队列 QB 绑定正常交换机 X @Bean public Binding queueQBBindingX(@Qualifier(value = "QB") Queue queue, @Qualifier(value = "xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queue).to(xExchange).with("XB"); } // 声明死信队列 QD @Bean(value = "QD") public Queue getQDQueue() { return new Queue(DEAD_LETTER_QUEUE); } // 死信队列 QD 绑定死信交换机 Y @Bean public Binding deadLetterBindingQAD(@Qualifier(value = "QD") Queue queue, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queue).to(yExchange).with("YD"); }}
访问:http://localhost:8080/ttl/sendMsg/范智诚
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了
如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
新增了一个队列 QC,绑定关系如下:该队列不设置TTL 时间
新增的配置类代码:
@Configurationpublic class MsgTtlQueueConfig { public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String QUEUE_C = "QC"; //声明正常队列 QC 并绑定死信交换机 Y @Bean("QC") public Queue queueB() { Map<String, Object> args = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //没有声明 TTL 属性 return QueueBuilder.durable(QUEUE_C).withArguments(args).build(); } //声明队列 QC 绑定 X 交换机 @Bean public Binding queuecBindingX(@Qualifier("QC") Queue queue, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queue).to(xExchange).with("XC"); }}
/** * 延时队列优化 * * @param message 消息 * @param ttlTime 延时的毫秒 */@GetMapping("sendExpirationMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) { rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> { correlationData.getMessageProperties().setExpiration(ttlTime); return correlationData; }); log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);}
发送请求访问:http://localhost:8080/ttl/sendExpirationMsg/范智诚/10000
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“;因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
死信也就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或 queue 里,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有死信队列
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中;还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
public class Consumer01 { //普通交换机名称 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明死信和普通交换机,类型为 direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明死信队列 String deadQueue = "DEAD-QUEUE"; channel.queueDeclare(deadQueue, false, false, false, null); // 死信队列绑定:队列、