Java DSL
Java DSL
Spring 集成 Java 配置和 DSL 提供了一组方便的构建器和一个流畅的 API,允许您配置来自 Spring 的 Spring 集成消息流@Configuration
类。
(另请参阅 Kotlin DSL。
(另请参见 Groovy DSL。
用于 Spring Integration 的 Java DSL 本质上是 Spring Integration 的门面。
DSL 提供了一种简单的方法,通过使用 Fluent 将 Spring 集成消息流嵌入到你的应用程序中Builder
模式与 Spring Framework 和 Spring Integration 中的现有 Java 配置一起运行。
我们还使用并支持 lambda(在 Java 8 中提供)来进一步简化 Java 配置。
这家咖啡馆提供了使用 DSL 的一个很好的例子。
DSL 由IntegrationFlow
Fluent API(请参阅IntegrationFlowBuilder
).
这将生成IntegrationFlow
组件,该组件应注册为 Spring Bean(通过使用@Bean
注释)。
构建器模式用于将任意复杂的结构表示为可以接受 lambda 作为参数的方法层次结构。
这IntegrationFlowBuilder
仅收集集成组件 (MessageChannel
实例AbstractEndpoint
实例,依此类推),在IntegrationFlow
bean 在应用程序上下文中进一步解析和注册具体 bean 的IntegrationFlowBeanPostProcessor
.
Java DSL 直接使用 Spring 集成类,并绕过任何 XML 生成和解析。 然而,DSL 在 XML 之上提供的不仅仅是语法糖。 它最引人注目的功能之一是能够定义内联 lambda 来实现终端节点逻辑,无需外部类来实现自定义逻辑。 从某种意义上说,Spring 集成对 Spring 表达式语言 (SpEL) 和内联脚本的支持解决了这个问题,但 lambda 更简单、功能更强大。
下面的示例展示了如何使用 Java 配置进行 Spring 集成:
@Configuration
@EnableIntegration
public class MyConfiguration {
@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.fromSupplier(integerSource()::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.channel("inputChannel")
.filter((Integer p) -> p > 0)
.transform(Object::toString)
.channel(MessageChannels.queue())
.get();
}
}
前面的配置示例的结果是,它在ApplicationContext
启动、Spring 集成端点和消息通道。
Java 配置可用于替换和增强 XML 配置。
您无需替换所有现有的 XML 配置即可使用 Java 配置。
DSL 基础知识
这org.springframework.integration.dsl
package 包含IntegrationFlowBuilder
API 和一些IntegrationComponentSpec
实现,它们也是构建器,并提供 Fluent API 来配置具体端点。
这IntegrationFlowBuilder
基础设施为基于消息的应用程序(例如通道、终端节点、轮询程序和通道拦截器)提供常见的企业集成模式 (EIP)。
端点在 DSL 中表示为动词,以提高可读性。 以下列表包括常见的 DSL 方法名称和关联的 EIP Endpoint:
-
转换→
Transformer
-
筛选→
Filter
-
手柄 →
ServiceActivator
-
拆分→
Splitter
-
聚合→
Aggregator
-
路线 →
Router
-
桥接→
Bridge
从概念上讲,集成流程是通过将这些端点组合成一个或多个消息流来构建的。
请注意,EIP 并未正式定义术语“消息流”,但将其视为使用众所周知的消息传递模式的工作单元是有用的。
DSL 提供了一个IntegrationFlow
组件来定义通道和它们之间的端点的组合,但现在IntegrationFlow
仅扮演配置角色以在应用程序上下文中填充真实 bean,而不在运行时使用。
但是,用于IntegrationFlow
可以作为Lifecycle
控制start()
和stop()
对于整个流,它被委托给与此关联的所有 Spring 集成组件IntegrationFlow
.
以下示例使用IntegrationFlow
Fluent API 来定义IntegrationFlow
bean 使用 EIP 方法IntegrationFlowBuilder
:
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.<String, Integer>transform(Integer::parseInt)
.get();
}
这transform
方法接受 Lambda 作为终端节点参数以对消息负载进行作。
此方法的真正参数是GenericTransformer<S, T>
实例。
因此,任何提供的转换器 (ObjectToJsonTransformer
,FileToStringTransformer
和其他)都可以在此处使用。
在被窝里,IntegrationFlowBuilder
识别MessageHandler
及其端点,使用MessageTransformingHandler
和ConsumerEndpointFactoryBean
分别。
考虑另一个例子:
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from("input")
.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println)
.get();
}
前面的示例将Filter → Transformer → Service Activator
.
流程是 “'单向'”。
也就是说,它不提供回复消息,而只将有效负载打印到 STDOUT。
终端节点使用直接通道自动连接在一起。
lambda 和
Message<?> 参数在 EIP 方法中使用 lambda 表达式时,“input” 参数通常是消息负载。
如果您希望访问整个消息,请使用采用
这将在运行时失败,并显示 相反,请使用:
|
Bean 定义覆盖
Java DSL 可以为流定义中内联定义的对象注册 bean,也可以重用现有的注入的 bean。
如果为内嵌对象和现有 bean 定义定义了相同的 bean 名称,则 |
消息通道
除了IntegrationFlowBuilder
使用 EIP 方法,Java DSL 提供了一个 Fluent API 来配置MessageChannel
实例。
为此,MessageChannels
builder factory 的 builder 工厂。
以下示例演示如何使用它:
@Bean
public MessageChannel priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap())
.get();
}
一样MessageChannels
builder factory 可以在channel()
EIP 方法从IntegrationFlowBuilder
连接端点,类似于连接input-channel
/output-channel
对。
默认情况下,端点与DirectChannel
bean 名称基于以下模式的实例:[IntegrationFlow.beanName].channel#[channelNameIndex]
.
此规则也适用于 inline 生成的未命名通道MessageChannels
builder factory 使用情况。
但是,所有MessageChannels
方法具有一个变体,该变体知道channelId
可用于设置MessageChannel
实例。
这MessageChannel
references 和beanName
可以用作 Bean 方法调用。
以下示例显示了使用channel()
弹性公网 IP 方式:
@Bean
public MessageChannel queueChannel() {
return MessageChannels.queue().get();
}
@Bean
public MessageChannel publishSubscribe() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlow.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
-
from("input")
的意思是“'查找并使用MessageChannel
替换为 “input” id,或创建一个'”。 -
fixedSubscriberChannel()
生成一个FixedSubscriberChannel
并使用名称channelFlow.channel#0
. -
channel("queueChannel")
工作方式相同,但使用现有的queueChannel
豆。 -
channel(publishSubscribe())
是 Bean 方法引用。 -
channel(MessageChannels.executor("executorChannel", this.taskExecutor))
是IntegrationFlowBuilder
这暴露了IntegrationComponentSpec
到ExecutorChannel
并将其注册为executorChannel
. -
channel("output")
注册DirectChannel
bean 替换为output
作为其名称,只要不存在具有此名称的 bean。
注意:前面的IntegrationFlow
定义有效,并且其所有通道都应用于具有BridgeHandler
实例。
请小心使用相同的内联通道定义MessageChannels 来自不同的工厂IntegrationFlow 实例。
即使 DSL 解析器将不存在的对象注册为 bean,它也无法确定相同的对象(MessageChannel ) 来自不同的IntegrationFlow 器皿。
以下示例是错误的: |
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlow.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
该错误示例的结果是以下异常:
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
要使其正常工作,您需要声明@Bean
对于该通道,并使用来自不同IntegrationFlow
实例。
轮询器
Spring 集成还提供了一个 Fluent API,允许您配置PollerMetadata
为AbstractPollingEndpoint
实现。
您可以使用Pollers
builder factory 配置公共 bean 定义或从IntegrationFlowBuilder
EIP 方法,如下例所示:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(500)
.errorChannel("myErrors");
}
看Pollers
和PollerSpec
在 Javadoc 中了解更多信息。
如果使用 DSL 构造PollerSpec 作为@Bean ,请不要调用get() 方法。
这PollerSpec 是一个FactoryBean 这会生成PollerMetadata object 并初始化其所有属性。 |
这reactive()
端点
从版本 5.5 开始,ConsumerEndpointSpec
提供reactive()
configuration 属性与可选的定制器Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
.
此选项将目标终端节点配置为ReactiveStreamsConsumer
实例,独立于输入通道类型,该类型转换为Flux
通过IntegrationReactiveUtils.messageChannelToFlux()
.
提供的函数从Flux.transform()
运算符进行自定义 (publishOn()
,log()
,doOnNext()
等)来自 input 通道的反应式流源。
下面的示例演示了如何将发布线程从独立于最终订阅者和生产者的 input 通道更改为该DirectChannel
:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.<String, Integer>transform(Integer::parseInt,
e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
.get();
}
有关更多信息,请参阅 Reactive Streams Support 。
DSL 和端点配置
都IntegrationFlowBuilder
EIP 方法有一个变体,该变体应用 lambda 参数来提供AbstractEndpoint
实例:SmartLifecycle
,PollerMetadata
,request-handler-advice-chain
等。
它们中的每一个都有通用参数,因此它允许您配置终端节点,甚至其MessageHandler
在上下文中,如下例所示:
@Bean
public IntegrationFlow flow2() {
return IntegrationFlow.from(this.inputChannel)
.transform(new PayloadSerializingTransformer(),
c -> c.autoStartup(false).id("payloadSerializingTransformer"))
.transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
.get();
}
此外,EndpointSpec
提供了一个id()
方法,让您使用给定的 bean 名称而不是生成的 bean 名称注册端点 bean。
如果MessageHandler
作为 Bean 引用,则任何现有的adviceChain
如果.advice()
method 的 API 定义中:
@Bean
public TcpOutboundGateway tcpOut() {
TcpOutboundGateway gateway = new TcpOutboundGateway();
gateway.setConnectionFactory(cf());
gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
return gateway;
}
@Bean
public IntegrationFlow clientTcpFlow() {
return f -> f
.handle(tcpOut(), e -> e.advice(testAdvice()))
.transform(Transformers.objectToString());
}
它们不会被合并,只有testAdvice()
在这种情况下使用 bean。
变形金刚
DSL API 提供了一个方便、流畅的Transformers
Factory 中用作.transform()
弹性公网 IP 方法。
以下示例演示如何使用它:
@Bean
public IntegrationFlow transformFlow() {
return IntegrationFlow.from("input")
.transform(Transformers.fromJson(MyPojo.class))
.transform(Transformers.serializer())
.get();
}
它避免了使用 setter 进行不方便的编码,并使流定义更加简单。
请注意,您可以使用Transformers
声明目标Transformer
实例设置为@Bean
实例,并再次从IntegrationFlow
定义为 Bean 方法。
尽管如此,如果内联对象尚未定义为 bean,则 DSL 解析器会处理这些对象的 bean 声明。
有关更多信息和支持的工厂方法,请参阅 Javadoc 中的 Transformers。
另请参阅lambda 和Message<?>
参数.
入站通道适配器
通常,消息流从入站通道适配器(例如<int-jdbc:inbound-channel-adapter>
).
适配器配置了<poller>
,它会要求MessageSource<?>
定期生成消息。
Java DSL 允许启动IntegrationFlow
从MessageSource<?>
太。
为此,IntegrationFlow
Fluent API 提供了一个重载的IntegrationFlow.from(MessageSource<?> messageSource)
方法。
您可以配置MessageSource<?>
作为 Bean 提供,并将其作为该方法的参数提供。
第二个参数IntegrationFlow.from()
是一个Consumer<SourcePollingChannelAdapterSpec>
Lambda 允许您提供选项(例如PollerMetadata
或SmartLifecycle
) 的SourcePollingChannelAdapter
.
以下示例演示如何使用 Fluent API 和 lambda 创建一个IntegrationFlow
:
@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}
@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlow.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
.transform(Transformers.toJson())
.channel("furtherProcessChannel")
.get();
}
对于那些没有构建要求的情况Message
对象,您可以使用IntegrationFlow.fromSupplier()
变体,该变体基于java.util.function.Supplier
.
结果Supplier.get()
会自动包装在Message
(如果它还不是Message
).
消息路由器
Spring 集成原生提供了专门的路由器类型,包括:
-
HeaderValueRouter
-
PayloadTypeRouter
-
ExceptionTypeRouter
-
RecipientListRouter
-
XPathRouter
与许多其他 DSL 一样IntegrationFlowBuilder
EIP 方法、route()
method 可以应用任何AbstractMessageRouter
implementation 或者为方便起见,使用String
作为 SPEL 表达式或ref
-method
双。
此外,您还可以配置route()
替换为 lambda,并将 lambda 用于Consumer<RouterSpec<MethodInvokingRouter>>
.
Fluent API 还提供AbstractMappingMessageRouter
选项,例如channelMapping(String key, String channelName)
对,如下例所示:
@Bean
public IntegrationFlow routeFlowByLambda() {
return IntegrationFlow.from("routerInput")
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.suffix("Channel")
.channelMapping(true, "even")
.channelMapping(false, "odd")
)
.get();
}
以下示例显示了一个简单的基于表达式的路由器:
@Bean
public IntegrationFlow routeFlowByExpression() {
return IntegrationFlow.from("routerInput")
.route("headers['destChannel']")
.get();
}
这routeToRecipients()
method 采用Consumer<RecipientListRouterSpec>
,如下例所示:
@Bean
public IntegrationFlow recipientListFlow() {
return IntegrationFlow.from("recipientListInput")
.<String, String>transform(p -> p.replaceFirst("Payload", ""))
.routeToRecipients(r -> r
.recipient("thing1-channel", "'thing1' == payload")
.recipientMessageSelector("thing2-channel", m ->
m.getHeaders().containsKey("recipient")
&& (boolean) m.getHeaders().get("recipient"))
.recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
f -> f.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("recipientListSubFlow1Result")))
.recipientFlow((String p) -> p.startsWith("thing3"),
f -> f.transform("Hello "::concat)
.channel(c -> c.queue("recipientListSubFlow2Result")))
.recipientFlow(new FunctionExpression<Message<?>>(m ->
"thing3".equals(m.getPayload())),
f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
.defaultOutputToParentFlow())
.get();
}
这.defaultOutputToParentFlow()
的.routeToRecipients()
definition 允许您设置路由器的defaultOutput
作为网关,以继续处理 MAIN FLOW 中不匹配的消息。
另请参阅lambda 和Message<?>
参数.
分配器
要创建拆分器,请使用split()
弹性公网 IP 方法。
默认情况下,如果有效负载是Iterable
一Iterator
一Array
一个Stream
或反应式Publisher
这split()
方法将每个项目输出为单独的消息。
它接受 lambda、spEL 表达式或任何AbstractMessageSplitter
实现。
或者,您也可以在不带参数的情况下使用它来提供DefaultMessageSplitter
.
以下示例演示如何使用split()
方法:
@Bean
public IntegrationFlow splitFlow() {
return IntegrationFlow.from("splitInput")
.split(s -> s.applySequence(false).delimiters(","))
.channel(MessageChannels.executor(taskExecutor()))
.get();
}
前面的示例创建一个拆分器,该拆分器将包含逗号分隔的消息String
.
另请参阅lambda 和Message<?>
参数.
聚合器和 Resequencer
一Aggregator
在概念上与Splitter
.
它将一系列单独的消息聚合到一条消息中,并且必然更复杂。
默认情况下,聚合器返回一条消息,其中包含来自传入消息的有效负载集合。
相同的规则也适用于Resequencer
.
以下示例显示了 splitter-aggregator 模式的规范示例:
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
这split()
方法将列表拆分为单独的消息,并将它们发送到ExecutorChannel
.
这resequence()
method 按消息标头中找到的序列详细信息对消息进行重新排序。
这aggregate()
method 收集这些消息。
但是,您可以通过指定发布策略和关联策略等来更改默认行为。 请考虑以下示例:
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
前面的示例将具有myCorrelationKey
headers 并在至少累积了 10 封邮件后释放邮件。
为resequence()
弹性公网 IP 方法。
Service Activator 和.handle()
方法
这.handle()
EIP 方法的目标是调用任何MessageHandler
implementation 或任何方法。
另一种选择是使用 lambda 表达式定义 “活动”。
因此,我们引入了一个泛型GenericHandler<P>
functional 接口。
其handle
method 需要两个参数:P payload
和MessageHeaders headers
(从版本 5.1 开始)。
有了这个,我们可以定义一个 flow,如下所示:
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from("flow3Input")
.<Integer>handle((p, h) -> p * 2)
.get();
}
前面的示例将它接收的任何整数加倍。
但是,Spring 集成的一个主要目标是loose coupling
,通过从消息负载到消息处理程序的目标参数的运行时类型转换。
由于 Java 不支持 lambda 类的泛型类型解析,因此我们引入了一种解决方法,即payloadType
参数,以及LambdaMessageProcessor
.
这样做会将硬转换工作委托给 Spring 的ConversionService
,它使用提供的type
以及请求的 message 到 target 方法参数。
以下示例显示了生成的IntegrationFlow
可能看起来像:
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.<byte[], String>transform(p - > new String(p, "UTF-8"))
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
我们也可以注册一些BytesToIntegerConverter
在ConversionService
去掉那个额外的.transform()
:
@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
return new BytesToIntegerConverter();
}
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
另请参阅lambda 和Message<?>
参数.
作员 gateway()
这gateway()
运算符包含在IntegrationFlow
definition 是一种特殊的 Service Activator 实现,通过其 input 通道调用其他端点或集成流并等待回复。
从技术上讲,它与嵌套的<gateway>
组件<chain>
定义(请参阅从 Chain 中调用 Chain),并允许流更清晰、更直接。
从逻辑上和业务角度来看,它是一个消息传递网关,允许在目标集成解决方案的不同部分之间分发和重用功能(请参阅 消息传递网关)。
此运算符具有多个用于不同目标的重载:
-
gateway(String requestChannel)
按其名称将消息发送到某个端点的 input 通道; -
gateway(MessageChannel requestChannel)
通过直接注入将消息发送到某个端点的 input 通道; -
gateway(IntegrationFlow flow)
将消息发送到提供的IntegrationFlow
.
所有这些都与第二个Consumer<GatewayEndpointSpec>
参数来配置目标GatewayMessageHandler
和各自的AbstractEndpoint
.
此外,IntegrationFlow
-based methods允许调用现有的IntegrationFlow
bean 或通过IntegrationFlow
functional 接口或将其提取到private
Method Cleaner 代码样式:
@Bean
IntegrationFlow someFlow() {
return IntegrationFlow
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}
private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流并不总是返回回复,则应将requestTimeout 设置为 0 以防止无限期挂起调用线程。
在这种情况将在该点结束,并释放线程以进行进一步的工作。 |
运算符 log()
为方便起见,要通过 Spring 集成流(<logging-channel-adapter>
)、一个log()
运算符。
在内部,它由WireTap
ChannelInterceptor
替换为LoggingHandler
作为其订阅者。
它负责将传入消息记录到下一个终端节点或当前通道中。
以下示例演示如何使用LoggingHandler
:
.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)
在前面的示例中,id
标头记录在ERROR
水平test.category
仅适用于通过筛选条件且在路由之前的消息。
从版本 6.0 开始,此运算符在 flow 末尾的行为与其在 middle 中的用法一致。
换句话说,即使log()
运算符。
因此,如果预计不会在流结束时生成回复,则nullChannel()
建议在最后log()
.
运算符 intercept()
从版本 5.3 开始,intercept()
operator 允许注册一个或多个ChannelInterceptor
实例MessageChannel
在流中。
这是创建显式MessageChannel
通过MessageChannels
应用程序接口。
以下示例使用MessageSelectingInterceptor
要拒绝某些邮件并出现例外:
.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)
MessageChannelSpec.wireTap()
Spring 集成包括一个.wireTap()
流畅的 APIMessageChannelSpec
建设者。
以下示例演示如何使用wireTap
记录输入的方法:
@Bean
public QueueChannelSpec myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input");
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
如果
|
当当前MessageChannel
不实现InterceptableChannel
,则隐式DirectChannel
和BridgeHandler
注入到IntegrationFlow
和WireTap
已添加到此新DirectChannel
.
以下示例没有任何 channel 声明:
.handle(...)
.log()
}
在前面的示例中(以及任何时候未声明 channel),隐式的DirectChannel
被注入到IntegrationFlow
并用作当前配置的ServiceActivatingHandler
(来自.handle()
,如前所述)。
使用消息流
IntegrationFlowBuilder
提供顶级 API 以生成连接到消息流的集成组件。
当您的集成可以通过单个流完成时(通常是这种情况),这很方便。
交互IntegrationFlow
实例可以通过MessageChannel
实例。
默认情况下,MessageFlow
在 Spring 集成的说法中表现为“链”。
也就是说,端点由DirectChannel
实例。
消息流实际上并不是作为一个链构建的,这提供了更大的灵活性。
例如,您可以向流中的任何组件发送消息,前提是您知道其inputChannel
名称(即,如果您明确定义了它)。
您还可以在流中引用外部定义的通道,以允许使用通道适配器(以启用远程传输协议、文件 I/O 等),而不是直接通道。
因此,DSL 不支持 Spring 集成chain
元素,因为它在这种情况下不会增加太多价值。
由于 Spring 集成 Java DSL 生成与任何其他配置选项相同的 bean 定义模型,并且基于现有的 Spring 框架@Configuration
基础结构,它可以与 XML 定义一起使用,并与 Spring Integration 消息传递注释配置连接。
您还可以定义 directIntegrationFlow
实例。
以下示例显示了如何执行此作:
@Bean
public IntegrationFlow lambdaFlow() {
return f -> f.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println);
}
此定义的结果是与隐式直接通道连接的同一组集成组件。
此处的唯一限制是此流从命名的直接渠道 -lambdaFlow.input
.
此外,Lambda 流不能从MessageSource
或MessageProducer
.
从版本 5.1 开始,这种IntegrationFlow
包装到代理中,以公开生命周期控制并提供对inputChannel
内部关联的StandardIntegrationFlow
.
从版本 5.0.6 开始,为IntegrationFlow
包括 Flow Bean,后跟一个点 (.
) 作为前缀。
例如,ConsumerEndpointFactoryBean
对于.transform("Hello "::concat)
在前面的示例中,将生成 Bean 名称lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0
.
(该o.s.i
是org.springframework.integration
以适应页面。
这Transformer
该端点的实现 Bean 名称为lambdaFlow.transformer#0
(从版本 5.1 开始),其中MethodInvokingTransformer
class 时,会使用其组件类型。
相同的模式适用于所有NamedComponent
s。
这些生成的 Bean 名称前面加上流 ID,用于解析日志或在某些分析工具中将组件分组在一起,以及避免在运行时同时注册集成流时出现争用情况。
有关更多信息,请参阅 动态和运行时集成流 。
FunctionExpression
我们引入了FunctionExpression
类(SPEL 的Expression
接口),让我们使用 lambda 表达式和generics
.
这Function<T, R>
选项,以及expression
选项,当存在隐式Strategy
变体。
以下示例演示如何使用函数表达式:
.enrich(e -> e.requestChannel("enrichChannel")
.requestPayload(Message::getPayload)
.propertyFunction("date", m -> new Date()))
这FunctionExpression
还支持运行时类型转换,就像SpelExpression
.
子流支持
部分if…else
和publish-subscribe
组件提供了使用子流指定其逻辑或映射的功能。
最简单的示例是.publishSubscribeChannel()
,如下例所示:
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
您可以使用单独的IntegrationFlow
@Bean
定义,但我们希望你发现 logic composition 的 sub-flow 风格很有用。
我们发现它会导致更短(因此更具可读性)的代码。
从版本 5.3 开始,BroadcastCapableChannel
-基于publishSubscribeChannel()
提供 implementation 以在 broker 支持的消息通道上配置子流订阅者。
例如,我们现在可以将多个订阅者配置为Jms.publishSubscribeChannel()
:
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(jmsConnectionFactory())
.destination("pubsub")
.get();
}
@Bean
public IntegrationFlow pubSubFlow() {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel(),
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
.destination("pubsub")
.get();
}
类似的publish-subscribe
sub-flow 组合提供.routeToRecipients()
方法。
另一个例子是使用.discardFlow()
而不是.discardChannel()
在.filter()
方法。
这.route()
值得特别关注。
请考虑以下示例:
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
这.channelMapping()
继续像在常规Router
mapping 的 map,但.subFlowMapping()
将该子流绑定到 Main Flow。
换句话说,任何路由器的子流在.route()
.
有时,您需要引用现有的
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow. 当您将子流配置为 lambda 时,框架会处理与子流的请求-回复交互,并且不需要网关。 |
子流可以嵌套到任何深度,但我们不建议这样做。 事实上,即使在路由器的情况下,在流中添加复杂的子流也会很快开始看起来像一盘意大利面,人类很难解析。
在 DSL 支持子流配置的情况下,当正在配置的组件通常需要一个通道,并且该子流以
框架在内部创建一个 |
使用协议适配器
到目前为止显示的所有示例都说明了 DSL 如何通过使用 Spring Integration 编程模型来支持消息传递体系结构。 但是,我们还没有做任何真正的集成。 这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源,或者访问本地文件系统。 Spring 集成支持所有这些以及更多。 理想情况下,DSL 应该为所有这些提供一流的支持,但是实现所有这些并跟上 Spring Integration 中新适配器的添加是一项艰巨的任务。 因此,期望 DSL 不断赶上 Spring 集成。
因此,我们提供了高级 API 来无缝定义特定于协议的消息收发。
我们使用 Factory 和 Builder 模式以及 lambda 来执行此作。
你可以将工厂类视为“名称空间工厂”,因为它们与来自具体协议特定的 Spring 集成模块的组件的 XML 名称空间起着相同的作用。
目前,Spring 集成 Java DSL 支持Amqp
,Feed
,Jms
,Files
,(S)Ftp
,Http
,JPA
,MongoDb
,TCP/UDP
,Mail
,WebFlux
和Scripts
命名空间工厂。
以下示例演示如何使用其中的三个 (Amqp
,Jms
和Mail
):
@Bean
public IntegrationFlow amqpFlow() {
return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
.transform("hello "::concat)
.transform(String.class, String::toUpperCase)
.get();
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlow.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer(c ->
c.concurrentConsumers(3)
.sessionTransacted(true))
.requestDestination("jmsPipelineTest"))
.get();
}
@Bean
public IntegrationFlow sendMailFlow() {
return IntegrationFlow.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.id("sendMailEndpoint"))
.get();
}
前面的示例展示了如何使用“命名空间工厂”作为内联适配器声明。
但是,您可以从@Bean
definitions 进行IntegrationFlow
方法链更具可读性。
在将精力投入到其他命名空间工厂之前,我们会征求社区对这些命名空间工厂的反馈。 我们也感谢对我们接下来应该支持的适配器和网关的优先级的任何意见。 |
您可以在本参考手册中特定于协议的章节中找到更多 Java DSL 示例。
所有其他协议通道适配器都可以配置为通用 bean 并连接到IntegrationFlow
,如下例所示:
@Bean
public QueueChannelSpec wrongMessagesChannel() {
return MessageChannels
.queue()
.wireTap("wrongMessagesWireTapChannel");
}
@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
return IntegrationFlow.from("inputChannel")
.filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
e -> e.discardChannel(wrongMessagesChannel))
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(xpathRouter(wrongMessagesChannel))
.get();
}
@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
XPathRouter router = new XPathRouter("local-name(/*)");
router.setEvaluateAsString(true);
router.setResolutionRequired(false);
router.setDefaultOutputChannel(wrongMessagesChannel);
router.setChannelMapping("Tags", "splittingChannel");
router.setChannelMapping("Tag", "receivedChannel");
return router;
}
IntegrationFlowAdapter
这IntegrationFlow
interface 可以直接实现并指定为 scanning 组件,如下例所示:
@Component
public class MyFlow implements IntegrationFlow {
@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}
}
它由IntegrationFlowBeanPostProcessor
并在应用程序上下文中正确解析和注册。
为了方便起见并获得松散耦合架构的好处,我们提供了IntegrationFlowAdapter
基类实现。
它需要一个buildFlow()
方法实现以生成IntegrationFlowDefinition
通过使用from()
方法,如下例所示:
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {
private final AtomicBoolean invoked = new AtomicBoolean();
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(this::messageSource,
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(a -> a.processor(this, null), null)
.enrichHeaders(Collections.singletonMap("thing1", "THING1"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}
public String messageSource() {
return "T,H,I,N,G,2";
}
@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}
@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}
@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}
@Filter
public boolean filter(@Header Optional<String> thing1) {
return thing1.isPresent();
}
@ServiceActivator
public String handle(String payload, @Header String thing1) {
return payload + ":" + thing1;
}
}
动态和运行时集成流
IntegrationFlow
并且其所有依赖组件都可以在运行时注册。
在 5.0 版本之前,我们使用了BeanFactory.registerSingleton()
钩。
从 Spring 框架开始5.0
,我们使用instanceSupplier
程序化的钩子BeanDefinition
注册。
下面的示例展示了如何以编程方式注册一个 Bean:
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
请注意,在前面的示例中,instanceSupplier
hook 是genericBeanDefinition
方法,在本例中由 Lambda 提供。
所有必要的 bean 初始化和生命周期都是自动完成的,就像使用标准上下文配置 bean 定义一样。
为了简化开发体验,Spring Integration 引入了IntegrationFlowContext
注册和管理IntegrationFlow
实例,如下例所示:
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
当我们有多个配置选项并且必须创建类似流的多个实例时,这非常有用。
为此,我们可以迭代我们的选项并创建和注册IntegrationFlow
实例。
另一种变体是当我们的数据源不是基于 Spring 的,因此我们必须动态创建它。
这样的示例是 Reactive Streams 事件源,如下例所示:
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
这IntegrationFlowRegistrationBuilder
(由于IntegrationFlowContext.registration()
) 可用于指定IntegrationFlow
注册,控制其autoStartup
,并注册非 Spring 集成 bean。
通常,这些额外的 bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和反序列化器或任何其他必需的支持组件。
您可以使用IntegrationFlowRegistration.destroy()
callback 删除动态注册的IntegrationFlow
以及它的所有依赖的 bean。
请参阅IntegrationFlowContext
Javadoc了解更多信息。
从版本 5.0.6 开始,所有生成的 bean 名称都位于IntegrationFlow definition 的前缀为流 ID 作为前缀。
我们建议始终指定显式流 ID。
否则,将在IntegrationFlowContext 生成 Bean 名称,以便为IntegrationFlow 并注册其 bean。
我们在这两个作上进行同步,以避免当相同的生成的 bean 名称可能用于不同的 bean 时出现竞争条件IntegrationFlow 实例。 |
此外,从版本 5.0.6 开始,注册构建器 API 有一个新方法:useFlowIdAsPrefix()
.
如果您希望声明同一流的多个实例,并在流中的组件具有相同的 ID 时避免 bean 名称冲突,这将非常有用,如下例所示:
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
在这种情况下,可以使用名称为tcp1.client.handler
.
一id 属性是必需的useFlowIdAsPrefix() . |
IntegrationFlow
作为网关
这IntegrationFlow
可以从提供GatewayProxyFactoryBean
组件,如下例所示:
public interface ControlBusGateway {
void send(String command);
}
...
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from(ControlBusGateway.class)
.controlBus()
.get();
}
接口方法的所有代理都随通道一起提供,用于将消息发送到IntegrationFlow
.
您可以使用@MessagingGateway
注解,并使用@Gateway
附注。
尽管如此,requestChannel
被忽略并被IntegrationFlow
.
否则,使用IntegrationFlow
这没有意义。
默认情况下,GatewayProxyFactoryBean
获取约定的 bean 名称,例如[FLOW_BEAN_NAME.gateway]
.
您可以使用@MessagingGateway.name()
属性或重载的IntegrationFlow.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer)
Factory 方法。
此外,来自@MessagingGateway
注解应用于目标GatewayProxyFactoryBean
.
当 annotation 配置不适用时,Consumer<GatewayProxySpec>
variant 可用于为目标代理提供适当的选项。
此 DSL 方法从版本 5.2 开始可用。
在 Java 8 中,您甚至可以使用java.util.function
接口,如下例所示:
@Bean
public IntegrationFlow errorRecovererFlow() {
return IntegrationFlow.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
.<Object>handle((p, h) -> {
throw new RuntimeException("intentional");
}, e -> e.advice(retryAdvice()))
.get();
}
那errorRecovererFlow
可以按如下方式使用:
@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;
DSL 扩展
从版本 5.3 开始,IntegrationFlowExtension
以允许使用自定义或组合的 EIP 运算符扩展现有 Java DSL。
所需要的只是这个类的扩展,它提供可以在IntegrationFlow
bean 定义。
扩展类也可以用于自定义IntegrationComponentSpec
配置;例如,missed 或 default 选项可以在现有的IntegrationComponentSpec
外延。
下面的示例演示了复合自定义运算符和AggregatorSpec
默认自定义outputProcessor
:
public class CustomIntegrationFlowDefinition
extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {
public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
return split()
.transform("payload.toUpperCase()");
}
public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
return register(new CustomAggregatorSpec(), aggregator);
}
}
public class CustomAggregatorSpec extends AggregatorSpec {
CustomAggregatorSpec() {
outputProcessor(group ->
group.getMessages()
.stream()
.map(Message::getPayload)
.map(String.class::cast)
.collect(Collectors.joining(", ")));
}
}
对于方法链流,这些扩展中的新 DSL 运算符必须返回扩展类。
这样,目标IntegrationFlow
definition 将与新的和现有的 DSL 运算符一起使用:
@Bean
public IntegrationFlow customFlowDefinition() {
return
new CustomIntegrationFlowDefinition()
.log()
.upperCaseAfterSplit()
.channel("innerChannel")
.customAggregate(customAggregatorSpec ->
customAggregatorSpec.expireGroupsUponCompletion(true))
.logAndReply();
}
集成流组合
使用MessageChannel
抽象作为 Spring Integration 中的一等公民,集成流的组成总是被假定的。
流中任何终端节点的输入通道都可用于从任何其他终端节点发送消息,而不仅仅是从将此通道作为输出的终端节点发送消息。
此外,使用@MessagingGateway
合约、内容扩充器组件、复合端点(如<chain>
,现在有了IntegrationFlow
Beans(例如IntegrationFlowAdapter
),在较短的、可重用的部分之间分配业务逻辑非常简单。
最终组合所需要的只是关于MessageChannel
以发送到或接收自。
从 version 开始5.5.4
,从MessageChannel
并对最终用户隐藏实现详细信息,IntegrationFlow
介绍from(IntegrationFlow)
factory 方法允许启动电流IntegrationFlow
从现有流的输出中:
@Bean
IntegrationFlow templateSourceFlow() {
return IntegrationFlow.fromSupplier(() -> "test data")
.channel("sourceChannel")
.get();
}
@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
return IntegrationFlow.from(templateSourceFlow)
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("compositionMainFlowResult"))
.get();
}
另一方面,IntegrationFlowDefinition
已添加to(IntegrationFlow)
terminal 运算符在一些其他流的输入通道上继续电流:
@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
return f -> f
.<String, String>transform(String::toUpperCase)
.to(otherFlow);
}
@Bean
IntegrationFlow otherFlow() {
return f -> f
.<String, String>transform(p -> p + " from other flow")
.channel(c -> c.queue("otherFlowResultChannel"));
}
流中间的组合可以通过现有的gateway(IntegrationFlow)
EIP 方法。
通过这种方式,我们可以从更简单、可重用的逻辑块组合流,从而构建任何复杂的流。
例如,您可以添加IntegrationFlow
beans 作为依赖项,只需将它们的配置类导入到最终项目中并为您的IntegrationFlow
定义。