响应式流支持
Reactive Streams 支持
Spring 集成在框架的某些地方和不同方面提供了对 Reactive Streams 交互的支持。 我们将在这里讨论其中的大部分内容,并在必要时提供指向目标章节的适当链接以获取详细信息。
前言
概括地说,Spring Integration 扩展了 Spring 编程模型以支持众所周知的企业集成模式。
Spring 集成在基于 Spring 的应用程序内支持轻量级消息传递,并支持通过声明式适配器与外部系统集成。
Spring 集成的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。
此目标是在目标应用程序中使用一等公民实现的,例如message
,channel
和endpoint
,这允许我们构建一个集成流(管道),其中(在大多数情况下)一个端点将消息生成到一个通道中,供另一个端点使用。
通过这种方式,我们可以将集成交互模型与目标业务逻辑区分开来。
这里的关键部分是介于两者之间的通道:流行为取决于其实现,而端点保持不变。
另一方面,Reactive Streams 是具有非阻塞背压的异步流处理标准。
Reactive Streams 的主要目标是管理跨异步边界的流数据交换——就像将元素传递给另一个线程或线程池一样——同时确保接收方不会被迫缓冲任意数量的数据。
换句话说,背压是此模型不可或缺的一部分,以便允许在线程之间调解的队列有界。
Reactive Streams 实现(例如 Project Reactor)的目的是在 Stream 应用程序的整个处理图中保留这些优势和特征。
Reactive Streams 库的最终目标是使用可用的编程语言结构,以透明和流畅的方式为目标应用程序提供类型、运算符集和支持 API,但最终解决方案并不像普通函数链调用那样势在必行。
它分为几个阶段:定义和执行,这发生在订阅最终反应式发布者的一段时间后,对数据的需求从定义的底部推送到顶部,根据需要施加背压 - 我们请求当前可以处理的尽可能多的事件。
响应式应用程序看起来像一个"stream"
或者正如我们在 Spring Integration 术语中习惯的那样——"flow"
.
事实上,自 Java 9 以来的 Reactive Streams SPI 在java.util.concurrent.Flow
类。
从这里来看,当我们在端点上应用一些反应式框架运算符时,Spring Integration 流似乎真的非常适合编写 Reactive Streams 应用程序,但实际上问题要广泛得多,我们需要记住,并非所有端点(例如JdbcMessageHandler
) 可以在反应式流中透明地处理。
当然,Spring Integration 中 Reactive Streams 支持的主要目标是允许整个过程完全反应式、按需启动和背压就绪。
在通道适配器的目标协议和系统提供 Reactive Streams 交互模型之前,这是不可能的。
在下面的部分中,我们将描述 Spring Integration 中提供了哪些组件和方法,用于开发保留集成流结构的响应式应用程序。
Spring 集成中的所有 Reactive Streams 交互都是用 Project Reactor 类型实现的,比如Mono 和Flux . |
消息网关
与 Reactive Streams 交互的最简单点是@MessagingGateway
其中,我们只将 gateway 方法的 return 类型设为Mono<?>
- 当订阅发生在返回的Mono
实例。
看反应器Mono
了解更多信息。
类似的Mono
-reply 方法在框架内部用于完全基于 Reactive Streams 兼容协议的入站网关(有关更多信息,请参阅下面的 Reactive Channel Adapters)。
send-and-receive作被包装到Mono.deffer()
链接来自replyChannel
标头。
这样,特定反应式协议(例如 Netty)的入站组件将作为在 Spring 集成上执行的反应式流的订阅者和发起者。
如果请求有效负载是反应式类型,则最好使用反应式流定义来处理它,从而将进程推迟到发起方订阅。
为此,处理程序方法也必须返回响应式类型。
有关更多信息,请参阅下一节。
反应式回复有效负载
当回复产生MessageHandler
返回回复消息的反应式类型有效负载,它会以异步方式处理,并使用常规的MessageChannel
implementation providedoutputChannel
并在输出渠道为ReactiveStreamsSubscribableChannel
implementation 的 intent 实例,例如FluxMessageChannel
.
使用标准祈使语气MessageChannel
use-case,如果回复有效负载是多值发布者(请参阅ReactiveAdapter.isMultiValue()
有关更多信息),它被包装到Mono.just()
.
因此,Mono
必须在下游显式订阅,或者由FluxMessageChannel
下游。
使用ReactiveStreamsSubscribableChannel
对于outputChannel
,无需担心返回类型和订阅;框架内部一切都顺利处理。
有关更多信息,请参阅 Asynchronous Service Activator 。
如需了解详情,另请参阅 Kotlin 协程。
FluxMessageChannel
和ReactiveStreamsConsumer
这FluxMessageChannel
是MessageChannel
和Publisher<Message<?>>
.
一个Flux
作为热源,是在内部创建的,用于从send()
实现。
这Publisher.subscribe()
implementation 被委托给该内部Flux
.
此外,对于按需上游消费,FluxMessageChannel
提供了ReactiveStreamsSubscribableChannel
合同。
任何上游Publisher
(例如,请参阅下面的 Source Polling Channel Adapter 和 splitter)当此通道的订阅准备就绪时,会自动订阅为此通道提供的 Source Polling Channel Adapter 和 splitter。
来自此委派发布者的事件将沉入内部Flux
上述。
一个FluxMessageChannel
必须是org.reactivestreams.Subscriber
实例来遵守 Reactive Streams 合约。
幸运的是,所有的MessageHandler
Spring Integration 中的实现还实现了一个CoreSubscriber
来自 Reactor 项目。
多亏了ReactiveStreamsConsumer
implementation 中,整个集成流配置对目标开发人员是透明的。
在这种情况行为从命令式推送模型更改为反应式拉取模型。
一个ReactiveStreamsConsumer
也可用于转动任何MessageChannel
发送到响应式源中IntegrationReactiveUtils
,使集成流部分反应。
看FluxMessageChannel
了解更多信息。
从版本 5.5 开始,ConsumerEndpointSpec
引入了reactive()
选项,将流中的终端节点设为ReactiveStreamsConsumer
独立于 input 通道。
可选的Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
可以提供以自定义源Flux
从 Input Channel 通过Flux.transform()
作,例如使用publishOn()
,doOnNext()
,retry()
等。
此功能表示为@Reactive
sub-annotation 的所有消息注释 (@ServiceActivator
,@Splitter
等)通过他们的reactive()
属性。
源轮询通道适配器
通常,SourcePollingChannelAdapter
依赖于由TaskScheduler
.
轮询触发器是根据提供的选项构建的,用于定期计划任务以轮询目标数据或事件源。
当outputChannel
是一个ReactiveStreamsSubscribableChannel
一样Trigger
用于确定下次执行的时间,但不是调度任务,而是SourcePollingChannelAdapter
创建一个Flux<Message<?>>
基于Flux.generate()
对于nextExecutionTime
values 和Mono.delay()
从上一步开始的持续时间。
一个Flux.flatMapMany()
用于轮询maxMessagesPerPoll
并将它们沉入 outputFlux
.
此生成器Flux
由提供的ReactiveStreamsSubscribableChannel
尊重下游的背压。
从版本 5.5 开始,当maxMessagesPerPoll == 0
,则根本不会调用源,并且flatMapMany()
立即通过Mono.empty()
result 直到maxMessagesPerPoll
稍后更改为非零值,例如通过 Control Bus。
这样,任何MessageSource
implementation 可以变成一个反应式热源。
有关更多信息,请参阅轮询使用者。
事件驱动的通道适配器
MessageProducerSupport
是事件驱动型通道适配器的基类,通常,其sendMessage(Message<?>)
在生产驱动程序 API 中用作侦听器回调。
这个回调也可以很容易地插入到doOnNext()
Reactor 运算符,当消息生成者实现构建Flux
的消息,而不是基于侦听器的功能。
事实上,这是在框架中完成的,当outputChannel
的消息生成者不是ReactiveStreamsSubscribableChannel
.
但是,为了改进最终用户体验,并允许更多的背压就绪功能,MessageProducerSupport
提供subscribeToPublisher(Publisher<? extends Message<?>>)
在 Target 实施中使用的 APIPublisher<Message<?>>>
是来自目标系统的数据源。
通常,它是从doStart()
在为目标驱动程序 API 调用Publisher
的源数据。
建议将反应性MessageProducerSupport
implementation 替换为FluxMessageChannel
作为outputChannel
用于按需订阅和下游事件使用。
当订阅Publisher
已取消。
叫stop()
在这样的通道上,适配器完成从源Publisher
.
通道适配器可以通过自动订阅新创建的源来重新启动Publisher
.
Message Source 到 Reactive Streams
从版本 5.3 开始,ReactiveMessageSourceProducer
。
它是提供的MessageSource
和事件驱动型生产集成到配置的outputChannel
.
在内部,它包装了一个MessageSource
到重复重新订阅的Mono
生成一个Flux<Message<?>>
以订阅subscribeToPublisher(Publisher<? extends Message<?>>)
上述。
此订阅Mono
使用Schedulers.boundedElastic()
避免在目标中可能阻塞MessageSource
.
当消息源返回null
(没有数据可提取),则Mono
转换为repeatWhenEmpty()
state 替换为delay
对于基于IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY
Duration
条目。
默认情况下,它是 1 秒。
如果MessageSource
生成带有IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
信息,则会在 Headers 的doOnSuccess()
原件Mono
和 rejected 在doOnError()
如果下游流抛出MessagingException
替换为 Reject 的失败消息。
这ReactiveMessageSourceProducer
可用于任何用例,当轮询通道适配器的功能应该转换为任何现有MessageSource<?>
实现。
拆分器和聚合器
当AbstractMessageSplitter
获取Publisher
对于其逻辑,该过程自然会越过Publisher
要将它们映射到消息中,以便发送到outputChannel
.
如果此通道是ReactiveStreamsSubscribableChannel
这Flux
wrapper 的Publisher
从该通道按需订阅,并且此拆分器行为看起来更像是flatMap
Reactor 运算符,当我们将传入事件映射到多值输出时Publisher
.
当整个集成流程是使用FluxMessageChannel
在拆分器之前和之后,将 Spring 集成配置与 Reactive Streams 需求及其事件处理运算符保持一致。
对于常规通道,一个Publisher
转换为Iterable
用于标准的 Iterate-and-produce 拆分逻辑。
一个FluxAggregatorMessageHandler
是另一个特定的 Reactive Streams 逻辑实现示例,可以将其视为"reactive operator"
就 Project Reactor 而言。
它基于Flux.groupBy()
和Flux.window()
(或buffer()
) 运算符。
传入消息将沉入Flux.create()
在FluxAggregatorMessageHandler
,使其成为热源。
这Flux
由ReactiveStreamsSubscribableChannel
按需,或直接在FluxAggregatorMessageHandler.start()
当outputChannel
不是反应性的。
这MessageHandler
具有它的力量,当整个集成流程是使用FluxMessageChannel
在这个 component 之前和之后,使整个 logic back-pressure 准备好。
有关更多信息,请参阅 Stream and Flux Splitting 和 Flux Aggregator 。
Java DSL
一IntegrationFlow
在 Java 中,DSL 可以从任何Publisher
实例(请参阅IntegrationFlow.from(Publisher<Message<T>>)
).
此外,使用IntegrationFlowBuilder.toReactivePublisher()
运算符、IntegrationFlow
可以变成反应性热源。
一个FluxMessageChannel
在这两种情况下都在内部使用;它可以订阅入站Publisher
根据其ReactiveStreamsSubscribableChannel
合同,它是一个Publisher<Message<?>>
对于下游订阅者。
具有动态IntegrationFlow
注册后,我们可以实现一个强大的逻辑,将 Reactive Streams 与这个集成流相结合,桥接到 / 从Publisher
.
从版本 5.5.6 开始,toReactivePublisher(boolean autoStartOnSubscribe)
operator 变体用于控制整个IntegrationFlow
在返回的Publisher<Message<?>>
.
通常,来自反应式发布者的订阅和使用发生在后面的运行时阶段,而不是在反应式流组合期间,甚至ApplicationContext
启动。
为了避免使用IntegrationFlow
在Publisher<Message<?>>
订阅点,为了获得更好的最终用户体验,这个新运算符具有autoStartOnSubscribe
标志。
它标记(如果true
) 的IntegrationFlow
及其组件autoStartup = false
,因此ApplicationContext
不会自动启动流中消息的生成和使用。
相反,start()
对于IntegrationFlow
从内部Flux.doOnSubscribe()
.
独立于autoStartOnSubscribe
值,则流将从Flux.doOnCancel()
和Flux.doOnTerminate()
- 如果没有东西可以消费,那么生成消息就没有意义。
对于完全相反的用例,当IntegrationFlow
应该调用一个反应式流并在完成后继续,一个fluxTransform()
运算符在IntegrationFlowDefinition
.
此时的流将转换为FluxMessageChannel
它被传播到提供的fluxFunction
,在Flux.transform()
算子。
该函数的结果被包装到Mono<Message<?>>
用于平面映射到输出Flux
被另一个人订阅FluxMessageChannel
用于下游流。
有关更多信息,请参见 Java DSL 章节。
ReactiveMessageHandler
从版本 5.3 开始,ReactiveMessageHandler
在框架中原生支持。
这种类型的消息处理程序专为反应式客户端而设计,这些客户端返回反应式类型以进行按需订阅以执行低级作,并且不提供任何回复数据来继续反应式流组合。
当ReactiveMessageHandler
在命令式集成流程中使用,handleMessage()
result in subscribed 在返回后立即返回,只是因为在这样的 flow 中没有响应式流组合来遵守背压。
在这种情况下,框架将ReactiveMessageHandler
转换为ReactiveMessageHandlerAdapter
- 一个MessageHandler
.
但是,当ReactiveStreamsConsumer
参与流中(例如,当 channel to consume 是一个FluxMessageChannel
),这样一个ReactiveMessageHandler
由一个flatMap()
反应器作员在消耗过程中遵守背压。
开箱即用的ReactiveMessageHandler
implementation 是一个ReactiveMongoDbStoringMessageHandler
用于出站通道适配器。
有关更多信息,请参见MongoDB Reactive Channel Adapters。
Reactive 通道适配器
当集成的目标协议提供 Reactive Streams 解决方案时,在 Spring Integration 中实现通道适配器就变得简单了。
入站、事件驱动的通道适配器实现是将请求(如有必要)包装到延迟的Mono
或Flux
并仅在协议组件启动订阅到Mono
从 listener 方法返回。
这样我们就有了一个完全封装在这个组件中的反应式流解决方案。
当然,在输出通道上订阅的下游集成流应该遵循 Reactive Streams 规范,并以按需、背压就绪的方式执行。
这并不总是由 的性质 (或当前实现) 提供的MessageHandler
处理器。
可以使用线程池和队列或FluxMessageChannel
(见上文)在没有反应式实现时,在 integration endpoints 之前和之后。
反应式事件驱动的入站通道适配器的示例:
public class CustomReactiveMessageProducer extends MessageProducerSupport {
private final CustomReactiveSource customReactiveSource;
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}
@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
subscribeToPublisher(messageFlux);
}
}
用法将如下所示:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
或者以声明的方式:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
或者即使没有通道适配器,我们也可以始终按以下方式使用 Java DSL:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
}
}
反应式出站通道适配器实现是关于根据为目标协议提供的反应式 API 启动(或延续)反应流以与外部系统交互。 入站有效负载本身可以是反应式类型,也可以是整个集成流的事件,它是顶部的反应式流的一部分。 如果我们处于单向、即发即弃的场景中,则可以立即订阅返回的反应式类型,或者它被传播到下游(请求-回复场景)以进行进一步的集成流或目标业务逻辑中的显式订阅,但仍在下游保留反应式流语义。
反应式出站通道适配器的示例:
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {
private final CustomEntityOperations customEntityOperations;
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}
@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}
private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}
private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}
public enum Type {
INSERT,
UPDATE,
}
}
我们将能够使用这两个通道适配器:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
目前, Spring 集成为 WebFlux、RSocket、MongoDb、R2DBC、ZeroMQ、GraphQL、Apache Cassandra 提供通道适配器(或网关)实现。
Redis Stream Channel Adapter 也是反应式的,并使用ReactiveStreamOperations
来自 Spring Data。
更多的反应式通道适配器即将到来,例如基于ReactiveKafkaProducerTemplate
和ReactiveKafkaConsumerTemplate
来自 Spring for Apache Kafka 等。
对于许多其他非反应式通道适配器,建议使用线程池以避免在反应式流处理期间阻塞。
对命令式上下文传播的反应
当上下文传播库位于 Classpath 上时,Project Reactor 可以采用ThreadLocal
值(例如 Micrometer Observation 或SecurityContextHolder
) 并将它们存储到Subscriber
上下文。
当我们需要填充日志记录 MDC 进行跟踪或让我们从反应流中调用的服务来从作用域中恢复观察时,也可以进行相反的作。
请参阅 Project Reactor 文档中有关其用于上下文传播的特殊运算符的更多信息。
如果我们的整个解决方案是单个响应式流组合,那么存储和恢复上下文可以顺利地进行,因为Subscriber
context 从下游一直到合成的开头都可见 (Flux
或Mono
).
但是,如果应用程序在不同的Flux
实例或命令式处理并返回,则上下文绑定到Subscriber
可能不可用。
对于这样的用例, Spring 集成提供了一个额外的功能(从版本6.0.5
) 来存储 ReactorContextView
到IntegrationMessageHeaderAccessor.REACTOR_CONTEXT
从反应式流生成的 message 头,例如,当我们执行 directsend()
操作。
然后,此标头在FluxMessageChannel.subscribeTo()
恢复 Reactor 上下文Message
这个频道将要发出。
目前,此标头是从WebFluxInboundEndpoint
和RSocketInboundGateway
组件,但可用于执行 Reactive to Imperative 集成的任何解决方案。
填充此标头的逻辑如下所示:
return requestMono
.flatMap((message) ->
Mono.deferContextual((context) ->
Mono.just(message)
.handle((messageToSend, sink) ->
send(messageWithReactorContextIfAny(messageToSend, context)))));
...
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
if (!context.isEmpty()) {
return getMessageBuilderFactory()
.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
.build();
}
return message;
}
请注意,我们仍然需要使用handle()
运算符进行 Reactor 还原ThreadLocal
值。
即使它是作为标头发送的,框架也无法假设它是否要恢复到ThreadLocal
值。
要从Message
另一方面Flux
或Mono
composition 中,可以执行此逻辑:
Mono.just(message)
.handle((messageToHandle, sink) -> ...)
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));