Java DSL

Java DSL

Spring 集成 Java 配置和 DSL 提供了一组方便的构建器和一个流畅的 API,允许您配置来自 Spring 的 Spring 集成消息流@Configuration类。spring-doc.cadn.net.cn

用于 Spring Integration 的 Java DSL 本质上是 Spring Integration 的门面。 DSL 提供了一种简单的方法,通过使用 Fluent 将 Spring 集成消息流嵌入到你的应用程序中Builder模式与 Spring Framework 和 Spring Integration 中的现有 Java 配置一起运行。 我们还使用并支持 lambda(在 Java 8 中提供)来进一步简化 Java 配置。spring-doc.cadn.net.cn

这家咖啡馆提供了使用 DSL 的一个很好的例子。spring-doc.cadn.net.cn

DSL 由IntegrationFlowFluent API(请参阅IntegrationFlowBuilder). 这将生成IntegrationFlow组件,该组件应注册为 Spring Bean(通过使用@Bean注释)。 构建器模式用于将任意复杂的结构表示为可以接受 lambda 作为参数的方法层次结构。spring-doc.cadn.net.cn

IntegrationFlowBuilder仅收集集成组件 (MessageChannel实例AbstractEndpoint实例,依此类推),在IntegrationFlowbean 在应用程序上下文中进一步解析和注册具体 bean 的IntegrationFlowBeanPostProcessor.spring-doc.cadn.net.cn

Java DSL 直接使用 Spring 集成类,并绕过任何 XML 生成和解析。 然而,DSL 在 XML 之上提供的不仅仅是语法糖。 它最引人注目的功能之一是能够定义内联 lambda 来实现终端节点逻辑,无需外部类来实现自定义逻辑。 从某种意义上说,Spring 集成对 Spring 表达式语言 (SpEL) 和内联脚本的支持解决了这个问题,但 lambda 更简单、功能更强大。spring-doc.cadn.net.cn

下面的示例展示了如何使用 Java 配置进行 Spring 集成:spring-doc.cadn.net.cn

@Configuration
@EnableIntegration
public class MyConfiguration {

    @Bean
    public AtomicInteger integerSource() {
        return new AtomicInteger();
    }

    @Bean
    public IntegrationFlow myFlow() {
        return IntegrationFlow.fromSupplier(integerSource()::getAndIncrement,
                                         c -> c.poller(Pollers.fixedRate(100)))
                    .channel("inputChannel")
                    .filter((Integer p) -> p > 0)
                    .transform(Object::toString)
                    .channel(MessageChannels.queue())
                    .get();
    }
}

前面的配置示例的结果是,它在ApplicationContext启动、Spring 集成端点和消息通道。 Java 配置可用于替换和增强 XML 配置。 您无需替换所有现有的 XML 配置即可使用 Java 配置。spring-doc.cadn.net.cn

DSL 基础知识

org.springframework.integration.dslpackage 包含IntegrationFlowBuilderAPI 和一些IntegrationComponentSpec实现,它们也是构建器,并提供 Fluent API 来配置具体端点。 这IntegrationFlowBuilder基础设施为基于消息的应用程序(例如通道、终端节点、轮询程序和通道拦截器)提供常见的企业集成模式 (EIP)。spring-doc.cadn.net.cn

端点在 DSL 中表示为动词,以提高可读性。 以下列表包括常见的 DSL 方法名称和关联的 EIP Endpoint:spring-doc.cadn.net.cn

从概念上讲,集成流程是通过将这些端点组合成一个或多个消息流来构建的。 请注意,EIP 并未正式定义术语“消息流”,但将其视为使用众所周知的消息传递模式的工作单元是有用的。 DSL 提供了一个IntegrationFlow组件来定义通道和它们之间的端点的组合,但现在IntegrationFlow仅扮演配置角色以在应用程序上下文中填充真实 bean,而不在运行时使用。 但是,用于IntegrationFlow可以作为Lifecycle控制start()stop()对于整个流,它被委托给与此关联的所有 Spring 集成组件IntegrationFlow. 以下示例使用IntegrationFlowFluent API 来定义IntegrationFlowbean 使用 EIP 方法IntegrationFlowBuilder:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

transform方法接受 Lambda 作为终端节点参数以对消息负载进行作。 此方法的真正参数是GenericTransformer<S, T>实例。 因此,任何提供的转换器 (ObjectToJsonTransformer,FileToStringTransformer和其他)都可以在此处使用。spring-doc.cadn.net.cn

在被窝里,IntegrationFlowBuilder识别MessageHandler及其端点,使用MessageTransformingHandlerConsumerEndpointFactoryBean分别。 考虑另一个例子:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

前面的示例将Filter → Transformer → Service Activator. 流程是 “'单向'”。 也就是说,它不提供回复消息,而只将有效负载打印到 STDOUT。 终端节点使用直接通道自动连接在一起。spring-doc.cadn.net.cn

lambda 和Message<?>参数

在 EIP 方法中使用 lambda 表达式时,“input” 参数通常是消息负载。 如果您希望访问整个消息,请使用采用Class<?>作为第一个参数。 例如,this 将不起作用:spring-doc.cadn.net.cn

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

这将在运行时失败,并显示ClassCastException因为 Lambda 不保留参数类型,并且框架会尝试将有效负载转换为Message<?>.spring-doc.cadn.net.cn

