修改消息内容等)

发布时间:2025-06-24 17:43:53  作者:北方职教升学中心  阅读量:054


MessagePostProcessor 

MessagePostProcessor 是 Spring AMQP 提供的一个接口,用于在消息发送到 RabbitMQ 之前或从 RabbitMQ 消费之后对消息进行后处理。


2. MessagePostProcessor 的接口定义

MessagePostProcessor 是一个函数式接口,只有一个方法 postProcessMessage

@FunctionalInterfacepublic interface MessagePostProcessor {    Message postProcessMessage(Message message) throws AmqpException;}
  • 参数message 是需要处理的消息对象。

  • 在消息处理后,清理上下文或执行后续操作。它的主要作用包括:

    • 配置消息监听器的连接工厂(ConnectionFactory)。

五、

  • 设置消息转换器(MessageConverter),用于将 RabbitMQ 的消息体转换为 Java 对象。

  • 通过 factory.setAdviceChain(userContextMessageAdvice),将拦截器添加到监听器容器的拦截器链中。

  • (3) UserContextMessageAdvice
    • UserContextMessageAdvice 是一个拦截器(Advice),用于在消息处理前后执行自定义逻辑。

    • 通过 factory.setConnectionFactory(connectionFactory),将连接工厂注入到监听器容器中,确保监听器能够与 RabbitMQ 建立连接。

    • 添加拦截器链(AdviceChain),用于在消息处理前后执行自定义逻辑。SimpleRabbitListenerContainerFactory 

      1.SimpleRabbitListenerContainerFactory 的作用

      SimpleRabbitListenerContainerFactory 是 Spring AMQP 提供的一个工厂类,用于创建和管理 RabbitMQ 消息监听器的容器(SimpleMessageListenerContainer)。过期时间、如果不需要修改消息,可以直接返回原始消息。问题引入

      某些业务中,需要根据登录用户信息处理业务,而基于MQ的异步调用并不会传递登录用户信息。

    • 通过 factory.setMessageConverter(messageConverter),为监听器容器设置消息转换器,使得监听器可以直接处理 Java 对象,而不需要手动解析消息体。

    • 设置消息转换器(MessageConverter),用于将 RabbitMQ 的消息体转换为 Java 对象。

    (2) MessageConverter
    • MessageConverter 用于将 RabbitMQ 的消息体(通常是字节数组)转换为 Java 对象,或者将 Java 对象转换为消息体。SimpleRabbitListenerContainerFactory 

      1.SimpleRabbitListenerContainerFactory 的作用

      SimpleRabbitListenerContainerFactory 是 Spring AMQP 提供的一个工厂类,用于创建和管理 RabbitMQ 消息监听器的容器(SimpleMessageListenerContainer)。

      @Configuration@RequiredArgsConstructor@Slf4jpublic class RabbitMQConfig {    private final MessageConverter messageConverter;    private final RabbitTemplateConfigurer configurer;    private final ConnectionFactory connectionFactory;    @Bean    public RabbitTemplate rabbitTemplate() {        RabbitTemplate rabbitTemplate = new RabbitTemplate();        configurer.configure(rabbitTemplate, connectionFactory);        rabbitTemplate.setMessageConverter(messageConverter);        return rabbitTemplate;    }    public MethodInterceptor userContextMessageAdvice() {        return (invocation) -> {            Message message = null;            // 获取Message参数            for (Object arg : invocation.getArguments()) {                if (arg instanceof Message) {                    message = (Message) arg;                    break;                }            }            try {                if (message != null) {                    MessageProperties messageProperties = message.getMessageProperties();                    // 从消息头中获取用户信息                    User user = JSONUtil.toBean(messageProperties.getHeader(MqConstants.USER_INFO).toString(), User.class);                    String messageId = messageProperties.getMessageId();                    log.info("Message ID: {},User:{}", messageId, user);                    // 设置用户上下文                    if (user != null) {                        UserContext.setUser(user);                    }                }                // 执行实际的消息处理                return invocation.proceed();            } finally {                // 清理用户上下文                UserContext.clear();            }        };    }    @Bean    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        factory.setConnectionFactory(connectionFactory);        factory.setMessageConverter(messageConverter);        factory.setAdviceChain(userContextMessageAdvice());        return factory;    }}

    它的主要作用是对消息的属性、头信息或内容进行修改或增强。

    (3) UserContextMessageAdvice
    • UserContextMessageAdvice 是一个拦截器(Advice),用于在消息处理前后执行自定义逻辑。

    • 拦截器可以用于实现以下功能:

      • 在消息处理前,记录日志或设置上下文信息(例如用户身份信息)。

      • 拦截器可以用于实现以下功能:

        • 在消息处理前,记录日志或设置上下文信息(例如用户身份信息)。

          二、

        三、

      (2) MessageConverter
      • MessageConverter 用于将 RabbitMQ 的消息体(通常是字节数组)转换为 Java 对象,或者将 Java 对象转换为消息体。它的主要作用包括:

        • 配置消息监听器的连接工厂(ConnectionFactory)。


        2. 代码解析

        @Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(        ConnectionFactory connectionFactory,         MessageConverter messageConverter,         UserContextMessageAdvice userContextMessageAdvice) {    // 创建 SimpleRabbitListenerContainerFactory 实例    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        // 设置连接工厂    factory.setConnectionFactory(connectionFactory);        // 设置消息转换器    factory.setMessageConverter(messageConverter);        // 设置拦截器链    factory.setAdviceChain(userContextMessageAdvice);        return factory;}
        (1) ConnectionFactory
        • ConnectionFactory 是 RabbitMQ 的连接工厂,用于创建与 RabbitMQ 服务器的连接。

        • 消息消费后的处理:在消息从 RabbitMQ 消费之后,对消息进行额外的处理(例如记录日志、

        • 在消息处理后,清理上下文或执行后续操作。


          1. MessagePostProcessor 的作用

          MessagePostProcessor 允许你在消息的生命周期中插入自定义逻辑,通常用于以下场景:

          • 消息发送前的处理:在消息发送到 RabbitMQ 之前,修改消息的属性(例如设置消息头、

          • 通过 factory.setAdviceChain(userContextMessageAdvice),将拦截器添加到监听器容器的拦截器链中。

          • 返回值:返回处理后的消息对象。

          • 通过 factory.setMessageConverter(messageConverter),为监听器容器设置消息转换器,使得监听器可以直接处理 Java 对象,而不需要手动解析消息体。


          2. 代码解析

          @Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(        ConnectionFactory connectionFactory,         MessageConverter messageConverter,         UserContextMessageAdvice userContextMessageAdvice) {    // 创建 SimpleRabbitListenerContainerFactory 实例    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        // 设置连接工厂    factory.setConnectionFactory(connectionFactory);        // 设置消息转换器    factory.setMessageConverter(messageConverter);        // 设置拦截器链    factory.setAdviceChain(userContextMessageAdvice);        return factory;}
          (1) ConnectionFactory
          • ConnectionFactory 是 RabbitMQ 的连接工厂,用于创建与 RabbitMQ 服务器的连接。解决方案

            生产者

            首先在生产者服务中实现MessagePostProcessor接口,实现postProcessMessage方法,在该方法中向消息头部设置用户信息。

        四、

      • 添加拦截器链(AdviceChain),用于在消息处理前后执行自定义逻辑。

        并通过setBeforePublishPostProcessors为rabbitmq设置MessagePostProcessor

        @Configuration@Slf4j@RequiredArgsConstructorpublic class RabbitMQConfig {    public MessageConverter messageConverter() {        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();        jackson2JsonMessageConverter.setCreateMessageIds(true);        return jackson2JsonMessageConverter;    }    public MessagePostProcessor messagePostProcessor() {        return message -> {            User currentUser = UserContext.getUser();            String messageId = message.getMessageProperties().getMessageId();            if (currentUser != null) {                message.getMessageProperties().setHeader(MqConstants.USER_INFO, JSONUtil.toJsonStr(currentUser));            }            log.info("Message ID: {},User:{}", messageId, currentUser);            return message;};    }    @Bean    public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {        RabbitTemplate rabbitTemplate = new RabbitTemplate();        configurer.configure(rabbitTemplate, connectionFactory);        rabbitTemplate.setBeforePublishPostProcessors(messagePostProcessor());        rabbitTemplate.setMessageConverter(messageConverter());        return rabbitTemplate;    }}

        消费者

        实现MethodInterceptor接口,在方法内从消息头部获取用户信息,将其注入UserContext,通过SimpleRabbitListenerContainerFactory添加拦截器链,将我们自己的拦截器加入。优先级等)。

        一、前面我们的做法比较麻烦,至少要做两件事:

        • 消息发送者在消息体中传递登录用户

        • 消费者获取消息体中的登录用户,处理业务

        这样做不仅麻烦,而且编程体验也不统一。

      • 通过 factory.setConnectionFactory(connectionFactory),将连接工厂注入到监听器容器中,确保监听器能够与 RabbitMQ 建立连接。修改消息内容等)。