发布时间:2025-06-24 18:18:55  作者:北方职教升学中心  阅读量:124


文章目录

    • 1.环境搭建
        • 1.common-rabbitmq-starter 配置防止消费者抢消息(基础配置)
        • 2.common-rabbitmq-starter-demo下创建一个生产者一个消费者
    • 2.生产者可靠性
        • 1.开启消息超时重试机制
        • 2.生产者开启ConfirmCallback消息确认机制
          • 1.application.yml
          • 2.TestConfigPublisher.java
          • 3.测试交换机名字写错的情况
    • 3.MQ可靠性
        • 1.使用LazyQueue和持久化队列结合的方式来做
    • 4.消费者可靠性
        • 1.消费者失败重试机制
          • 1.application.yml
          • 2.解释
        • 2.消费者消息失败处理策略
          • 1.ErrorConfiguration.java 指定错误消息发送到异常交换机
          • 2.ErrorListener.java 异常队列监听器
          • 3.ErrorMessageHandler.java 异常消息处理器
          • 4.TestConfig.java配置
          • 5.TestConfigPublisher.java 生产者
          • 6.TestConfigConsumer.java 消费者故意消费失败
          • 7.测试,消费失败则重试三次后到异常处理逻辑

1.环境搭建

1.common-rabbitmq-starter 配置防止消费者抢消息(基础配置)
spring:rabbitmq:# 消费者配置listener:simple:prefetch:1# 每次获取一条消息,处理完再获取下一条
2.common-rabbitmq-starter-demo下创建一个生产者一个消费者

CleanShot 2024-12-31 at 21.59.36@2x

2.生产者可靠性

