响应式流支持

Reactive Streams 支持

Spring 集成在框架的某些地方和不同方面提供了对 Reactive Streams 交互的支持。 我们将在这里讨论其中的大部分内容,并在必要时提供指向目标章节的适当链接以获取详细信息。spring-doc.cadn.net.cn

前言

概括地说,Spring Integration 扩展了 Spring 编程模型以支持众所周知的企业集成模式。 Spring 集成在基于 Spring 的应用程序内支持轻量级消息传递,并支持通过声明式适配器与外部系统集成。 Spring 集成的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。 此目标是在目标应用程序中使用一等公民实现的,例如message,channelendpoint,这允许我们构建一个集成流(管道),其中(在大多数情况下)一个端点将消息生成到一个通道中,供另一个端点使用。 通过这种方式,我们可以将集成交互模型与目标业务逻辑区分开来。 这里的关键部分是介于两者之间的通道:流行为取决于其实现,而端点保持不变。spring-doc.cadn.net.cn

另一方面,Reactive Streams 是具有非阻塞背压的异步流处理标准。 Reactive Streams 的主要目标是管理跨异步边界的流数据交换——就像将元素传递给另一个线程或线程池一样——同时确保接收方不会被迫缓冲任意数量的数据。 换句话说,背压是此模型不可或缺的一部分,以便允许在线程之间调解的队列有界。 Reactive Streams 实现(例如 Project Reactor)的目的是在 Stream 应用程序的整个处理图中保留这些优势和特征。 Reactive Streams 库的最终目标是使用可用的编程语言结构,以透明和流畅的方式为目标应用程序提供类型、运算符集和支持 API,但最终解决方案并不像普通函数链调用那样势在必行。 它分为几个阶段:定义和执行,这发生在订阅最终反应式发布者的一段时间后,对数据的需求从定义的底部推送到顶部,根据需要施加背压 - 我们请求当前可以处理的尽可能多的事件。 响应式应用程序看起来像一个"stream"或者正如我们在 Spring Integration 术语中习惯的那样——"flow". 事实上,自 Java 9 以来的 Reactive Streams SPI 在java.util.concurrent.Flow类。spring-doc.cadn.net.cn

从这里来看,当我们在端点上应用一些反应式框架运算符时,Spring Integration 流似乎真的非常适合编写 Reactive Streams 应用程序,但实际上问题要广泛得多,我们需要记住,并非所有端点(例如JdbcMessageHandler) 可以在反应式流中透明地处理。 当然,Spring Integration 中 Reactive Streams 支持的主要目标是允许整个过程完全反应式、按需启动和背压就绪。 在通道适配器的目标协议和系统提供 Reactive Streams 交互模型之前,这是不可能的。 在下面的部分中,我们将描述 Spring Integration 中提供了哪些组件和方法,用于开发保留集成流结构的响应式应用程序。spring-doc.cadn.net.cn

Spring 集成中的所有 Reactive Streams 交互都是用 Project Reactor 类型实现的,比如MonoFlux.

消息网关

与 Reactive Streams 交互的最简单点是@MessagingGateway其中,我们只将 gateway 方法的 return 类型设为Mono<?>- 当订阅发生在返回的Mono实例。 看反应器Mono了解更多信息。 类似的Mono-reply 方法在框架内部用于完全基于 Reactive Streams 兼容协议的入站网关(有关更多信息,请参阅下面的 Reactive Channel Adapters)。 send-and-receive作被包装到Mono.deffer()链接来自replyChannel标头。 这样,特定反应式协议(例如 Netty)的入站组件将作为在 Spring 集成上执行的反应式流的订阅者和发起者。 如果请求有效负载是反应式类型,则最好使用反应式流定义来处理它,从而将进程推迟到发起方订阅。 为此,处理程序方法也必须返回响应式类型。 有关更多信息,请参阅下一节。spring-doc.cadn.net.cn

反应式回复有效负载