相反,请使用:spring-doc.cadn.net.cn

.(Message.class, m -> newFooFromMessage(m))
Bean 定义覆盖

Java DSL 可以为流定义中内联定义的对象注册 bean,也可以重用现有的注入的 bean。 如果为内嵌对象和现有 bean 定义定义了相同的 bean 名称,则BeanDefinitionOverrideException表示此类配置错误。 但是,当您处理prototypebean 中,无法从集成流处理器中检测现有的 bean 定义,因为每次我们调用prototypebean 的BeanFactory我们得到一个新实例。 这样,在IntegrationFlow原样没有任何 bean 注册和对现有prototypebean 定义。 然而BeanFactory.initializeBean()如果此对象具有显式的id并且此名称的 bean 定义位于prototype范围。spring-doc.cadn.net.cn

消息通道

除了IntegrationFlowBuilder使用 EIP 方法,Java DSL 提供了一个 Fluent API 来配置MessageChannel实例。 为此,MessageChannelsbuilder factory 的 builder 工厂。 以下示例演示如何使用它:spring-doc.cadn.net.cn

@Bean
public MessageChannel priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap())
                        .get();
}

一样MessageChannelsbuilder factory 可以在channel()EIP 方法从IntegrationFlowBuilder连接端点,类似于连接input-channel/output-channel对。 默认情况下,端点与DirectChannelbean 名称基于以下模式的实例:[IntegrationFlow.beanName].channel#[channelNameIndex]. 此规则也适用于 inline 生成的未命名通道MessageChannelsbuilder factory 使用情况。 但是,所有MessageChannels方法具有一个变体,该变体知道channelId可用于设置MessageChannel实例。 这MessageChannelreferences 和beanName可以用作 Bean 方法调用。 以下示例显示了使用channel()弹性公网 IP 方式:spring-doc.cadn.net.cn

@Bean
public MessageChannel queueChannel() {
    return MessageChannels.queue().get();
}

@Bean
public MessageChannel publishSubscribe() {
    return MessageChannels.publishSubscribe().get();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlow.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input")的意思是“'查找并使用MessageChannel替换为 “input” id,或创建一个'”。spring-doc.cadn.net.cn

  • fixedSubscriberChannel()生成一个FixedSubscriberChannel并使用名称channelFlow.channel#0.spring-doc.cadn.net.cn

  • channel("queueChannel")工作方式相同,但使用现有的queueChannel豆。spring-doc.cadn.net.cn

  • channel(publishSubscribe())是 Bean 方法引用。spring-doc.cadn.net.cn

  • channel(MessageChannels.executor("executorChannel", this.taskExecutor))IntegrationFlowBuilder这暴露了IntegrationComponentSpecExecutorChannel并将其注册为executorChannel.spring-doc.cadn.net.cn

  • channel("output")注册DirectChannelbean 替换为output作为其名称,只要不存在具有此名称的 bean。spring-doc.cadn.net.cn

注意:前面的IntegrationFlow定义有效,并且其所有通道都应用于具有BridgeHandler实例。spring-doc.cadn.net.cn

请小心使用相同的内联通道定义MessageChannels来自不同的工厂IntegrationFlow实例。 即使 DSL 解析器将不存在的对象注册为 bean,它也无法确定相同的对象(MessageChannel) 来自不同的IntegrationFlow器皿。 以下示例是错误的:
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlow.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

该错误示例的结果是以下异常:spring-doc.cadn.net.cn

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

要使其正常工作,您需要声明@Bean对于该通道,并使用来自不同IntegrationFlow实例。spring-doc.cadn.net.cn

轮询器

Spring 集成还提供了一个 Fluent API,允许您配置PollerMetadataAbstractPollingEndpoint实现。 您可以使用Pollersbuilder factory 配置公共 bean 定义或从IntegrationFlowBuilderEIP 方法,如下例所示:spring-doc.cadn.net.cn

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500)
        .errorChannel("myErrors");
}

PollersPollerSpec在 Javadoc 中了解更多信息。spring-doc.cadn.net.cn

如果使用 DSL 构造PollerSpec作为@Bean,请不要调用get()方法。 这PollerSpec是一个FactoryBean这会生成PollerMetadataobject 并初始化其所有属性。

reactive()端点

从版本 5.5 开始,ConsumerEndpointSpec提供reactive()configuration 属性与可选的定制器Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>. 此选项将目标终端节点配置为ReactiveStreamsConsumer实例,独立于输入通道类型,该类型转换为Flux通过IntegrationReactiveUtils.messageChannelToFlux(). 提供的函数从Flux.transform()运算符进行自定义 (publishOn(),log(),doOnNext()等)来自 input 通道的反应式流源。spring-doc.cadn.net.cn

下面的示例演示了如何将发布线程从独立于最终订阅者和生产者的 input 通道更改为该DirectChannel:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .<String, Integer>transform(Integer::parseInt,
                    e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
            .get();
}

有关更多信息,请参阅 Reactive Streams Supportspring-doc.cadn.net.cn

DSL 和端点配置

