发布时间:2025-06-24 20:30:53 作者:北方职教升学中心 阅读量:951
找不到交换机2.交换机路由到队列,也存在丢失消息的可能性问题解决方案:1.生产者确认模式2.生产者回退模式目标: 演示生产者确认的效果,消息百分百进入交换机实现步骤:1.配置开启生产者确认模式2.编写生产者确认回调方法,处理业务逻辑3.在RabbitMQ模板对象中,设置回调逻辑4.测试请求一下 */importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassMessageController{//发送消息接口@AutowiredprivateRabbitTemplaterabbitTemplate;//定义发送消息的接口@RequestMapping("/direct/sendMsg")publicStringsendMsgtoMQ(Stringexchange,StringroutingKey,Stringmsg){rabbitTemplate.convertAndSend(exchange,routingKey,msg);return"已投递";}}
MessageConfirmRallback.java
packagecom.cubemall.controller;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;/*发送消息回调确认类:消息如果没有进入交换机,会回调当前类中的confirm */@ComponentpublicclassMessageConfirmRallbackimplementsRabbitTemplate.ConfirmCallback{//配置回调的方法@AutowiredprivateRabbitTemplaterabbitTemplate;//配置在当前对象注入之后,再设置当前对象到RabbitTemplate对象中@PostConstruct//注解作用: 在当前对象初始化完毕之后执行的方法publicvoidinitRabbittemplate(){rabbitTemplate.setConfirmCallback(this::confirm);}/** * 不论是否进入交换机,都会回调当前方法 * @param correlationData 消息投递封装对象 * @param ack 是否投递成功 * @param exception 如果错误,错误原因 */@Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringexception){if(ack){System.out.println("消息进入了交换机成功{}");}else{System.out.println("消息进入了交换机失败{} 原因:"+exception);}}}
生产者回退模式
application.properties
# 配置开启生产者的回退模式spring.rabbitmq.publisher-returns=true
MessageConfirmRallback.java
packagecom.cubemall.controller;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;/*发送消息回调确认类:消息如果没有进入交换机,会回调当前类中的confirm */@ComponentpublicclassMessageConfirmRallbackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{//配置回调的方法@AutowiredprivateRabbitTemplaterabbitTemplate;//配置在当前对象注入之后,再设置当前对象到RabbitTemplate对象中@PostConstruct//注解作用: 在当前对象初始化完毕之后执行的方法publicvoidinitRabbittemplate(){rabbitTemplate.setConfirmCallback(this::confirm);rabbitTemplate.setReturnCallback(this::returnedMessage);}/** * 不论是否进入交换机,都会回调当前方法 * @param correlationData 消息投递封装对象 * @param ack 是否投递成功 * @param exception 如果错误,错误原因 */@Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringexception){if(ack){System.out.println("消息进入了交换机成功{}");}else{System.out.println("消息进入了交换机失败{} 原因:"+exception);}}/** * 消息从交换机进入队列失败回调方法:只会在失败的情况下 * @param message the returned message. * @param replyCode the reply code. * @param replyText the reply text. * @param exchange the exchange. * @param routingKey the routing key. */@OverridepublicvoidreturnedMessage(Messagemessage,intreplyCode,StringreplyText,Stringexchange,StringroutingKey){System.out.println("消息从交换机进入队列失败: >>>>>>>");System.out.println("exchange = "+exchange);System.out.println("replyCode = "+replyCode);System.out.println("replyText = "+replyText);System.out.println("routingKey = "+routingKey);}}
MessageController.java
packagecom.cubemall.controller;/*目标: 搭建RabbitMQ高级特性演示环境1.搭建消费者工程[复用之前工程]2.搭建提供者工程[复用之前工程]3.编写MessageController: 用来发送消息交换机路由键消息内容4.RabbitMQ配置交换机和队列,及路由键5.编写消费者监听思考问题: 生产者能百分之百将消息发送给消息队列吗?不确定的1.生产者如果发消息给MQ,消息在传输的过程中可能丢失。文章目录
- 生产者确认模式
- application.properties
- MessageController.java
- MessageConfirmRallback.java
- 生产者回退模式
- application.properties
- MessageConfirmRallback.java
- MessageController.java
- 消费者手动确认
- application.properties
- ConsumerAckQueueListener.java
- 死信队列
- 延时队列
测试链接 http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingKey=order.A&msg=aaa
思考问题: 生产者能百分之百将消息发送给消息队列吗?
不确定的
1.生产者如果发消息给MQ,消息在传输的过程中可能丢失。找不到交换机2.交换机路由到队列,也存在丢失消息的可能性问题解决方案:1.生产者确认模式2.生产者回退模式目标: 演示生产者确认的效果,消息百分百进入交换机实现步骤:1.配置开启生产者确认模式2.编写生产者确认回调方法,处理业务逻辑3.在RabbitMQ模板对象中,设置回调逻辑4.测试请求一下目标2: 消息能够从交换机百分百进入到队列实现步骤:1.配置开启生产者回退模式2.编写生产者回退的回调方法3.设置回退回调方法4.测试 */importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassMessageController{//发送消息接口@AutowiredprivateRabbitTemplaterabbitTemplate;//定义发送消息的接口@RequestMapping("/direct/sendMsg")publicStringsendMsgtoMQ(Stringexchange,StringroutingKey,Stringmsg){rabbitTemplate.convertAndSend(exchange,routingKey,msg);return"已投递";}}
消费者手动确认
application.properties
# 配置开启消费端手动签收spring.rabbitmq.listener.simple.acknowledge-mode=manualspring.rabbitmq.listener.direct.acknowledge-mode=manual# 配置开启重试spring.rabbitmq.listener.direct.retry.enabled=true
ConsumerAckQueueListener.java
packagecom.cubemall.listeners;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.IOException;/*消费者消息队列监听器问题: 消费者能不能百分百接收到请求,而且业务逻辑处理出现异常,消息还能不能算接收到呢?目标: 演示消费者手动确认的过程实现步骤:1.编写监听器类 和对于监听的方法,编写手动签收的业务逻辑2.配置开启手动签收3.测试 */@Component@RabbitListener(queues ="order.A")publicclassConsumerAckQueueListener{//处理消息方法@RabbitHandlerpublicvoidsimpleHandler(Stringmsg,Messagemessage,Channelchannel)throwsIOException{System.out.println("下单消息{},内容为: "+msg);//获取消息的投递标签longdeliveryTag =message.getMessageProperties().getDeliveryTag();try{if(msg.contains("苹果")){thrownewRuntimeException("不允许售卖苹果手机");}//签收消息/** * 参数1: 投递标签 * 参数2: 是否是批量签收,true一次性签收所有消息,如果是false则只签收当前消息 */channel.basicAck(deliveryTag,false);System.out.println("签收成功{}");}catch(IOExceptione){//e.printStackTrace();//参数1: 投递标签//参数2: 是否批量//参数3: 是否重回队列channel.basicNack(deliveryTag,false,true);System.out.println("签收失败{}");}//拒绝签收消息: 出现异常了,拒绝签收}}
死信队列

延时队列

消费者消息队列监听器
问题1: 消费者能不能百分百接收到请求,而且业务逻辑处理出现异常,消息还能不能算接收到呢?
目标: 演示消费者手动确认的过程
实现步骤:
1.编写监听器类 和对于监听的方法,编写手动签收的业务逻辑
2.配置开启手动签收
3.测试
问题2: 消息在队列中,如果没有被消费者消费?
TTL–> Time to Live (存活时间/有效期)
目标: 演示消息队列中消息失效超时过程
步骤:
1.配置新的队列order.B,设置队列内消息的超时时间5s x-message-ttl
2.将队列绑定order_exchange交换机上
3.发送消息,测试
问题3: 消息发送失败了,消息丢失了?消息有效期到了
死信队列: 当消息失效了,统一进入的一个队列,这个队列称之为死信队列
主要有三种情况:
1.到达了消息队列容量上限!
2.消费者如果拒绝签收,不重回队列!
3.消息超时了!
目标: 演示成为死信的过程
步骤:
1.建立死信队列deadQueue
2.建立死信交换机deadExchange
3.死信队列绑定死信交换机:order.dead
4.队列order.B绑定死信交换机 x-dead-letter-exchange x-dead-letter-routing-key
5.向队列发送消息,测试死信交换机
需求: 1.新用户注册成功7天后,发送消息问候?
2.下单后,30分钟未支付,取消订单,回滚票
延迟队列: 消息进入队列后不会被消费,只有到达指定的时间后才会被消费!
找不到交换机
2.交换机路由到队列,也存在丢失消息的可能性问题解决方案:
1.生产者确认模式
2.生产者回退模式
目标: 演示生产者确认的效果,消息百分百进入交换机
实现步骤:
1.配置开启生产者确认模式
2.编写生产者确认回调方法,处理业务逻辑
3.在RabbitMQ模板对象中,设置回调逻辑
4.测试请求一下
目标2: 消息能够从交换机百分百进入到队列
实现步骤:
1.配置开启生产者回退模式
2.编写生产者回退的回调方法
3.设置回退回调方法
4.测试
生产者确认模式
application.properties
# 配置开启生产者确认模式spring.rabbitmq.publisher-confirms=true
MessageController.java
packagecom.cubemall.controller;/*目标: 搭建RabbitMQ高级特性演示环境1.搭建消费者工程[复用之前工程]2.搭建提供者工程[复用之前工程]3.编写MessageController: 用来发送消息交换机路由键消息内容4.RabbitMQ配置交换机和队列,及路由键5.编写消费者监听思考问题: 生产者能百分之百将消息发送给消息队列吗?不确定的1.生产者如果发消息给MQ,消息在传输的过程中可能丢失。