当回复产生MessageHandler返回回复消息的反应式类型有效负载,它会以异步方式处理,并使用常规的MessageChannelimplementation providedoutputChannel并在输出渠道为ReactiveStreamsSubscribableChannelimplementation 的 intent 实例,例如FluxMessageChannel. 使用标准祈使语气MessageChanneluse-case,如果回复有效负载是多值发布者(请参阅ReactiveAdapter.isMultiValue()有关更多信息),它被包装到Mono.just(). 因此,Mono必须在下游显式订阅,或者由FluxMessageChannel下游。 使用ReactiveStreamsSubscribableChannel对于outputChannel,无需担心返回类型和订阅;框架内部一切都顺利处理。spring-doc.cadn.net.cn

有关更多信息,请参阅 Asynchronous Service Activatorspring-doc.cadn.net.cn

如需了解详情,另请参阅 Kotlin 协程。spring-doc.cadn.net.cn

FluxMessageChannelReactiveStreamsConsumer

FluxMessageChannelMessageChannelPublisher<Message<?>>. 一个Flux作为热源,是在内部创建的,用于从send()实现。 这Publisher.subscribe()implementation 被委托给该内部Flux. 此外,对于按需上游消费,FluxMessageChannel提供了ReactiveStreamsSubscribableChannel合同。 任何上游Publisher(例如,请参阅下面的 Source Polling Channel Adapter 和 splitter)当此通道的订阅准备就绪时,会自动订阅为此通道提供的 Source Polling Channel Adapter 和 splitter。 来自此委派发布者的事件将沉入内部Flux上述。spring-doc.cadn.net.cn

一个FluxMessageChannel必须是org.reactivestreams.Subscriber实例来遵守 Reactive Streams 合约。 幸运的是,所有的MessageHandlerSpring Integration 中的实现还实现了一个CoreSubscriber来自 Reactor 项目。 多亏了ReactiveStreamsConsumerimplementation 中,整个集成流配置对目标开发人员是透明的。 在这种情况行为从命令式推送模型更改为反应式拉取模型。 一个ReactiveStreamsConsumer也可用于转动任何MessageChannel发送到响应式源中IntegrationReactiveUtils,使集成流部分反应。spring-doc.cadn.net.cn

从版本 5.5 开始,ConsumerEndpointSpec引入了reactive()选项,将流中的终端节点设为ReactiveStreamsConsumer独立于 input 通道。 可选的Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>可以提供以自定义源Flux从 Input Channel 通过Flux.transform()作,例如使用publishOn(),doOnNext(),retry()等。 此功能表示为@Reactivesub-annotation 的所有消息注释 (@ServiceActivator,@Splitter等)通过他们的reactive()属性。spring-doc.cadn.net.cn

源轮询通道适配器

通常,SourcePollingChannelAdapter依赖于由TaskScheduler. 轮询触发器是根据提供的选项构建的,用于定期计划任务以轮询目标数据或事件源。 当outputChannel是一个ReactiveStreamsSubscribableChannel一样Trigger用于确定下次执行的时间,但不是调度任务,而是SourcePollingChannelAdapter创建一个Flux<Message<?>>基于Flux.generate()对于nextExecutionTimevalues 和Mono.delay()从上一步开始的持续时间。 一个Flux.flatMapMany()用于轮询maxMessagesPerPoll并将它们沉入 outputFlux. 此生成器Flux由提供的ReactiveStreamsSubscribableChannel尊重下游的背压。 从版本 5.5 开始,当maxMessagesPerPoll == 0,则根本不会调用源,并且flatMapMany()立即通过Mono.empty()result 直到maxMessagesPerPoll稍后更改为非零值,例如通过 Control Bus。 这样,任何MessageSourceimplementation 可以变成一个反应式热源。spring-doc.cadn.net.cn

有关更多信息,请参阅轮询使用者spring-doc.cadn.net.cn

事件驱动的通道适配器