IntegrationFlowBuilderEIP 方法有一个变体,该变体应用 lambda 参数来提供AbstractEndpoint实例:SmartLifecycle,PollerMetadata,request-handler-advice-chain等。 它们中的每一个都有通用参数,因此它允许您配置终端节点,甚至其MessageHandler在上下文中,如下例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow2() {
    return IntegrationFlow.from(this.inputChannel)
                .transform(new PayloadSerializingTransformer(),
                       c -> c.autoStartup(false).id("payloadSerializingTransformer"))
                .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
                .get();
}

此外,EndpointSpec提供了一个id()方法,让您使用给定的 bean 名称而不是生成的 bean 名称注册端点 bean。spring-doc.cadn.net.cn

如果MessageHandler作为 Bean 引用,则任何现有的adviceChain如果.advice()method 的 API 定义中:spring-doc.cadn.net.cn

@Bean
public TcpOutboundGateway tcpOut() {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(cf());
    gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
    return gateway;
}

@Bean
public IntegrationFlow clientTcpFlow() {
    return f -> f
        .handle(tcpOut(), e -> e.advice(testAdvice()))
        .transform(Transformers.objectToString());
}

它们不会被合并,只有testAdvice()在这种情况下使用 bean。spring-doc.cadn.net.cn

变形金刚

DSL API 提供了一个方便、流畅的TransformersFactory 中用作.transform()弹性公网 IP 方法。 以下示例演示如何使用它:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow transformFlow() {
    return IntegrationFlow.from("input")
            .transform(Transformers.fromJson(MyPojo.class))
            .transform(Transformers.serializer())
            .get();
}

它避免了使用 setter 进行不方便的编码,并使流定义更加简单。 请注意,您可以使用Transformers声明目标Transformer实例设置为@Bean实例,并再次从IntegrationFlow定义为 Bean 方法。 尽管如此,如果内联对象尚未定义为 bean,则 DSL 解析器会处理这些对象的 bean 声明。spring-doc.cadn.net.cn

有关更多信息和支持的工厂方法,请参阅 Javadoc 中的 Transformersspring-doc.cadn.net.cn

入站通道适配器

通常,消息流从入站通道适配器(例如<int-jdbc:inbound-channel-adapter>). 适配器配置了<poller>,它会要求MessageSource<?>定期生成消息。 Java DSL 允许启动IntegrationFlowMessageSource<?>太。 为此,IntegrationFlowFluent API 提供了一个重载的IntegrationFlow.from(MessageSource<?> messageSource)方法。 您可以配置MessageSource<?>作为 Bean 提供,并将其作为该方法的参数提供。 第二个参数IntegrationFlow.from()是一个Consumer<SourcePollingChannelAdapterSpec>Lambda 允许您提供选项(例如PollerMetadataSmartLifecycle) 的SourcePollingChannelAdapter. 以下示例演示如何使用 Fluent API 和 lambda 创建一个IntegrationFlow:spring-doc.cadn.net.cn

@Bean
public MessageSource<Object> jdbcMessageSource() {
    return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}

@Bean
public IntegrationFlow pollingFlow() {
    return IntegrationFlow.from(jdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
            .transform(Transformers.toJson())
            .channel("furtherProcessChannel")
            .get();
}

对于那些没有构建要求的情况Message对象,您可以使用IntegrationFlow.fromSupplier()变体,该变体基于java.util.function.Supplier. 结果Supplier.get()会自动包装在Message(如果它还不是Message).spring-doc.cadn.net.cn

消息路由器

Spring 集成原生提供了专门的路由器类型,包括:spring-doc.cadn.net.cn

与许多其他 DSL 一样IntegrationFlowBuilderEIP 方法、route()method 可以应用任何AbstractMessageRouterimplementation 或者为方便起见,使用String作为 SPEL 表达式或ref-method双。 此外,您还可以配置route()替换为 lambda,并将 lambda 用于Consumer<RouterSpec<MethodInvokingRouter>>. Fluent API 还提供AbstractMappingMessageRouter选项,例如channelMapping(String key, String channelName)对,如下例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routeFlowByLambda() {
    return IntegrationFlow.from("routerInput")
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.suffix("Channel")
                            .channelMapping(true, "even")
                            .channelMapping(false, "odd")
            )
            .get();
}

以下示例显示了一个简单的基于表达式的路由器:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routeFlowByExpression() {
    return IntegrationFlow.from("routerInput")
            .route("headers['destChannel']")
            .get();
}

routeToRecipients()method 采用Consumer<RecipientListRouterSpec>,如下例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow recipientListFlow() {
    return IntegrationFlow.from("recipientListInput")
            .<String, String>transform(p -> p.replaceFirst("Payload", ""))
            .routeToRecipients(r -> r
                    .recipient("thing1-channel", "'thing1' == payload")
                    .recipientMessageSelector("thing2-channel", m ->
                            m.getHeaders().containsKey("recipient")
                                    && (boolean) m.getHeaders().get("recipient"))
                    .recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
                            f -> f.<String, String>transform(String::toUpperCase)
                                    .channel(c -> c.queue("recipientListSubFlow1Result")))
                    .recipientFlow((String p) -> p.startsWith("thing3"),
                            f -> f.transform("Hello "::concat)
                                    .channel(c -> c.queue("recipientListSubFlow2Result")))
                    .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                    "thing3".equals(m.getPayload())),
                            f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
                    .defaultOutputToParentFlow())
            .get();
}

.defaultOutputToParentFlow().routeToRecipients()definition 允许您设置路由器的defaultOutput作为网关,以继续处理 MAIN FLOW 中不匹配的消息。spring-doc.cadn.net.cn

