Binding 和函数式编程模型

发布时间:2025-06-24 19:47:51  作者:北方职教升学中心  阅读量:401


  • 与 Spring 生态集成:无缝集成 Spring Boot、
  • group对应 Kafka 的 Consumer Group ID

    在下一篇文章【深度 Mape 之九】中,我们将深入分布式链路追踪的世界,学习如何使用 Spring Cloud Sleuth 与 Zipkin (或 SkyWalking) 集成,为我们的微服务调用链(无论同步还是异步)添加“透视眼”,敬请期待!


    你更倾向于使用 RabbitMQ 还是 Kafka?为什么?在使用 Spring Cloud Stream 时,你遇到过哪些挑战或有趣的场景?欢迎在评论区分享你的见解!

  • MessageBuilder用于构建 Spring Messaging 的 Message对象,可以携带 Payload 和 Headers。

    摘要:在复杂的分布式系统中,同步的请求-响应模式并非万能。弹性)。本文作为系列的第八篇,将带你探索微服务架构中的另一种重要通信模式——异步通信,并重点实战如何使用 Spring Cloud Stream框架,结合主流的消息队列 (MQ) 中间件(如 RabbitMQ 或 Kafka),构建消息驱动的微服务,实现服务间的解耦、

  • 函数式编程模型 (Recommended): 这是 SCS 推荐的现代编程方式,取代了旧版的 @EnableBindingSource/Sink/Processor接口。Spring Cloud Sleuth (用于链路追踪) 等。Binding 和函数式编程模型。

  • group: 非常重要!定义了消费者组。它不接收输入,只产生输出消息。
  • Consumer<T>: 作为消费者 (Sink)。常见的 Binder 有 RabbitMQ Binder, Kafka Binder, Kafka Streams Binder 等。Broker、在添加 Cloud Stream时,必须选择一个 Binder,我们选择 RabbitMQ
  • 理解 Spring Cloud Stream 的核心概念:Binder, Binding, 以及推荐的函数式编程模型(Supplier, Consumer, Function)。

  • Binding: 应用内部逻辑(通常是一个函数 Bean)与外部消息中间件(通过 Binder)之间的桥梁

  • 测试消费者组负载均衡 (可选):

    • 不停止第一个 stream-consumer-demo实例。
    • 启动 stream-producer-demo应用 (端口 9091)。
    • 熟练在 Spring Boot 项目中引入 Spring Cloud Stream 及特定 Binder (RabbitMQ/Kafka)。
    • 三、Spring Cloud Stream 提供了一个统一的编程模型,屏蔽了底层 MQ 实现的差异,让开发者能以简洁、SCS 会将从 MQ 收到的消息传递给该 Consumer 的 accept()方法。它提供了一个统一的、
    • 消息中间件 (Broker):负责接收、
    • 添加依赖 (pom.xml)
      <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><!-- 可选 --></dependency><!-- Spring Cloud Stream Core --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><!-- RabbitMQ Binder --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency></dependencies><!-- 确保 BOM 配置正确 --><dependencyManagement>... </dependencyManagement>
    • 配置 application.yml:
      server:port:9091# 生产者端口spring:application:name:stream-producer-service  # RabbitMQ 连接配置 (如果 RabbitMQ Server 在本地且使用默认端口/用户/密码,可以省略)rabbitmq:host:localhost    port:5672username:guest    password:guest  cloud:stream:# 配置 RabbitMQ Binder (如果只有一个 Binder,通常可省略)# default-binder: rabbit# 定义 Binding (将函数 Bean 绑定到 MQ 目的地)bindings:# 绑定名为 "produceMessage" 的 Supplier Bean 的第一个输出 (out-0)# 这个名字 "produceMessage-out-0" 是根据 Supplier Bean 的名字自动生成的produceMessage-out-0:# 指定目标 Exchange 的名称 (如果不存在,Binder 会自动创建)destination:demo-exchange          # (可选) 指定 Content-Type,默认为 application/jsoncontent-type:application/json          # (可选) RabbitMQ 特有配置,如 routing key (生产者通常不指定 group)# producer:#   routing-key-expression: "'myRoutingKey'" # SpEL 表达式
    • 创建生产者 (SupplierBean):
      packagecom.example.streamproducerdemo;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.messaging.Message;// Spring Messaging APIimportorg.springframework.messaging.support.MessageBuilder;importreactor.core.publisher.Flux;// 使用 Reactor 进行反应式发送 (可选)importreactor.core.publisher.Sinks;importjava.time.LocalDateTime;importjava.util.function.Supplier;// 核心函数接口@ConfigurationpublicclassMessageProducerConfig{privatestaticfinalLoggerlog =LoggerFactory.getLogger(MessageProducerConfig.class);// 创建一个 Sinks.Many,用于从外部触发消息发送 (例如通过 REST API)// 这是反应式编程的方式,更灵活privateSinks.Many<Message<String>>messageSink =Sinks.many().unicast().onBackpressureBuffer();// !! 定义 Supplier Bean !!// Bean 的名称 "produceMessage" 将用于配置绑定 (produceMessage-out-0)// 返回值类型 Flux<Message<String>> 表示这是一个持续产生消息的源@BeanpublicSupplier<Flux<Message<String>>>produceMessage(){return()->messageSink.asFlux().doOnNext(msg ->log.info("Sending message: {}",newString(msg.getPayload().getBytes()))).doOnError(e ->log.error("Error sending message",e));}// 提供一个方法供外部调用来发送消息 (例如在 Controller 中注入调用)publicvoidsendMessage(Stringpayload){StringmessageToSend =payload +" at "+LocalDateTime.now();// 使用 MessageBuilder 构建消息,可以添加 Header 等Message<String>message =MessageBuilder.withPayload(messageToSend)// .setHeader("myHeader", "myValue").build();// 发射消息到 SinkmessageSink.emitNext(message,Sinks.EmitFailureHandler.FAIL_FAST);log.info("Message emitted to sink: {}",messageToSend);}}
      • 我们使用 Reactor 的 Sinks来创建一个可以从外部触发的事件源,这样更灵活。更换 MQ 只需更换 Binder 依赖和配置。内容类型协商等。
      • 简化的配置:通过 Spring Boot 的自动配置和属性绑定,可以方便地配置 Binder 连接、它接收输入消息,进行处理,并产生输出消息。

        1. Binder: 连接应用与消息中间件的适配器。提高吞吐量、

      (D) 使用 Kafka Binder (简要说明)

      如果想使用 Kafka:

      1. 依赖:将 pom.xml中的 spring-cloud-stream-binder-rabbit替换为 spring-cloud-stream-binder-kafka
      2. 掌握了 Spring Cloud Stream 抽象层带来的好处和核心概念。
      3. 发送消息:访问生产者的 Controller 端点 http://localhost:9091/send/HelloStream

    生产者和消费者的 Java 代码(Supplier/ConsumerBean)完全不需要改变,这就是 Spring Cloud Stream 的威力!

    五、
  • 配置 application.yml:
    server:port:9092# 消费者端口spring:application:name:stream-consumer-service  rabbitmq:# RabbitMQ 连接信息host:localhost    port:5672username:guest    password:guest  cloud:stream:bindings:# 绑定名为 "consumeMessage" 的 Consumer Bean 的第一个输入 (in-0)consumeMessage-in-0:# 指定要消费的 Exchange/Topic (必须与生产者配置的 destination 匹配)destination:demo-exchange          # !! 指定消费者组 !!# 同一组内的消费者实例会负载均衡消费消息 (Queue 模式)# 不同组的消费者实例会各自收到一份完整的消息 (发布/订阅模式)group:demo-consumer-group-1# (可选) 指定 Content-Type,需要与生产者匹配content-type:application/json          # (可选) RabbitMQ 特有配置,如绑定 Queue 的 routing key# consumer:#   binding-routing-key: "myRoutingKey"
    • destination: 必须与生产者发送到的目的地一致。存储和转发消息的中间服务(如 RabbitMQ Server, Kafka Cluster)。
  • 创建消费者 (ConsumerBean):
    packagecom.example.streamconsumerdemo;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.messaging.Message;importjava.util.function.Consumer;// 核心函数接口@ConfigurationpublicclassMessageConsumerConfig{privatestaticfinalLoggerlog =LoggerFactory.getLogger(MessageConsumerConfig.class);// !! 定义 Consumer Bean !!// Bean 的名称 "consumeMessage" 用于配置绑定 (consumeMessage-in-0)// 泛型类型 Message<String> 表示接收完整的消息对象 (包含 Payload 和 Headers)// 也可以直接用 String 接收 Payload@BeanpublicConsumer<Message<String>>consumeMessage(){returnmessage ->{Stringpayload =message.getPayload();// Map<String, Object> headers = message.getHeaders();log.info("Received message payload: {}",payload);// 在这里编写实际的消息处理逻辑...// log.info("Received headers: {}", headers);};}/*    // 或者直接消费 Payload:    @Bean    public Consumer<String> consumeMessagePayload() {        return payload -> {            log.info("Received payload directly: {}", payload);            // 处理逻辑...};    }    */}
  • © 运行与测试

    1. 确保 RabbitMQ Server 正在运行(可以通过 Docker 快速启动: docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management)。
    2. 多次访问生产者的 /send/{payload}接口发送消息。
    3. 难以应对峰值流量:突发请求可能直接压垮后端服务。然而,并非所有的服务交互都适合或需要同步进行。
    4. 约定优于配置:Spring Cloud Stream 会根据这些函数 Bean 的名称泛型类型来自动推断 Binding。基于 Spring Boot 的编程模型,用于构建消息驱动的微服务,同时屏蔽了底层消息中间件的实现细节。消费者组等。然而,当一个请求流经多个服务,其中可能包含同步调用(如 OpenFeign)和异步调用(如 Stream),如何追踪整个请求的完整链路,以便在出现问题时进行故障定位和性能分析,就变得至关重要。 Spring Cloud Stream 核心概念

    理解 SCS 的关键在于 Binder、


    本文目标

    • 理解同步通信与异步通信的优缺点,以及异步消息在微服务中的核心价值(解耦、
    • 了解消息队列 (MQ) 的基本概念(生产者、
    • 队列 (Queue) / 主题 (Topic):消息存储的逻辑单元。服务可以独立演进和部署。独立(广播)的消费者。概念和配置上都有差异。
    • Function<T, R>: 作为处理器 (Processor)。Kafka 的消费者组机制天然支持负载均衡(一个 Partition 只会被组内一个 Consumer 消费)。
    • 配置 application.yml:
      spring:# ... application name ...# Kafka 连接配置kafka:bootstrap-servers:localhost:9092# Kafka Broker 地址# 其他 Kafka producer/consumer 配置...cloud:stream:# kafka: # Kafka Binder 特定配置 (可选)#   binder:#     brokers: ${spring.kafka.bootstrap-servers}bindings:produceMessage-out-0:destination:demo-topic # Kafka Topic 名称content-type:application/json          # producer: # Kafka Producer 特定配置 (可选)#   partition-key-expression: headers['partitionKey']consumeMessage-in-0:destination:demo-topic # Kafka Topic 名称group:demo-kafka-group-1# Kafka Consumer Group IDcontent-type:application/json          # consumer: # Kafka Consumer 特定配置 (可选)#   concurrency: 3 # 并发消费线程数
      • 配置 spring.kafka.bootstrap-servers

        核心优势:

        1. 统一编程模型:无论是使用 RabbitMQ 还是 Kafka,开发者都使用相同的注解和接口(主要是 java.util.function)来编写消息的生产和消费逻辑。
        2. 削峰填谷 (Peak Shaving):MQ 作为缓冲区,可以平滑处理突发流量。

    因此,在需要解耦、

  • 可用性降低:一个服务的暂时不可用或缓慢会直接影响调用方。
  • 通过 SupplierConsumerBean 编写了消息处理逻辑。一致的方式构建事件驱动的微服务。
  • 优点
    • 解耦 (Decoupling):生产者和消费者无需知道对方的存在,只需与 MQ 交互。
    • 优点:实时性高,交互直接,适用于需要立即得到结果的场景(如查询数据)。

      二、

  • 异步通信 (Asynchronous Communication),通常借助消息队列 (Message Queue, MQ)实现:

    • 模式:生产者将消息发送到 MQ 中间件,然后立即返回,无需等待消费者处理。
    • 异步处理 (Asynchronous Processing):对于非实时性要求高的任务(如发送邮件、

      Spring Cloud Stream (SCS)就是为了解决这个问题而生的框架。它接收输入消息进行处理,没有返回值。本文将阐述异步通信的价值,介绍 Spring Cloud Stream 的核心概念(Binder, Binding, Functional Programming Model),并通过实战演示如何使用其与 RabbitMQ (或 Kafka) 集成,轻松实现消息的生产和消费,以及消费者分组带来的负载均衡效果。

    • 最终一致性 (Eventual Consistency):适用于可以接受数据短暂不一致,但最终会达到一致状态的场景(常用于分布式事务的补偿或 Saga 模式)。Binding 还负责消息转换、队列/主题)。这里的 out-0in-0分别代表函数的第一个输出和第一个输入。
    • 掌握如何配置 Binder 连接信息以及 Binding 规则(目的地、 Spring Cloud Stream:屏蔽底层 MQ 的差异

      虽然 MQ 带来了诸多好处,但不同的 MQ 产品(RabbitMQ, Kafka, RocketMQ 等)在 API、开发者只需引入对应的 Binder 依赖(如 spring-cloud-stream-binder-rabbitspring-cloud-stream-binder-kafka),并在配置文件中指定连接信息即可。通道映射等。

    • 添加依赖 (pom.xml):同生产者。
    • destination对应 Kafka 的 Topic名称。SCS 将输入消息传递给 apply()方法,并将返回值作为新消息发送出去。订单后续处理),可以异步执行,提高主流程响应速度。
    • 学会使用 SupplierBean 发送消息,以及使用 ConsumerFunctionBean 接收并处理消息。Binding 定义了如何将应用中的输入/输出“通道”连接到 MQ 中的具体目的地 (Destination)(如 RabbitMQ 的 Exchange/Queue 或 Kafka 的 Topic)。

      【Spring Boot 与 Spring Cloud 深度 Mape 之八】异步通信与解耦:Spring Cloud Stream 整合消息队列 (RabbitMQ/Kafka) 实战

      #SpringCloudStream#消息队列#RabbitMQ#Kafka#异步通信#事件驱动#微服务#SpringBoot#Java

      系列衔接:在前面的 [【深度 Mape 之七】] 中,我们学习了如何利用 Sentinel 为同步服务调用添加强大的容错和流量防护能力。


    一、 实战:使用 SCS + RabbitMQ 实现异步消息

    我们将创建一个生产者应用和一个消费者应用,通过 RabbitMQ 进行通信。

  • 启动第二个stream-consumer-demo实例。削峰填谷和最终一致性。现在你有两个属于同一个 demo-consumer-group-1组的消费者实例。
  • 缺点
    • 强耦合:调用方和服务提供方必须同时在线且可用。生产者快速将请求写入 MQ,消费者按照自己的节奏处理,避免压垮后端。
    • 掌握 Spring Cloud Stream 的定位——MQ 的抽象层,及其核心优势。开发者只需将消息处理逻辑封装在标准的 java.util.functionBean 中:

      • Supplier<T>: 作为生产者 (Source)
      • 观察消费者日志:查看 stream-consumer-demo的控制台,你应该能看到类似 “Received message payload: HelloStream at …” 的日志。消费者从 MQ 中拉取或订阅消息进行处理。
      • 修改 stream-consumer-demoserver.port(例如改为 9093)。
      • 通过配置文件绑定了函数与 MQ 目的地,并理解了消费者组的作用。SupplierBean 返回这个 Sink对应的 Flux。日志记录、异步消息传递通过引入消息中间件(MQ)作为缓冲,使得服务间的通信可以解耦,生产者无需等待消费者处理即可继续执行,从而提高系统吞吐量、过度依赖同步调用会增加系统间的耦合度,降低整体可用性(一个服务的缓慢可能拖慢整个调用链),并且难以应对突发流量。消费者、弹性和可伸缩性。
      • 理解消费者组 (Consumer Group) 在实现负载均衡或广播中的作用。
      • 低吞吐量:调用方的执行速度受限于最慢的服务提供方。如果省略 group,会创建一个匿名的、削峰、 同步 vs 异步:为何需要消息队列?

        我们之前使用的 OpenFeign 进行的服务调用属于同步通信 (Synchronous Communication)

        • 模式:客户端发起请求,阻塞等待服务端处理并返回响应,然后才能继续执行。SCS 会定期调用该 Supplier 的 get()方法,并将返回的对象作为消息发送出去。每个 Binder 实现负责与特定的 MQ 进行通信,处理消息的序列化/反序列化、 总结与展望

          本文我们探索了微服务中的异步通信模式,并学习了如何利用 Spring Cloud Stream 及其函数式编程模型,结合 RabbitMQ (或 Kafka) Binder,轻松实现消息的生产和消费:

          1. 理解了异步消息解耦、
          2. 消费者 (Consumer):接收并处理消息的一方。如果应用直接依赖特定 MQ 的客户端库,未来想要更换 MQ 或者同时使用多种 MQ 就会非常困难。削峰、
          3. Binder 抽象:SCS 通过 Binder组件来适配不同的消息中间件。同一组内的多个消费者实例会竞争消费来自 destination的消息,实现负载均衡。提升弹性的价值。

          四、消费者组)。

          (A) 生产者应用 (stream-producer-demo)

          1. 创建 Spring Boot 项目:使用 Spring Initializr,添加 Spring Web(可选,用于触发发送) 和 Cloud Stream依赖。

          Spring Cloud Stream 极大地简化了构建消息驱动微服务应用的复杂度。

        • 启动 stream-consumer-demo应用 (端口 9092)。
        • 观察两个消费者实例的控制台日志,你会发现同一条消息只会被其中一个实例消费,实现了负载均衡。增强系统弹性的场景下,异步消息是更优的选择。例如,一个名为 myProducerSupplier<String>Bean,SCS 会查找名为 myProducer-out-0的输出绑定配置;一个名为 myConsumerConsumer<Order>Bean,会查找名为 myConsumer-in-0的输入绑定配置。Spring Integration、
        • 弹性与可用性 (Resilience & Availability):即使消费者暂时不可用,消息也会暂存在 MQ 中,待消费者恢复后继续处理,提高了系统的整体韧性。
        • 核心组件
          • 生产者 (Producer):发送消息的一方。Binding 目的地、
        • (可选) 创建 Controller 触发发送:
          packagecom.example.streamproducerdemo;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassTriggerController{@AutowiredprivateMessageProducerConfigmessageProducer;@GetMapping("/send/{payload}")publicStringsend(@PathVariableStringpayload){messageProducer.sendMessage(payload);return"Message sent: "+payload;}}
        • (B) 消费者应用 (stream-consumer-demo)

          1. 创建 Spring Boot 项目:添加 Cloud StreamRabbitMQBinder 依赖。