MessageProducerSupport是事件驱动型通道适配器的基类,通常,其sendMessage(Message<?>)在生产驱动程序 API 中用作侦听器回调。 这个回调也可以很容易地插入到doOnNext()Reactor 运算符,当消息生成者实现构建Flux的消息,而不是基于侦听器的功能。 事实上,这是在框架中完成的,当outputChannel的消息生成者不是ReactiveStreamsSubscribableChannel. 但是,为了改进最终用户体验,并允许更多的背压就绪功能,MessageProducerSupport提供subscribeToPublisher(Publisher<? extends Message<?>>)在 Target 实施中使用的 APIPublisher<Message<?>>>是来自目标系统的数据源。 通常,它是从doStart()在为目标驱动程序 API 调用Publisher的源数据。 建议将反应性MessageProducerSupportimplementation 替换为FluxMessageChannel作为outputChannel用于按需订阅和下游事件使用。 当订阅Publisher已取消。 叫stop()在这样的通道上,适配器完成从源Publisher. 通道适配器可以通过自动订阅新创建的源来重新启动Publisher.spring-doc.cadn.net.cn

Message Source 到 Reactive Streams

从版本 5.3 开始,ReactiveMessageSourceProducer。 它是提供的MessageSource和事件驱动型生产集成到配置的outputChannel. 在内部,它包装了一个MessageSource到重复重新订阅的Mono生成一个Flux<Message<?>>以订阅subscribeToPublisher(Publisher<? extends Message<?>>)上述。 此订阅Mono使用Schedulers.boundedElastic()避免在目标中可能阻塞MessageSource. 当消息源返回null(没有数据可提取),则Mono转换为repeatWhenEmpty()state 替换为delay对于基于IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration条目。 默认情况下,它是 1 秒。 如果MessageSource生成带有IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK信息,则会在 Headers 的doOnSuccess()原件Mono和 rejected 在doOnError()如果下游流抛出MessagingException替换为 Reject 的失败消息。 这ReactiveMessageSourceProducer可用于任何用例,当轮询通道适配器的功能应该转换为任何现有MessageSource<?>实现。spring-doc.cadn.net.cn

拆分器和聚合器

AbstractMessageSplitter获取Publisher对于其逻辑,该过程自然会越过Publisher要将它们映射到消息中,以便发送到outputChannel. 如果此通道是ReactiveStreamsSubscribableChannelFluxwrapper 的Publisher从该通道按需订阅,并且此拆分器行为看起来更像是flatMapReactor 运算符,当我们将传入事件映射到多值输出时Publisher. 当整个集成流程是使用FluxMessageChannel在拆分器之前和之后,将 Spring 集成配置与 Reactive Streams 需求及其事件处理运算符保持一致。 对于常规通道,一个Publisher转换为Iterable用于标准的 Iterate-and-produce 拆分逻辑。spring-doc.cadn.net.cn

一个FluxAggregatorMessageHandler是另一个特定的 Reactive Streams 逻辑实现示例,可以将其视为"reactive operator"就 Project Reactor 而言。 它基于Flux.groupBy()Flux.window()(或buffer()) 运算符。 传入消息将沉入Flux.create()FluxAggregatorMessageHandler,使其成为热源。 这FluxReactiveStreamsSubscribableChannel按需,或直接在FluxAggregatorMessageHandler.start()outputChannel不是反应性的。 这MessageHandler具有它的力量,当整个集成流程是使用FluxMessageChannel在这个 component 之前和之后,使整个 logic back-pressure 准备好。spring-doc.cadn.net.cn

有关更多信息,请参阅 Stream and Flux SplittingFlux Aggregatorspring-doc.cadn.net.cn

Java DSL

IntegrationFlow在 Java 中,DSL 可以从任何Publisher实例(请参阅IntegrationFlow.from(Publisher<Message<T>>)). 此外,使用IntegrationFlowBuilder.toReactivePublisher()运算符、IntegrationFlow可以变成反应性热源。 一个FluxMessageChannel在这两种情况下都在内部使用;它可以订阅入站Publisher根据其ReactiveStreamsSubscribableChannel合同,它是一个Publisher<Message<?>>对于下游订阅者。 具有动态IntegrationFlow注册后,我们可以实现一个强大的逻辑,将 Reactive Streams 与这个集成流相结合,桥接到 / 从Publisher.spring-doc.cadn.net.cn