分配器

要创建拆分器,请使用split()弹性公网 IP 方法。 默认情况下,如果有效负载是IterableIteratorArray一个Stream或反应式Publishersplit()方法将每个项目输出为单独的消息。 它接受 lambda、spEL 表达式或任何AbstractMessageSplitter实现。 或者,您也可以在不带参数的情况下使用它来提供DefaultMessageSplitter. 以下示例演示如何使用split()方法:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow splitFlow() {
    return IntegrationFlow.from("splitInput")
              .split(s -> s.applySequence(false).delimiters(","))
              .channel(MessageChannels.executor(taskExecutor()))
              .get();
}

前面的示例创建一个拆分器,该拆分器将包含逗号分隔的消息String.spring-doc.cadn.net.cn

聚合器和 Resequencer

Aggregator在概念上与Splitter. 它将一系列单独的消息聚合到一条消息中,并且必然更复杂。 默认情况下,聚合器返回一条消息,其中包含来自传入消息的有效负载集合。 相同的规则也适用于Resequencer. 以下示例显示了 splitter-aggregator 模式的规范示例:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split()方法将列表拆分为单独的消息,并将它们发送到ExecutorChannel. 这resequence()method 按消息标头中找到的序列详细信息对消息进行重新排序。 这aggregate()method 收集这些消息。spring-doc.cadn.net.cn

但是,您可以通过指定发布策略和关联策略等来更改默认行为。 请考虑以下示例:spring-doc.cadn.net.cn

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

前面的示例将具有myCorrelationKeyheaders 并在至少累积了 10 封邮件后释放邮件。spring-doc.cadn.net.cn

resequence()弹性公网 IP 方法。spring-doc.cadn.net.cn

Service Activator 和.handle()方法

.handle()EIP 方法的目标是调用任何MessageHandlerimplementation 或任何方法。 另一种选择是使用 lambda 表达式定义 “活动”。 因此,我们引入了一个泛型GenericHandler<P>functional 接口。 其handlemethod 需要两个参数:P payloadMessageHeaders headers(从版本 5.1 开始)。 有了这个,我们可以定义一个 flow,如下所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("flow3Input")
        .<Integer>handle((p, h) -> p * 2)
        .get();
}

前面的示例将它接收的任何整数加倍。spring-doc.cadn.net.cn

但是,Spring 集成的一个主要目标是loose coupling,通过从消息负载到消息处理程序的目标参数的运行时类型转换。 由于 Java 不支持 lambda 类的泛型类型解析,因此我们引入了一种解决方法,即payloadType参数,以及LambdaMessageProcessor. 这样做会将硬转换工作委托给 Spring 的ConversionService,它使用提供的type以及请求的 message 到 target 方法参数。 以下示例显示了生成的IntegrationFlow可能看起来像:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<byte[], String>transform(p - > new String(p, "UTF-8"))
            .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

我们也可以注册一些BytesToIntegerConverterConversionService去掉那个额外的.transform():spring-doc.cadn.net.cn

@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
   return new BytesToIntegerConverter();
}

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
             .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

作员 gateway()

gateway()运算符包含在IntegrationFlowdefinition 是一种特殊的 Service Activator 实现,通过其 input 通道调用其他端点或集成流并等待回复。 从技术上讲,它与嵌套的<gateway>组件<chain>定义(请参阅从 Chain 中调用 Chain),并允许流更清晰、更直接。 从逻辑上和业务角度来看,它是一个消息传递网关,允许在目标集成解决方案的不同部分之间分发和重用功能(请参阅 消息传递网关)。 此运算符具有多个用于不同目标的重载:spring-doc.cadn.net.cn

  • gateway(String requestChannel)按其名称将消息发送到某个端点的 input 通道;spring-doc.cadn.net.cn

  • gateway(MessageChannel requestChannel)通过直接注入将消息发送到某个端点的 input 通道;spring-doc.cadn.net.cn

  • gateway(IntegrationFlow flow)将消息发送到提供的IntegrationFlow.spring-doc.cadn.net.cn

所有这些都与第二个Consumer<GatewayEndpointSpec>参数来配置目标GatewayMessageHandler和各自的AbstractEndpoint. 此外,IntegrationFlow-based methods允许调用现有的IntegrationFlowbean 或通过IntegrationFlowfunctional 接口或将其提取到privateMethod Cleaner 代码样式:spring-doc.cadn.net.cn

@Bean
IntegrationFlow someFlow() {
        return IntegrationFlow
                .from(...)
                .gateway(subFlow())
                .handle(...)
                .get();
}