1.开启消息超时重试机制
# 生产者消息重试配置template:retry:# 启用消息重试机制,默认为 falseenabled:true# 初始重试间隔时间为一秒initial-interval:1000ms        # 重试最大次数,默认为 3 次max-attempts:2# 重试的间隔倍数# 配置 2 的话,第一次等initial-interval也就是1s,第二次等 2s,第三次等 4smultiplier:2connection-timeout:500ms # 连接超时时间500ms
2.生产者开启ConfirmCallback消息确认机制
1.application.yml
# 生产者配置publisher-confirm-type: correlated # 发布确认类型为异步回调(一旦配置了,就必须要有回调方法)
2.TestConfigPublisher.java
packagecom.sunxiansheng.publisher.pub;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.util.concurrent.ListenableFutureCallback;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjavax.annotation.Resource;importjava.util.UUID;/** * Description: 测试发布者 * * @Author sun * @Create 2024/12/31 19:05 * @Version 1.0 */@RestController@Slf4jpublicclassTestConfigPublisher{@ResourceprivateRabbitTemplaterabbitTemplate;@RequestMapping("/send")publicvoidsend(){log.info("发送消息");// 1.创建CorrelationData对象CorrelationDatacd =newCorrelationData(UUID.randomUUID().toString());// 2.设置回调cd.getFuture().addCallback(newListenableFutureCallback<CorrelationData.Confirm>(){@OverridepublicvoidonFailure(Throwablethrowable){// 基本不可能发生,因为这里的异常不是MQ问题导致的log.error("ConfirmCallback:消息发送失败(非MQ问题):{}",throwable.getMessage());}@OverridepublicvoidonSuccess(CorrelationData.Confirmconfirm){// 判断是否发送成功if(confirm.isAck()){log.info("ConfirmCallback:消息发送成功:{}",confirm);}else{log.error("ConfirmCallback:消息发送失败:{}",confirm.getReason());}}});rabbitTemplate.convertAndSend("fanout.exchange.tesst","","hello rabbitmq",cd);}}
3.测试交换机名字写错的情况

CleanShot 2024-12-31 at 19.57.56@2x

3.MQ可靠性

1.使用LazyQueue和持久化队列结合的方式来做
/**     * 创建一个队列     *     * @return     */@BeanpublicQueuefanoutQueueTest(){returnQueueBuilder.durable("lazyQueue")// 持久化队列.lazy()// 惰性队列.build();}

持久化队列可以保存队列的元数据,重启后自动恢复,惰性队列可以将所有的消息都持久化到磁盘,内存只保留最近的2048条消息

4.消费者可靠性

1.消费者失败重试机制
1.application.yml
# 消费者配置listener:simple:acknowledge-mode:auto # 自动确认模式(消费者确认机制)retry:enabled:true# 开启重试机制max-attempts:3# 最大重试次数initial-interval:1000ms # 重试间隔时间multiplier:1.0# 重试时间间隔倍数stateless:false# false:有状态,true:无状态,如果是有状态的,每次重试都会发送到同一个队列
2.解释

首先开启了消费者自动确认机制,如果消息消费失败,就进行重试

2.消费者消息失败处理策略
1.ErrorConfiguration.java 指定错误消息发送到异常交换机
packagecom.sunxiansheng.rabbitmq.error;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.retry.MessageRecoverer;importorg.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;importorg.springframework.boot.autoconfigure.condition.ConditionalOnProperty;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/** * Description: 处理失败消息的交换机和队列 * * @Author sun * @Create 2024/12/31 19:07 * @Version 1.0 */@Configuration// 当配置文件中spring.rabbitmq.listener.simple.retry.enabled=true时,才会生效@ConditionalOnProperty(prefix ="spring.rabbitmq.listener.simple",name ="retry.enabled",havingValue ="true")publicclassErrorConfiguration{/**     * 一个error交换机     */@BeanpublicDirectExchangeerrorExchange(){returnnewDirectExchange("error.exchange");}/**     * 一个error队列     */@BeanpublicQueueerrorQueue(){returnnewQueue("error.queue");}/**     * 绑定error队列到error交换机     */@BeanpublicBindingerrorBinding(){returnBindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");}/**     * MessageRecoverer     */@BeanpublicMessageRecoverermyMessageRecoverer(RabbitTemplaterabbitTemplate){// 指定错误消息发送到error.exchange交换机,routingKey为errorreturnnewRepublishMessageRecoverer(rabbitTemplate,"error.exchange","error");}}
2.ErrorListener.java 异常队列监听器
packagecom.sunxiansheng.consumer.error;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;/** * Description: 错误消息监听器 * * @Author sun * @Create 2024/12/31 20:32 * @Version 1.0 */@Component@Slf4jpublicclassErrorListener{@RabbitListener(queues ="error.queue")publicvoiderrorListener(Messagemessage){// 解析错误信息ErrorMessageHandler.handleErrorMessage("error.queue",message);}}
3.ErrorMessageHandler.java 异常消息处理器
packagecom.sunxiansheng.consumer.error;importcom.rabbitmq.client.LongString;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessageProperties;importjava.nio.charset.StandardCharsets;importjava.util.Map;/** * Description: 错误消息处理器 * * @Author sun * @Create 2024/12/31 20:32 * @Version 1.0 */@Slf4jpublicclassErrorMessageHandler{publicstaticvoidhandleErrorMessage(StringlistenerName,Messagemessage){// 获取消息属性MessagePropertiesmessageProperties =message.getMessageProperties();StringmessageBody =newString(message.getBody(),StandardCharsets.UTF_8);Map<String,Object>headers =messageProperties.getHeaders();// 从消息头部获取异常信息StringexceptionMessage =(String)headers.get("x-exception-message");StringoriginalExchange =(String)headers.get("x-original-exchange");StringoriginalRoutingKey =(String)headers.get("x-original-routingKey");// 处理LongString类型的异常堆栈跟踪信息StringexceptionStackTrace =null;if(headers.containsKey("x-exception-stacktrace")){ObjectstacktraceObject =headers.get("x-exception-stacktrace");if(stacktraceObject instanceofLongString){exceptionStackTrace =stacktraceObject.toString();}}// 格式化输出所有信息,并在前后添加分割线log.error("\n-------------------------------\n"+"MQ错误监听队列: {}\n"+"原始交换机: {}\n"+"原始路由键: {}\n"+"原始信息: {}\n"+"异常信息: {}\n"+"异常堆栈: {}n"+"-------------------------------",listenerName,originalExchange,originalRoutingKey,messageBody,exceptionMessage,exceptionStackTrace);}}
4.TestConfig.java配置
packagecom.sunxiansheng.publisher.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/** * Description: 测试配置类 * * @Author sun * @Create 2024/12/31 19:00 * @Version 1.0 */@ConfigurationpublicclassTestConfig{/**     * 创建一个fanout类型的交换机     *     * @return     */@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("fanout.exchange.test");}/**     * 创建一个队列     *     * @return     */@BeanpublicQueuefanoutQueueTest(){returnQueueBuilder.durable("lazyQueue")// 持久化队列.lazy()// 惰性队列.build();}/**     * 交换机和队列绑定     */@BeanpublicBindingbinding(){returnBindingBuilder.bind(fanoutQueueTest()).to(fanoutExchange());}}
5.TestConfigPublisher.java 生产者
packagecom.sunxiansheng.publisher.pub;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.util.concurrent.ListenableFutureCallback;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjavax.annotation.Resource;importjava.util.UUID;/** * Description: 测试发布者 * * @Author sun * @Create 2024/12/31 19:05 * @Version 1.0 */@RestController@Slf4jpublicclassTestConfigPublisher{@ResourceprivateRabbitTemplaterabbitTemplate;@RequestMapping("/send")publicvoidsend(){log.info("发送消息");// 1.创建CorrelationData对象CorrelationDatacd =newCorrelationData(UUID.randomUUID().toString());// 2.设置回调cd.getFuture().addCallback(newListenableFutureCallback<CorrelationData.Confirm>(){@OverridepublicvoidonFailure(Throwablethrowable){// 基本不可能发生,因为这里的异常不是MQ问题导致的log.error("ConfirmCallback:消息发送失败(非MQ问题):{}",throwable.getMessage());}@OverridepublicvoidonSuccess(CorrelationData.Confirmconfirm){// 判断是否发送成功if(confirm.isAck()){log.info("ConfirmCallback:消息发送成功:{}",confirm);}else{log.error("ConfirmCallback:消息发送失败:{}",confirm.getReason());}}});rabbitTemplate.convertAndSend("fanout.exchange.test","","hello rabbitmq",cd);}}
6.TestConfigConsumer.java 消费者故意消费失败
packagecom.sunxiansheng.consumer.con;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;/** * Description: 测试消费者 * * @Author sun * @Create 2024/12/31 19:03 * @Version 1.0 */@Component@Slf4jpublicclassTestConfigConsumer{@RabbitListener(queues ="fanout.queue.test")publicvoidreceive(Stringmessage){log.info("接收到的消息:{}",message);inti =1/0;}}
7.测试,消费失败则重试三次后到异常处理逻辑

CleanShot 2024-12-31 at 22.07.15@2x