从版本 5.5.6 开始,toReactivePublisher(boolean autoStartOnSubscribe)operator 变体用于控制整个IntegrationFlow在返回的Publisher<Message<?>>. 通常,来自反应式发布者的订阅和使用发生在后面的运行时阶段,而不是在反应式流组合期间,甚至ApplicationContext启动。 为了避免使用IntegrationFlowPublisher<Message<?>>订阅点,为了获得更好的最终用户体验,这个新运算符具有autoStartOnSubscribe标志。 它标记(如果true) 的IntegrationFlow及其组件autoStartup = false,因此ApplicationContext不会自动启动流中消息的生成和使用。 相反,start()对于IntegrationFlow从内部Flux.doOnSubscribe(). 独立于autoStartOnSubscribe值,则流将从Flux.doOnCancel()Flux.doOnTerminate()- 如果没有东西可以消费,那么生成消息就没有意义。spring-doc.cadn.net.cn

对于完全相反的用例,当IntegrationFlow应该调用一个反应式流并在完成后继续,一个fluxTransform()运算符在IntegrationFlowDefinition. 此时的流将转换为FluxMessageChannel它被传播到提供的fluxFunction,在Flux.transform()算子。 该函数的结果被包装到Mono<Message<?>>用于平面映射到输出Flux被另一个人订阅FluxMessageChannel用于下游流。spring-doc.cadn.net.cn

有关更多信息,请参见 Java DSL 章节spring-doc.cadn.net.cn

ReactiveMessageHandler

从版本 5.3 开始,ReactiveMessageHandler在框架中原生支持。 这种类型的消息处理程序专为反应式客户端而设计,这些客户端返回反应式类型以进行按需订阅以执行低级作,并且不提供任何回复数据来继续反应式流组合。 当ReactiveMessageHandler在命令式集成流程中使用,handleMessage()result in subscribed 在返回后立即返回,只是因为在这样的 flow 中没有响应式流组合来遵守背压。 在这种情况下,框架将ReactiveMessageHandler转换为ReactiveMessageHandlerAdapter- 一个MessageHandler. 但是,当ReactiveStreamsConsumer参与流中(例如,当 channel to consume 是一个FluxMessageChannel),这样一个ReactiveMessageHandler由一个flatMap()反应器作员在消耗过程中遵守背压。spring-doc.cadn.net.cn

开箱即用的ReactiveMessageHandlerimplementation 是一个ReactiveMongoDbStoringMessageHandler用于出站通道适配器。 有关更多信息,请参见MongoDB Reactive Channel Adaptersspring-doc.cadn.net.cn

Reactive 通道适配器

当集成的目标协议提供 Reactive Streams 解决方案时,在 Spring Integration 中实现通道适配器就变得简单了。spring-doc.cadn.net.cn

入站、事件驱动的通道适配器实现是将请求(如有必要)包装到延迟的MonoFlux并仅在协议组件启动订阅到Mono从 listener 方法返回。 这样我们就有了一个完全封装在这个组件中的反应式流解决方案。 当然,在输出通道上订阅的下游集成流应该遵循 Reactive Streams 规范,并以按需、背压就绪的方式执行。spring-doc.cadn.net.cn

这并不总是由 的性质 (或当前实现) 提供的MessageHandler处理器。 可以使用线程池和队列或FluxMessageChannel(见上文)在没有反应式实现时,在 integration endpoints 之前和之后。spring-doc.cadn.net.cn

反应式事件驱动的入站通道适配器的示例:spring-doc.cadn.net.cn

public class CustomReactiveMessageProducer extends MessageProducerSupport {

    private final CustomReactiveSource customReactiveSource;

    public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
        Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
        this.customReactiveSource = customReactiveSource;
    }

    @Override
    protected void doStart() {
        Flux<Message<?>> messageFlux =
            this.customReactiveSource
                .map(event - >
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());

        subscribeToPublisher(messageFlux);
    }
}

用法将如下所示:spring-doc.cadn.net.cn

public class MainFlow {
  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .channel(outputChannel)
        .get();
  }
}

或者以声明的方式:spring-doc.cadn.net.cn

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
        .handle(outputChannel)
        .get();
  }
}

或者即使没有通道适配器,我们也可以始终按以下方式使用 Java DSL:spring-doc.cadn.net.cn

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
    Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlow.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