private static IntegrationFlow subFlow() {
        return f -> f
                .scatterGather(s -> s.recipientFlow(...),
                        g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流并不总是返回回复,则应将requestTimeout设置为 0 以防止无限期挂起调用线程。 在这种情况将在该点结束,并释放线程以进行进一步的工作。

运算符 log()

为方便起见,要通过 Spring 集成流(<logging-channel-adapter>)、一个log()运算符。 在内部,它由WireTap ChannelInterceptor替换为LoggingHandler作为其订阅者。 它负责将传入消息记录到下一个终端节点或当前通道中。 以下示例演示如何使用LoggingHandler:spring-doc.cadn.net.cn

.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)

在前面的示例中,id标头记录在ERROR水平test.category仅适用于通过筛选条件且在路由之前的消息。spring-doc.cadn.net.cn

从版本 6.0 开始,此运算符在 flow 末尾的行为与其在 middle 中的用法一致。 换句话说,即使log()运算符。 因此,如果预计不会在流结束时生成回复,则nullChannel()建议在最后log().spring-doc.cadn.net.cn

运算符 intercept()

从版本 5.3 开始,intercept()operator 允许注册一个或多个ChannelInterceptor实例MessageChannel在流中。 这是创建显式MessageChannel通过MessageChannels应用程序接口。 以下示例使用MessageSelectingInterceptor要拒绝某些邮件并出现例外:spring-doc.cadn.net.cn

.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)

MessageChannelSpec.wireTap()

Spring 集成包括一个.wireTap()流畅的 APIMessageChannelSpec建设者。 以下示例演示如何使用wireTap记录输入的方法:spring-doc.cadn.net.cn

@Bean
public QueueChannelSpec myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input");
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

如果MessageChannelInterceptableChannellog(),wireTap()intercept()运算符应用于当前MessageChannel. 否则,中间DirectChannel注入到当前配置的终端节点的流中。 在以下示例中,WireTapinterceptor 被添加到myChannel直接,因为DirectChannel实现InterceptableChannel:spring-doc.cadn.net.cn

@Bean
MessageChannel myChannel() {
    return new DirectChannel();
}

...
    .channel(myChannel())
    .log()
}

当当前MessageChannel不实现InterceptableChannel,则隐式DirectChannelBridgeHandler注入到IntegrationFlowWireTap已添加到此新DirectChannel. 以下示例没有任何 channel 声明:spring-doc.cadn.net.cn

.handle(...)
.log()
}

在前面的示例中(以及任何时候未声明 channel),隐式的DirectChannel被注入到IntegrationFlow并用作当前配置的ServiceActivatingHandler(来自.handle(),如前所述)。spring-doc.cadn.net.cn

使用消息流

IntegrationFlowBuilder提供顶级 API 以生成连接到消息流的集成组件。 当您的集成可以通过单个流完成时(通常是这种情况),这很方便。 交互IntegrationFlow实例可以通过MessageChannel实例。spring-doc.cadn.net.cn

默认情况下,MessageFlow在 Spring 集成的说法中表现为“链”。 也就是说,端点由DirectChannel实例。 消息流实际上并不是作为一个链构建的,这提供了更大的灵活性。 例如,您可以向流中的任何组件发送消息,前提是您知道其inputChannel名称(即,如果您明确定义了它)。 您还可以在流中引用外部定义的通道,以允许使用通道适配器(以启用远程传输协议、文件 I/O 等),而不是直接通道。 因此,DSL 不支持 Spring 集成chain元素,因为它在这种情况下不会增加太多价值。spring-doc.cadn.net.cn

由于 Spring 集成 Java DSL 生成与任何其他配置选项相同的 bean 定义模型,并且基于现有的 Spring 框架@Configuration基础结构,它可以与 XML 定义一起使用,并与 Spring Integration 消息传递注释配置连接。spring-doc.cadn.net.cn

您还可以定义 directIntegrationFlow实例。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow lambdaFlow() {
    return f -> f.filter("World"::equals)
                   .transform("Hello "::concat)
                   .handle(System.out::println);
}

此定义的结果是与隐式直接通道连接的同一组集成组件。 此处的唯一限制是此流从命名的直接渠道 -lambdaFlow.input. 此外,Lambda 流不能从MessageSourceMessageProducer.spring-doc.cadn.net.cn

从版本 5.1 开始,这种IntegrationFlow包装到代理中,以公开生命周期控制并提供对inputChannel内部关联的StandardIntegrationFlow.spring-doc.cadn.net.cn

从版本 5.0.6 开始,为IntegrationFlow包括 Flow Bean,后跟一个点 (.) 作为前缀。 例如,ConsumerEndpointFactoryBean对于.transform("Hello "::concat)在前面的示例中,将生成 Bean 名称lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0. (该o.s.iorg.springframework.integration以适应页面。 这Transformer该端点的实现 Bean 名称为lambdaFlow.transformer#0(从版本 5.1 开始),其中MethodInvokingTransformerclass 时,会使用其组件类型。 相同的模式适用于所有NamedComponents。 这些生成的 Bean 名称前面加上流 ID,用于解析日志或在某些分析工具中将组件分组在一起,以及避免在运行时同时注册集成流时出现争用情况。 有关更多信息,请参阅 动态和运行时集成流spring-doc.cadn.net.cn

FunctionExpression

我们引入了FunctionExpression类(SPEL 的Expression接口),让我们使用 lambda 表达式和generics. 这Function<T, R>选项,以及expression选项,当存在隐式Strategy变体。 以下示例演示如何使用函数表达式:spring-doc.cadn.net.cn

.enrich(e -> e.requestChannel("enrichChannel")
            .requestPayload(Message::getPayload)
            .propertyFunction("date", m -> new Date()))

FunctionExpression还支持运行时类型转换,就像SpelExpression.spring-doc.cadn.net.cn

子流支持

部分if…​elsepublish-subscribe组件提供了使用子流指定其逻辑或映射的功能。 最简单的示例是.publishSubscribeChannel(),如下例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

您可以使用单独的IntegrationFlow @Bean定义,但我们希望你发现 logic composition 的 sub-flow 风格很有用。 我们发现它会导致更短(因此更具可读性)的代码。spring-doc.cadn.net.cn

从版本 5.3 开始,BroadcastCapableChannel-基于publishSubscribeChannel()提供 implementation 以在 broker 支持的消息通道上配置子流订阅者。 例如,我们现在可以将多个订阅者配置为Jms.publishSubscribeChannel():spring-doc.cadn.net.cn

@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub")
                .get();
}

