修改消息内容等)
发布时间:2025-06-24 17:43:53 作者:北方职教升学中心 阅读量:054
MessagePostProcessor
MessagePostProcessor
是 Spring AMQP 提供的一个接口,用于在消息发送到 RabbitMQ 之前或从 RabbitMQ 消费之后对消息进行后处理。
2. MessagePostProcessor
的接口定义
MessagePostProcessor
是一个函数式接口,只有一个方法 postProcessMessage
:
@FunctionalInterfacepublic interface MessagePostProcessor { Message postProcessMessage(Message message) throws AmqpException;}
参数:
message
是需要处理的消息对象。在消息处理后,清理上下文或执行后续操作。它的主要作用包括:
配置消息监听器的连接工厂(
ConnectionFactory
)。
五、
设置消息转换器(MessageConverter
),用于将 RabbitMQ 的消息体转换为 Java 对象。
通过 factory.setAdviceChain(userContextMessageAdvice)
,将拦截器添加到监听器容器的拦截器链中。
(3) UserContextMessageAdvice
UserContextMessageAdvice
是一个拦截器(Advice),用于在消息处理前后执行自定义逻辑。通过
factory.setConnectionFactory(connectionFactory)
,将连接工厂注入到监听器容器中,确保监听器能够与 RabbitMQ 建立连接。添加拦截器链(
AdviceChain
),用于在消息处理前后执行自定义逻辑。SimpleRabbitListenerContainerFactory
1.SimpleRabbitListenerContainerFactory
的作用SimpleRabbitListenerContainerFactory
是 Spring AMQP 提供的一个工厂类,用于创建和管理 RabbitMQ 消息监听器的容器(SimpleMessageListenerContainer
)。过期时间、如果不需要修改消息,可以直接返回原始消息。问题引入某些业务中,需要根据登录用户信息处理业务,而基于MQ的异步调用并不会传递登录用户信息。
通过
factory.setMessageConverter(messageConverter)
,为监听器容器设置消息转换器,使得监听器可以直接处理 Java 对象,而不需要手动解析消息体。设置消息转换器(
MessageConverter
),用于将 RabbitMQ 的消息体转换为 Java 对象。
(2) MessageConverter
MessageConverter
用于将 RabbitMQ 的消息体(通常是字节数组)转换为 Java 对象,或者将 Java 对象转换为消息体。SimpleRabbitListenerContainerFactory
1.SimpleRabbitListenerContainerFactory
的作用SimpleRabbitListenerContainerFactory
是 Spring AMQP 提供的一个工厂类,用于创建和管理 RabbitMQ 消息监听器的容器(SimpleMessageListenerContainer
)。@Configuration@RequiredArgsConstructor@Slf4jpublic class RabbitMQConfig { private final MessageConverter messageConverter; private final RabbitTemplateConfigurer configurer; private final ConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(); configurer.configure(rabbitTemplate, connectionFactory); rabbitTemplate.setMessageConverter(messageConverter); return rabbitTemplate; } public MethodInterceptor userContextMessageAdvice() { return (invocation) -> { Message message = null; // 获取Message参数 for (Object arg : invocation.getArguments()) { if (arg instanceof Message) { message = (Message) arg; break; } } try { if (message != null) { MessageProperties messageProperties = message.getMessageProperties(); // 从消息头中获取用户信息 User user = JSONUtil.toBean(messageProperties.getHeader(MqConstants.USER_INFO).toString(), User.class); String messageId = messageProperties.getMessageId(); log.info("Message ID: {},User:{}", messageId, user); // 设置用户上下文 if (user != null) { UserContext.setUser(user); } } // 执行实际的消息处理 return invocation.proceed(); } finally { // 清理用户上下文 UserContext.clear(); } }; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(messageConverter); factory.setAdviceChain(userContextMessageAdvice()); return factory; }}