反应式出站通道适配器实现是关于根据为目标协议提供的反应式 API 启动(或延续)反应流以与外部系统交互。 入站有效负载本身可以是反应式类型,也可以是整个集成流的事件,它是顶部的反应式流的一部分。 如果我们处于单向、即发即弃的场景中,则可以立即订阅返回的反应式类型,或者它被传播到下游(请求-回复场景)以进行进一步的集成流或目标业务逻辑中的显式订阅,但仍在下游保留反应式流语义。spring-doc.cadn.net.cn

反应式出站通道适配器的示例:spring-doc.cadn.net.cn

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

    private final CustomEntityOperations customEntityOperations;

    public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
        Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
        this.customEntityOperations = customEntityOperations;
    }

    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
                .flatMap(mode -> {
                    switch (mode) {
                        case INSERT:
                            return handleInsert(message);
                        case UPDATE:
                            return handleUpdate(message);
                        default:
                            return Mono.error(new IllegalArgumentException());
                    }
                }).then();
    }

    private Mono<Void> handleInsert(Message<?> message) {
        return this.customEntityOperations.insert(message.getPayload())
                .then();
    }

    private Mono<Void> handleUpdate(Message<?> message) {
        return this.r2dbcEntityOperations.update(message.getPayload())
                .then();
    }

    public enum Type {
        INSERT,
        UPDATE,
    }
}

我们将能够使用这两个通道适配器:spring-doc.cadn.net.cn

public class MainFlow {

  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Autowired
  private CustomReactiveMessageHandler customReactiveMessageHandler;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .transform(someOperation)
        .handle(customReactiveMessageHandler)
        .get();
  }
}

目前, Spring 集成为 WebFluxRSocketMongoDbR2DBCZeroMQ、GraphQL、Apache Cassandra 提供通道适配器(或网关)实现。 Redis Stream Channel Adapter 也是反应式的,并使用ReactiveStreamOperations来自 Spring Data。 更多的反应式通道适配器即将到来,例如基于ReactiveKafkaProducerTemplateReactiveKafkaConsumerTemplate来自 Spring for Apache Kafka 等。 对于许多其他非反应式通道适配器,建议使用线程池以避免在反应式流处理期间阻塞。spring-doc.cadn.net.cn

对命令式上下文传播的反应

上下文传播库位于 Classpath 上时,Project Reactor 可以采用ThreadLocal值(例如 Micrometer ObservationSecurityContextHolder) 并将它们存储到Subscriber上下文。 当我们需要填充日志记录 MDC 进行跟踪或让我们从反应流中调用的服务来从作用域中恢复观察时,也可以进行相反的作。 请参阅 Project Reactor 文档中有关其用于上下文传播的特殊运算符的更多信息。 如果我们的整个解决方案是单个响应式流组合,那么存储和恢复上下文可以顺利地进行,因为Subscribercontext 从下游一直到合成的开头都可见 (FluxMono). 但是,如果应用程序在不同的Flux实例或命令式处理并返回,则上下文绑定到Subscriber可能不可用。 对于这样的用例, Spring 集成提供了一个额外的功能(从版本6.0.5) 来存储 ReactorContextViewIntegrationMessageHeaderAccessor.REACTOR_CONTEXT从反应式流生成的 message 头,例如,当我们执行 directsend()操作。 然后,此标头在FluxMessageChannel.subscribeTo()恢复 Reactor 上下文Message这个频道将要发出。 目前,此标头是从WebFluxInboundEndpointRSocketInboundGateway组件,但可用于执行 Reactive to Imperative 集成的任何解决方案。 填充此标头的逻辑如下所示:spring-doc.cadn.net.cn

return requestMono
        .flatMap((message) ->
                Mono.deferContextual((context) ->
                        Mono.just(message)
                                .handle((messageToSend, sink) ->
                                        send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
    if (!context.isEmpty()) {
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
                .build();
    }
    return message;
}

请注意,我们仍然需要使用handle()运算符进行 Reactor 还原ThreadLocal值。 即使它是作为标头发送的,框架也无法假设它是否要恢复到ThreadLocal值。spring-doc.cadn.net.cn

要从Message另一方面FluxMonocomposition 中,可以执行此逻辑:spring-doc.cadn.net.cn

Mono.just(message)
        .handle((messageToHandle, sink) -> ...)
        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));