@Bean
public IntegrationFlow pubSubFlow() {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel(),
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
    return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
            .destination("pubsub")
            .get();
}

类似的publish-subscribesub-flow 组合提供.routeToRecipients()方法。spring-doc.cadn.net.cn

另一个例子是使用.discardFlow()而不是.discardChannel().filter()方法。spring-doc.cadn.net.cn

.route()值得特别关注。 请考虑以下示例:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

.channelMapping()继续像在常规Routermapping 的 map,但.subFlowMapping()将该子流绑定到 Main Flow。 换句话说,任何路由器的子流在.route().spring-doc.cadn.net.cn

有时,您需要引用现有的IntegrationFlow @Bean.subFlowMapping(). 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}


在这种情况下,当您需要接收来自此类子流的回复并继续主流时,此IntegrationFlowbean 引用(或其输入通道)必须用.gateway()如前面的示例所示。 这oddFlow()reference 未包装到.gateway(). 因此,我们不希望此路由分支回复。 否则,您最终会得到类似于以下内容的异常:spring-doc.cadn.net.cn

Caused by: org.springframework.beans.factory.BeanCreationException:
    The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
    is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
    This is the end of the integration flow.

当您将子流配置为 lambda 时,框架会处理与子流的请求-回复交互,并且不需要网关。spring-doc.cadn.net.cn

子流可以嵌套到任何深度,但我们不建议这样做。 事实上,即使在路由器的情况下,在流中添加复杂的子流也会很快开始看起来像一盘意大利面,人类很难解析。spring-doc.cadn.net.cn

在 DSL 支持子流配置的情况下,当正在配置的组件通常需要一个通道,并且该子流以channel()元素中,框架会隐式地放置一个bridge()在 Component Output Channel 和 Flow 的 Input Channel 之间。 例如,在这个filter定义:spring-doc.cadn.net.cn

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

框架在内部创建一个DirectChannelbean 中注入到MessageFilter.discardChannel. 然后,它将子流包装到IntegrationFlow从订阅的这个隐式通道开始,并放置一个bridgechannel()在流中指定。 当现有的IntegrationFlowbean 用作子流引用(而不是内联子流,例如 lambda),则不需要这样的桥,因为框架可以解析流 bean 中的第一个通道。 对于内联子流,输入通道尚不可用。spring-doc.cadn.net.cn

使用协议适配器

到目前为止显示的所有示例都说明了 DSL 如何通过使用 Spring Integration 编程模型来支持消息传递体系结构。 但是,我们还没有做任何真正的集成。 这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源,或者访问本地文件系统。 Spring 集成支持所有这些以及更多。 理想情况下,DSL 应该为所有这些提供一流的支持,但是实现所有这些并跟上 Spring Integration 中新适配器的添加是一项艰巨的任务。 因此,期望 DSL 不断赶上 Spring 集成。spring-doc.cadn.net.cn

因此,我们提供了高级 API 来无缝定义特定于协议的消息收发。 我们使用 Factory 和 Builder 模式以及 lambda 来执行此作。 你可以将工厂类视为“名称空间工厂”,因为它们与来自具体协议特定的 Spring 集成模块的组件的 XML 名称空间起着相同的作用。 目前,Spring 集成 Java DSL 支持Amqp,Feed,Jms,Files,(S)Ftp,Http,JPA,MongoDb,TCP/UDP,Mail,WebFluxScripts命名空间工厂。 以下示例演示如何使用其中的三个 (Amqp,JmsMail):spring-doc.cadn.net.cn

@Bean
public IntegrationFlow amqpFlow() {
    return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
            .transform("hello "::concat)
            .transform(String.class, String::toUpperCase)
            .get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return IntegrationFlow.from("jmsOutboundGatewayChannel")
            .handle(Jms.outboundGateway(this.jmsConnectionFactory)
                        .replyContainer(c ->
                                    c.concurrentConsumers(3)
                                            .sessionTransacted(true))
                        .requestDestination("jmsPipelineTest"))
            .get();
}

@Bean
public IntegrationFlow sendMailFlow() {
    return IntegrationFlow.from("sendMailChannel")
            .handle(Mail.outboundAdapter("localhost")
                            .port(smtpPort)
                            .credentials("user", "pw")
                            .protocol("smtp")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.id("sendMailEndpoint"))
            .get();
}

前面的示例展示了如何使用“命名空间工厂”作为内联适配器声明。 但是,您可以从@Beandefinitions 进行IntegrationFlow方法链更具可读性。spring-doc.cadn.net.cn

在将精力投入到其他命名空间工厂之前,我们会征求社区对这些命名空间工厂的反馈。 我们也感谢对我们接下来应该支持的适配器和网关的优先级的任何意见。

