对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
Spring Integration 在框架的某些地方和不同的方面提供了对 Reactive Streams 交互的支持。 我们将在这里讨论其中的大部分内容,并在必要时提供指向目标章节的适当链接以获取详细信息。
前言
总而言之,Spring Integration 扩展了 Spring 编程模型,以支持众所周知的企业集成模式。
Spring Integration 支持在基于 Spring 的应用程序中进行轻量级消息传递,并支持通过声明性适配器与外部系统集成。
Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点的分离,这对于生成可维护、可测试的代码至关重要。
这个目标是在目标应用程序中使用像 和 这样的一类公民来实现的,这允许我们构建一个集成流(管道),其中(在大多数情况下)一个端点将消息生成到一个通道中,供另一个端点使用。
通过这种方式,我们将集成交互模型与目标业务逻辑区分开来。
这里的关键部分是介于两者之间的通道:流行为取决于其实现,而端点保持不变。message
channel
endpoint
另一方面,反应流是具有非阻塞背压的异步流处理的标准。
反应式流的主要目标是控制跨异步边界的流数据交换(例如将元素传递到另一个线程或线程池),同时确保接收端不会被迫缓冲任意数量的数据。
换言之,背压是该模型的一个组成部分,以便允许在线程之间调解的队列是有界的。
响应式流实现(如 Project Reactor)的目的是在流应用程序的整个处理图中保留这些优势和特征。
Reactive Streams 库的最终目标是以透明和流畅的方式为目标应用程序提供类型、运算符集和支持 API,就像可用的编程语言结构一样,但最终的解决方案并不像正常的函数链调用那样必要。
它分为几个阶段:定义和执行,在订阅最终反应式发布者期间的一段时间后发生,对数据的需求从定义的底部推到顶部,根据需要施加背压 - 我们请求尽可能多的事件,我们目前可以处理。
反应式应用程序看起来像我们在 Spring 集成术语中习惯的 or - 。
事实上,自 Java 9 以来的 Reactive Streams SPI 已呈现在类中。"stream"
"flow"
java.util.concurrent.Flow
从这里开始,当我们在端点上应用一些响应式框架运算符时,Spring Integration 流看起来确实非常适合编写响应式流应用程序,但实际上问题要广泛得多,我们需要记住,并非所有端点(例如)都可以在响应式流中透明地处理。
当然,Spring Integration 中支持 Reactive Streams 的主要目标是让整个过程完全响应、按需启动和背压准备。
在信道适配器的目标协议和系统提供反应式流交互模型之前,这是不可能的。
在下面的部分中,我们将介绍Spring Integration中提供了哪些组件和方法,用于开发响应式应用程序保留集成流结构。JdbcMessageHandler
Spring Integration 中使用 Project Reactor 类型实现的所有响应式流交互,例如 和 .Mono Flux |
Spring Integration 中使用 Project Reactor 类型实现的所有响应式流交互,例如 和 .Mono Flux |
消息传递网关
与 Reactive Streams 交互的最简单点是,我们只需将网关方法的返回类型作为 - 并且当返回的实例上发生订阅时,网关方法调用背后的整个集成流将执行。
有关更多信息,请参见 Reactor Mono
。
框架内部对完全基于反应式流兼容协议的入站网关使用类似的 -reply 方法(有关详细信息,请参阅下面的反应式通道适配器)。
发送和接收操作被包装成一个,并在可用时从标头链接回复评估。
这样,特定反应式协议(例如Netty)的入站组件将作为在Spring Integration上执行的反应流的订阅者和启动者。
如果请求有效负载是响应式类型,则最好使用响应式流定义来处理它,将进程延迟到启动方订阅。
为此,处理程序方法还必须返回反应式类型。
有关详细信息,请参阅下一节。@MessagingGateway
Mono<?>
Mono
Mono
Mono.deffer()
replyChannel
反应式回复有效负载
当生成回复返回回复消息的反应式有效负载时,它将以异步方式进行处理,并为 (必须设置为 ) 提供常规实现,并在输出通道为实现时使用按需订阅扁平化,例如 .
对于标准命令式用例,如果回复有效负载是多值发布者(有关详细信息,请参阅),则将其包装到 .
因此,必须明确地在下游订阅或被下游压平。
使用 a for the ,无需担心返回类型和订阅;一切都由框架在内部顺利处理。MessageHandler
MessageChannel
outputChannel
async
true
ReactiveStreamsSubscribableChannel
FluxMessageChannel
MessageChannel
ReactiveAdapter.isMultiValue()
Mono.just()
Mono
FluxMessageChannel
ReactiveStreamsSubscribableChannel
outputChannel
有关详细信息,请参阅异步服务激活器。
有关更多信息,另请参阅 Kotlin 协程。
FluxMessageChannel
和ReactiveStreamsConsumer
是 和 的组合实现。
A 作为热源,是在内部创建的,用于接收来自实现的传入消息。
实现被委托给该内部 .
此外,对于按需上游消费,为合约提供了实现。
为此通道提供的任何上游(例如,请参阅下面的源轮询通道适配器和拆分器)都会在为此通道准备好订阅时自动订阅。
来自此委托发布者的事件被沉入上述内部。FluxMessageChannel
MessageChannel
Publisher<Message<?>>
Flux
send()
Publisher.subscribe()
Flux
FluxMessageChannel
ReactiveStreamsSubscribableChannel
Publisher
Flux
的使用者必须是履行反应式流协定的实例。
幸运的是,Spring Integration 中的所有实现也实现了 from project Reactor。
由于中间有一个实现,整个集成流配置对目标开发人员来说是透明的。
在这种情况行为从命令式推送模型更改为反应式拉取模型。
A 也可用于将任何源转换为反应源,使积分流部分反应。FluxMessageChannel
org.reactivestreams.Subscriber
MessageHandler
CoreSubscriber
ReactiveStreamsConsumer
ReactiveStreamsConsumer
MessageChannel
IntegrationReactiveUtils
有关详细信息,请参阅 FluxMessageChannel
。
从版本 5.5 开始,引入了一个选项,用于将流中的终结点设置为独立于输入通道的端点。
可以提供可选功能,通过操作从输入通道自定义信号源,例如使用 、 等。
此功能通过其属性表示为所有消息注释(等)的子注释。ConsumerEndpointSpec
reactive()
ReactiveStreamsConsumer
Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
Flux
Flux.transform()
publishOn()
doOnNext()
retry()
@Reactive
@ServiceActivator
@Splitter
reactive()
源轮询通道适配器
通常,依赖于由 .
轮询触发器是根据提供的选项构建的,用于定期计划任务以轮询数据或事件的目标源。
当 an 为 时,用于确定下一次执行时间,但不是计划任务,而是根据 for 值和上一步的持续时间创建一个。
然后使用 A 轮询并将它们沉入输出中。
该发电机由提供的承压下游订阅。
从 5.5 版开始,当 时,根本不调用源,而是立即通过结果完成,直到稍后更改为非零值,例如通过控制总线。
这样,任何实现都可以变成反应式热源。SourcePollingChannelAdapter
TaskScheduler
outputChannel
ReactiveStreamsSubscribableChannel
Trigger
SourcePollingChannelAdapter
Flux<Message<?>>
Flux.generate()
nextExecutionTime
Mono.delay()
Flux.flatMapMany()
maxMessagesPerPoll
Flux
Flux
ReactiveStreamsSubscribableChannel
maxMessagesPerPoll == 0
flatMapMany()
Mono.empty()
maxMessagesPerPoll
MessageSource
有关详细信息,请参阅轮询使用者。
事件驱动通道适配器
MessageProducerSupport
是事件驱动的通道适配器的基类,通常,它用作生成驱动程序 API 中的侦听器回调。
当消息生产者实现构建消息而不是基于侦听器的功能时,此回调也可以很容易地插入到 Reactor 运算符中。
实际上,这是在框架中完成的,当消息生产者的 an 不是 .
但是,为了改善最终用户体验,并允许更多的背压就绪功能,当 a 是来自目标系统的数据源时,它提供了一个用于目标实现的 API。
通常,当为源数据调用目标驱动程序 API 时,从实现中使用它。
建议将响应式实现与 a 结合使用,用于按需订阅和下游事件消费。
取消对 的订阅时,通道适配器将进入停止状态。
调用这样的通道适配器即可完成从源 的生成。
可以通过自动订阅新创建的源来重新启动通道适配器。sendMessage(Message<?>)
doOnNext()
Flux
outputChannel
ReactiveStreamsSubscribableChannel
MessageProducerSupport
subscribeToPublisher(Publisher<? extends Message<?>>)
Publisher<Message<?>>>
doStart()
Publisher
MessageProducerSupport
FluxMessageChannel
outputChannel
Publisher
stop()
Publisher
Publisher
反应式流的消息源
从版本 5.3 开始,提供了 a。
它是将提供的和事件驱动的生产组合到配置的 .
在内部,它将 a 包装到重复重新订阅的 a 中,从而产生上述要订阅的内容。
为此的订阅是用来避免目标中可能的阻塞。
当消息源返回(没有要拉取的数据)时,将转换为一种状态,其中包含 a 用于基于订阅者上下文中的条目进行后续重新订阅。
默认情况下,它是 1 秒。
如果生成的消息在标头中包含信息,则在原始消息中确认(如有必要),并在下游流抛出要拒绝的失败消息时拒绝。
这可用于任何用例,当轮询通道适配器的功能应转换为任何现有实现的反应性按需解决方案时。ReactiveMessageSourceProducer
MessageSource
outputChannel
MessageSource
Mono
Flux<Message<?>>
subscribeToPublisher(Publisher<? extends Message<?>>)
Mono
Schedulers.boundedElastic()
MessageSource
null
Mono
repeatWhenEmpty()
delay
IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY
Duration
MessageSource
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
doOnSuccess()
Mono
doOnError()
MessagingException
ReactiveMessageSourceProducer
MessageSource<?>
拆分器和聚合器
当 an 获得 a 的逻辑时,该过程会自然地遍历 中的项,将它们映射到消息中以发送到 .
如果这个通道是 ,则 的包装器是按需从该通道订阅的,当我们将传入事件映射到多值输出时,这种拆分器行为看起来更像是一个 Reactor 运算符。
当整个集成流使用拆分器之前和之后构建时,这是最有意义的,使 Spring Integration 配置与 Reactive Streams 要求及其事件处理运算符保持一致。
使用常规通道时,a 被转换为标准迭代和生成拆分逻辑。AbstractMessageSplitter
Publisher
Publisher
outputChannel
ReactiveStreamsSubscribableChannel
Flux
Publisher
flatMap
Publisher
FluxMessageChannel
Publisher
Iterable
A 是特定 Reactive Streams 逻辑实现的另一个示例,可以将其视为 Project Reactor 的 a。
它基于 和 (或) 运算符。
在创建 a 时,传入的消息将沉入 inited,使其成为热源。
这是按需订阅的,或者直接在非反应性时订阅。
当整个集成流程构建有这个组件的前后时,这有其强大的功能,使整个逻辑背压准备就绪。FluxAggregatorMessageHandler
"reactive operator"
Flux.groupBy()
Flux.window()
buffer()
Flux.create()
FluxAggregatorMessageHandler
Flux
ReactiveStreamsSubscribableChannel
FluxAggregatorMessageHandler.start()
outputChannel
MessageHandler
FluxMessageChannel
有关更多信息,请参见 Stream and Flux Splitting 和 Flux Aggregator。
Java DSL的
Java DSL 中的 DSL 可以从任何实例启动(请参阅)。
此外,通过操作员,可以将其变成反应性热源。
在这两种情况下,A 都在内部使用;它可以根据其合同订阅入站,并且它本身就是下游订阅者。
通过动态注册,我们可以实现一个强大的逻辑,将反应式流与这个集成流桥接到/桥接到。IntegrationFlow
Publisher
IntegrationFlow.from(Publisher<Message<T>>)
IntegrationFlowBuilder.toReactivePublisher()
IntegrationFlow
FluxMessageChannel
Publisher
ReactiveStreamsSubscribableChannel
Publisher<Message<?>>
IntegrationFlow
Publisher
从版本 5.5.6 开始,存在一个运算符变体来控制返回的 .
通常,来自反应式发布者的订阅和使用发生在后期运行时阶段,而不是在反应式流组合期间,甚至不是在启动期间。
为了避免在订阅点进行生命周期管理的样板代码,并获得更好的最终用户体验,引入了这个带有标志的新运算符。
它标记(如果)及其组件,因此不会自动启动流中消息的生产和消费。
相反,for 是从内部启动的。
与该值无关,流从 a 和 停止 - 如果没有任何东西可以消耗它们,则生成消息是没有意义的。toReactivePublisher(boolean autoStartOnSubscribe)
IntegrationFlow
Publisher<Message<?>>
ApplicationContext
IntegrationFlow
Publisher<Message<?>>
autoStartOnSubscribe
true
IntegrationFlow
autoStartup = false
ApplicationContext
start()
IntegrationFlow
Flux.doOnSubscribe()
autoStartOnSubscribe
Flux.doOnCancel()
Flux.doOnTerminate()
对于完全相反的用例,当应该调用反应式流并在完成后继续时,在 .
此时的流被转换为a,该流被传播到提供的,在运算符中执行。
该函数的结果被包装成一个用于平面映射的输出,该输出由另一个用于下游流的订阅。IntegrationFlow
fluxTransform()
IntegrationFlowDefinition
FluxMessageChannel
fluxFunction
Flux.transform()
Mono<Message<?>>
Flux
FluxMessageChannel
有关更多信息,请参见 Java DSL 章节。
ReactiveMessageHandler
从版本 5.3 开始,框架本机支持 。
这种类型的消息处理程序专为响应式客户端而设计,这些客户端返回响应式类型,用于低级别操作执行的按需订阅,并且不提供任何回复数据来继续响应式流组合。
当在命令式积分流中使用 a 时,结果在返回后立即订阅,只是因为在这样的流中没有反应性流组成来尊重背压。
在本例中,框架将其包装成 - 一个 的简单实现。
然而,当 a 参与流动时(例如,当要消耗的通道是 a 时),这样的 a 被组成到整个反应流中,反应器操作员在消耗过程中遵守背压。ReactiveMessageHandler
ReactiveMessageHandler
handleMessage()
ReactiveMessageHandler
ReactiveMessageHandlerAdapter
MessageHandler
ReactiveStreamsConsumer
FluxMessageChannel
ReactiveMessageHandler
flatMap()
一个现成的实现是出站通道适配器。
有关更多信息,请参阅 MongoDB 反应式通道适配器。ReactiveMessageHandler
ReactiveMongoDbStoringMessageHandler
从版本 6.1 开始,公开了一个方便的终端操作员。
任何实现(即使只是使用 API 的普通 lambda)都可以用于此运算符。
框架会自动订阅返回的。
下面是此运算符的可能配置的简单示例:IntegrationFlowDefinition
handleReactive(ReactiveMessageHandler)
ReactiveMessageHandler
Mono
Mono<Void>
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlow.from("tappedChannel1")
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
.handleReactive((message) -> Mono.just(message).log().then());
}
此运算符的重载版本接受 a 以围绕提供的 .Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>
ReactiveMessageHandler
此外,还提供了基于 的变体。
在大多数情况下,它们用于特定于协议的通道适配器实现。
请参阅下一节,以下链接指向具有相应反应式通道适配器的目标技术。ReactiveMessageHandlerSpec
电应式通道适配器
当集成的目标协议提供 Reactive Streams 解决方案时,在 Spring Integration 中实现通道适配器变得简单明了。
入站、事件驱动的通道适配器实现是关于将请求(如有必要)包装到延迟或中,并仅在协议组件将订阅启动到从侦听器方法返回时执行发送(并生成回复,如果有)。
这样,我们就可以将反应式流解决方案精确封装在这个组件中。
当然,在输出通道上订阅的下游集成流应符合反应流规范,并以按需、背压就绪的方式执行。Mono
Flux
Mono
这在集成流中使用的处理器的性质(或当前实现)并不总是可用的。
当没有反应式实现时,可以使用线程池和队列或(见上文)在集成端点之前和之后处理此限制。MessageHandler
FluxMessageChannel
反应式事件驱动的入站通道适配器的示例:
public class CustomReactiveMessageProducer extends MessageProducerSupport {
private final CustomReactiveSource customReactiveSource;
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}
@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
subscribeToPublisher(messageFlux);
}
}
用法如下所示:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
或者以声明方式:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
或者即使没有通道适配器,我们也可以按以下方式使用 Java DSL:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
}
}
反应式出站通道适配器实现是关于根据为目标协议提供的反应式 API 启动(或延续)反应式流以与外部系统交互。 入站有效负载本身可以是反应式类型,也可以是整个集成流的事件,它是顶部反应式流的一部分。 如果我们处于单向、即发即弃的场景中,则可以立即订阅返回的反应式类型,或者将其传播到下游(请求-应答场景)以进行进一步的集成流或在目标业务逻辑中显式订阅,但仍保留下游的反应式流语义。
反应式出站通道适配器的示例:
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {
private final CustomEntityOperations customEntityOperations;
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}
@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}
private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}
private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}
public enum Type {
INSERT,
UPDATE,
}
}
我们将能够使用两个通道适配器:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
目前,Spring Integration 为 WebFlux、RSocket、MongoDb、R2DBC、ZeroMQ、GraphQL和 Apache Cassandra 提供通道适配器(或网关)实现。
Redis 流通道适配器也是响应式的,可以从 Spring Data 使用。
更多的反应式通道适配器即将到来,例如,Kafka 中的 Apache Kafka 基于 Spring for Apache Kafka 等。
对于许多其他非反应式通道适配器,建议使用线程池,以避免在反应式流处理期间阻塞。ReactiveStreamOperations
ReactiveKafkaProducerTemplate
ReactiveKafkaConsumerTemplate
对命令式上下文传播的反应
当 Context Propagation 库位于类路径上时,Project Reactor 可以获取值(例如 Micrometer Observation 或 )并将它们存储到上下文中。
当我们需要填充日志记录 MDC 进行跟踪或让我们从反应式流调用的服务从作用域恢复观察值时,也可以执行相反的操作。
在 Project Reactor 文档中查看有关其用于上下文传播的特殊算子的更多信息。
如果我们的整个解决方案是单个反应式流组合,则存储和恢复上下文可以顺利进行,因为上下文从下游到组合的开头都是可见的( 或 )。
但是,如果应用程序在不同的实例之间切换或进入命令式处理并返回,则与 关联的上下文可能不可用。
对于这样的用例,Spring Integration提供了一个额外的功能(从版本开始),将一个反应堆存储到从反应式流产生的消息头中,例如,当我们执行直接操作时。
然后,此标头用于恢复此通道将要发出的 Reactor 上下文。
目前,此标头是从 和 组件填充的,但可用于执行响应式到命令式集成的任何解决方案。
填充此标头的逻辑如下所示:ThreadLocal
SecurityContextHolder
Subscriber
Subscriber
Flux
Mono
Flux
Subscriber
6.0.5
ContextView
IntegrationMessageHeaderAccessor.REACTOR_CONTEXT
send()
FluxMessageChannel.subscribeTo()
Message
WebFluxInboundEndpoint
RSocketInboundGateway
return requestMono
.flatMap((message) ->
Mono.deferContextual((context) ->
Mono.just(message)
.handle((messageToSend, sink) ->
send(messageWithReactorContextIfAny(messageToSend, context)))));
...
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
if (!context.isEmpty()) {
return getMessageBuilderFactory()
.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
.build();
}
return message;
}
请注意,我们仍然需要使用运算符来使 Reactor 从上下文中恢复值。
即使它作为标头发送,框架也无法假设它是否要恢复到下游的值。handle()
ThreadLocal
ThreadLocal
要从另一个或组合恢复上下文,可以执行以下逻辑:Message
Flux
Mono
Mono.just(message)
.handle((messageToHandle, sink) -> ...)
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));