您可以在本参考手册中特定于协议的章节中找到更多 Java DSL 示例。spring-doc.cadn.net.cn

所有其他协议通道适配器都可以配置为通用 bean 并连接到IntegrationFlow,如下例所示:spring-doc.cadn.net.cn

@Bean
public QueueChannelSpec wrongMessagesChannel() {
    return MessageChannels
            .queue()
            .wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
    return IntegrationFlow.from("inputChannel")
            .filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
                    e -> e.discardChannel(wrongMessagesChannel))
            .log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
            .route(xpathRouter(wrongMessagesChannel))
            .get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
    XPathRouter router = new XPathRouter("local-name(/*)");
    router.setEvaluateAsString(true);
    router.setResolutionRequired(false);
    router.setDefaultOutputChannel(wrongMessagesChannel);
    router.setChannelMapping("Tags", "splittingChannel");
    router.setChannelMapping("Tag", "receivedChannel");
    return router;
}

IntegrationFlowAdapter

IntegrationFlowinterface 可以直接实现并指定为 scanning 组件,如下例所示:spring-doc.cadn.net.cn

@Component
public class MyFlow implements IntegrationFlow {

    @Override
    public void configure(IntegrationFlowDefinition<?> f) {
        f.<String, String>transform(String::toUpperCase);
    }

}

它由IntegrationFlowBeanPostProcessor并在应用程序上下文中正确解析和注册。spring-doc.cadn.net.cn

为了方便起见并获得松散耦合架构的好处,我们提供了IntegrationFlowAdapter基类实现。 它需要一个buildFlow()方法实现以生成IntegrationFlowDefinition通过使用from()方法,如下例所示:spring-doc.cadn.net.cn

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    private final AtomicBoolean invoked = new AtomicBoolean();

    public Date nextExecutionTime(TriggerContext triggerContext) {
          return this.invoked.getAndSet(true) ? null : new Date();
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(this::messageSource,
                      e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                 .split(this)
                 .transform(this)
                 .aggregate(a -> a.processor(this, null), null)
                 .enrichHeaders(Collections.singletonMap("thing1", "THING1"))
                 .filter(this)
                 .handle(this)
                 .channel(c -> c.queue("myFlowAdapterOutput"));
    }

    public String messageSource() {
         return "T,H,I,N,G,2";
    }

    @Splitter
    public String[] split(String payload) {
         return StringUtils.commaDelimitedListToStringArray(payload);
    }

    @Transformer
    public String transform(String payload) {
         return payload.toLowerCase();
    }

    @Aggregator
    public String aggregate(List<String> payloads) {
           return payloads.stream().collect(Collectors.joining());
    }

    @Filter
    public boolean filter(@Header Optional<String> thing1) {
            return thing1.isPresent();
    }

    @ServiceActivator
    public String handle(String payload, @Header String thing1) {
           return payload + ":" + thing1;
    }

}

动态和运行时集成流

IntegrationFlow并且其所有依赖组件都可以在运行时注册。 在 5.0 版本之前,我们使用了BeanFactory.registerSingleton()钩。 从 Spring 框架开始5.0,我们使用instanceSupplier程序化的钩子BeanDefinition注册。 下面的示例展示了如何以编程方式注册一个 Bean:spring-doc.cadn.net.cn

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

请注意,在前面的示例中,instanceSupplierhook 是genericBeanDefinition方法,在本例中由 Lambda 提供。spring-doc.cadn.net.cn

所有必要的 bean 初始化和生命周期都是自动完成的,就像使用标准上下文配置 bean 定义一样。spring-doc.cadn.net.cn

为了简化开发体验,Spring Integration 引入了IntegrationFlowContext注册和管理IntegrationFlow实例,如下例所示:spring-doc.cadn.net.cn

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

当我们有多个配置选项并且必须创建类似流的多个实例时,这非常有用。 为此,我们可以迭代我们的选项并创建和注册IntegrationFlow实例。 另一种变体是当我们的数据源不是基于 Spring 的,因此我们必须动态创建它。 这样的示例是 Reactive Streams 事件源,如下例所示:spring-doc.cadn.net.cn

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlow.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

IntegrationFlowRegistrationBuilder(由于IntegrationFlowContext.registration()) 可用于指定IntegrationFlow注册,控制其autoStartup,并注册非 Spring 集成 bean。 通常,这些额外的 bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和反序列化器或任何其他必需的支持组件。spring-doc.cadn.net.cn

您可以使用IntegrationFlowRegistration.destroy()callback 删除动态注册的IntegrationFlow以及它的所有依赖的 bean。 请参阅IntegrationFlowContextJavadoc了解更多信息。spring-doc.cadn.net.cn

从版本 5.0.6 开始,所有生成的 bean 名称都位于IntegrationFlowdefinition 的前缀为流 ID 作为前缀。 我们建议始终指定显式流 ID。 否则,将在IntegrationFlowContext生成 Bean 名称,以便为IntegrationFlow并注册其 bean。 我们在这两个作上进行同步,以避免当相同的生成的 bean 名称可能用于不同的 bean 时出现竞争条件IntegrationFlow实例。

此外,从版本 5.0.6 开始,注册构建器 API 有一个新方法:useFlowIdAsPrefix(). 如果您希望声明同一流的多个实例,并在流中的组件具有相同的 ID 时避免 bean 名称冲突,这将非常有用,如下例所示:spring-doc.cadn.net.cn

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

在这种情况下,可以使用名称为tcp1.client.handler.spring-doc.cadn.net.cn

id属性是必需的useFlowIdAsPrefix().

IntegrationFlow作为网关

IntegrationFlow可以从提供GatewayProxyFactoryBean组件,如下例所示:spring-doc.cadn.net.cn

public interface ControlBusGateway {

    void send(String command);
}

...

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlow.from(ControlBusGateway.class)
            .controlBus()
            .get();
}

接口方法的所有代理都随通道一起提供,用于将消息发送到IntegrationFlow. 您可以使用@MessagingGateway注解,并使用@Gateway附注。 尽管如此,requestChannel被忽略并被IntegrationFlow. 否则,使用IntegrationFlow这没有意义。spring-doc.cadn.net.cn

默认情况下,GatewayProxyFactoryBean获取约定的 bean 名称,例如[FLOW_BEAN_NAME.gateway]. 您可以使用@MessagingGateway.name()属性或重载的IntegrationFlow.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer)Factory 方法。 此外,来自@MessagingGateway注解应用于目标GatewayProxyFactoryBean. 当 annotation 配置不适用时,Consumer<GatewayProxySpec>variant 可用于为目标代理提供适当的选项。 此 DSL 方法从版本 5.2 开始可用。spring-doc.cadn.net.cn

在 Java 8 中,您甚至可以使用java.util.function接口,如下例所示:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow errorRecovererFlow() {
    return IntegrationFlow.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
            .<Object>handle((p, h) -> {
                throw new RuntimeException("intentional");
            }, e -> e.advice(retryAdvice()))
            .get();
}

errorRecovererFlow可以按如下方式使用:spring-doc.cadn.net.cn

@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;

DSL 扩展

从版本 5.3 开始,IntegrationFlowExtension以允许使用自定义或组合的 EIP 运算符扩展现有 Java DSL。 所需要的只是这个类的扩展,它提供可以在IntegrationFlowbean 定义。 扩展类也可以用于自定义IntegrationComponentSpec配置;例如,missed 或 default 选项可以在现有的IntegrationComponentSpec外延。 下面的示例演示了复合自定义运算符和AggregatorSpec默认自定义outputProcessor:spring-doc.cadn.net.cn

public class CustomIntegrationFlowDefinition
        extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {

    public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
        return split()
                .transform("payload.toUpperCase()");
    }

    public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
        return register(new CustomAggregatorSpec(), aggregator);
    }

}

public class CustomAggregatorSpec extends AggregatorSpec {

    CustomAggregatorSpec() {
        outputProcessor(group ->
                group.getMessages()
                        .stream()
                        .map(Message::getPayload)
                        .map(String.class::cast)
                        .collect(Collectors.joining(", ")));
    }

}

对于方法链流,这些扩展中的新 DSL 运算符必须返回扩展类。 这样,目标IntegrationFlowdefinition 将与新的和现有的 DSL 运算符一起使用:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow customFlowDefinition() {
    return
            new CustomIntegrationFlowDefinition()
                    .log()
                    .upperCaseAfterSplit()
                    .channel("innerChannel")
                    .customAggregate(customAggregatorSpec ->
                            customAggregatorSpec.expireGroupsUponCompletion(true))
                    .logAndReply();
}

集成流组合

使用MessageChannel抽象作为 Spring Integration 中的一等公民,集成流的组成总是被假定的。 流中任何终端节点的输入通道都可用于从任何其他终端节点发送消息,而不仅仅是从将此通道作为输出的终端节点发送消息。 此外,使用@MessagingGateway合约、内容扩充器组件、复合端点(如<chain>,现在有了IntegrationFlowBeans(例如IntegrationFlowAdapter),在较短的、可重用的部分之间分配业务逻辑非常简单。 最终组合所需要的只是关于MessageChannel以发送到或接收自。spring-doc.cadn.net.cn

从 version 开始5.5.4,从MessageChannel并对最终用户隐藏实现详细信息,IntegrationFlow介绍from(IntegrationFlow)factory 方法允许启动电流IntegrationFlow从现有流的输出中:spring-doc.cadn.net.cn

@Bean
IntegrationFlow templateSourceFlow() {
    return IntegrationFlow.fromSupplier(() -> "test data")
            .channel("sourceChannel")
            .get();
}

@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
    return IntegrationFlow.from(templateSourceFlow)
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("compositionMainFlowResult"))
            .get();
}

另一方面,IntegrationFlowDefinition已添加to(IntegrationFlow)terminal 运算符在一些其他流的输入通道上继续电流:spring-doc.cadn.net.cn

@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
    return f -> f
            .<String, String>transform(String::toUpperCase)
            .to(otherFlow);
}

@Bean
IntegrationFlow otherFlow() {
    return f -> f
            .<String, String>transform(p -> p + " from other flow")
            .channel(c -> c.queue("otherFlowResultChannel"));
}

流中间的组合可以通过现有的gateway(IntegrationFlow)EIP 方法。 通过这种方式,我们可以从更简单、可重用的逻辑块组合流,从而构建任何复杂的流。 例如,您可以添加IntegrationFlowbeans 作为依赖项,只需将它们的配置类导入到最终项目中并为您的IntegrationFlow定义。spring-doc.cadn.net.cn