对于最新的稳定版本,请使用 Spring Integration 6.3.1

对于最新的稳定版本,请使用 Spring Integration 6.3.1

网关隐藏了 Spring Integration 提供的消息传递 API。 它使应用程序的业务逻辑无法识别Spring Integration API。 通过使用通用网关,您的代码仅与简单接口进行交互。

输入GatewayProxyFactoryBean

如前所述,如果不依赖 Spring Integration API(包括网关类)就太好了。 出于这个原因,Spring Integration 提供了 ,它为任何接口生成代理,并在内部调用如下所示的网关方法。 通过使用依赖关系注入,您可以将接口公开给业务方法。GatewayProxyFactoryBean

以下示例显示了可用于与 Spring Integration 交互的接口:

public interface Cafe {

    void placeOrder(Order order);

}

网关 XML 命名空间支持

还提供命名空间支持。 它允许您将接口配置为服务,如以下示例所示:

<int:gateway id="cafeService"
         service-interface="org.cafeteria.Cafe"
         default-request-channel="requestChannel"
         default-reply-timeout="10000"
         default-reply-channel="replyChannel"/>

定义此配置后,现在可以将其注入到其他 bean 中,并且在该接口的代理实例上调用方法的代码对 Spring Integration API 没有感知。 有关使用该元素的示例(在 Cafe 演示中),请参阅“示例”附录。cafeServiceCafegateway

上述配置中的默认值适用于网关接口上的所有方法。 如果未指定应答超时,则调用线程将等待应答 30 秒。 请参阅未响应到达时的网关行为

可以覆盖单个方法的默认值。 请参阅使用注释和 XML 的网关配置

设置默认回复通道

通常,您不需要指定 ,因为网关会自动创建一个临时的匿名应答通道,并在其中侦听应答。 但是,在某些情况下,可能会提示您定义(或使用适配器网关,例如 HTTP、JMS 等)。default-reply-channeldefault-reply-channelreply-channel

对于一些背景信息,我们简要讨论了网关的一些内部工作原理。 网关创建临时点对点回复通道。 它是匿名的,并添加到名为 的邮件头中。 提供显式(使用远程适配器网关)时,可以指向发布-订阅通道,该通道之所以如此命名,是因为您可以向其添加多个订阅者。 在内部,Spring Integration 在临时和显式定义之间创建了一座桥梁。replyChanneldefault-reply-channelreply-channelreplyChanneldefault-reply-channel

假设您希望您的回复不仅发送到网关,还发送给其他使用者。 在这种情况下,您需要两件事:

  • 您可以订阅的命名频道

  • 该频道将成为发布-订阅-频道

网关使用的默认策略不能满足这些需求,因为添加到标头的回复通道是匿名的,并且是点对点的。 这意味着没有其他订阅者可以获取它的句柄,即使可以,该频道也具有点对点行为,因此只有一个订阅者会收到消息。 通过定义一个,您可以指向您选择的通道。 在本例中,这是一个 . 网关创建从它到存储在标头中的临时匿名回复通道的桥接。default-reply-channelpublish-subscribe-channel

您可能还希望显式提供应答通道,以便通过拦截器(例如,窃听)进行监视或审核。 要配置信道拦截器,您需要一个命名信道。

从 V5.4 开始,当网关方法返回类型为 时,如果未显式提供此类头,则框架会将此类头填充为 Bean 引用。 这允许丢弃来自下游流的任何可能的回复,从而满足单向网关协定。voidreplyChannelnullChannel
从 V5.4 开始,当网关方法返回类型为 时,如果未显式提供此类头,则框架会将此类头填充为 Bean 引用。 这允许丢弃来自下游流的任何可能的回复,从而满足单向网关协定。voidreplyChannelnullChannel

使用注释和 XML 的网关配置

请考虑以下示例,该示例通过添加注释来扩展上一个接口示例:Cafe@Gateway

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

批注允许您添加被解释为邮件头的值,如以下示例所示:@Header

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

如果更喜欢 XML 方法来配置网关方法,则可以将元素添加到网关配置中,如以下示例所示:method

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB"/>
  <int:method name="echoViaDefault"/>
</int:gateway>

还可以使用 XML 为每个方法调用提供单独的标头。 如果要设置的标头本质上是静态的,并且您不希望使用注释将它们嵌入到网关的方法签名中,这可能很有用。 例如,在贷款经纪人示例中,我们希望根据发起的请求类型(单个报价或所有报价)来影响贷款报价的聚合方式。 通过评估调用的网关方法来确定请求的类型(尽管可能)会违反关注点分离范式(该方法是 Java 工件)。 但是,在消息标题中表达您的意图(元信息)在消息传递体系结构中是很自然的。 下面的示例演示如何为两种方法中的每种方法添加不同的邮件头:@Header

<int:gateway id="loanBrokerGateway"
         service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
  <int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
  <int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="ALL"/>
  </int:method>
</int:gateway>

在前面的示例中,根据网关的方法为“RESPONSE_TYPE”标头设置了不同的值。

例如,如果指定 in 和 in 注释,则注释值优先。requestChannel<int:method/>@Gateway
如果在 XML 中指定了无参数网关,并且接口方法同时具有 a 和 annotation(元素中带有 a 或 a),则忽略该值。@Payload@GatewaypayloadExpressionpayload-expression<int:method/>@Payload

表达式和“全局”标头

该元素支持作为 的替代方法。 计算 SpEL 表达式以确定标头的值。 从 V5.2 开始,评估上下文的对象是 with 和 访问器。 例如,如果希望在简单方法名称上进行路由,则可以添加具有以下表达式的标头:.<header/>expressionvalue#rootMethodArgsHoldergetMethod()getArgs()method.name

不可序列化。 如果以后序列化邮件,则带有表达式 的标头将丢失。 因此,在这些情况下,您可能希望使用 or。 该方法提供方法的表示形式,包括参数和返回类型。java.reflect.Methodmethodmethod.namemethod.toString()toString()String

从版本 3.0 开始,可以定义元素以将标头添加到网关生成的所有消息中,而不管调用的方法如何。 为方法定义的特定标头优先于默认标头。 此处为方法定义的特定标头将覆盖服务接口中的任何注释。 但是,默认标头不会覆盖服务接口中的任何注释。<default-header/>@Header@Header

网关现在还支持 ,该方法适用于所有方法(除非被覆盖)。default-payload-expression

例如,如果指定 in 和 in 注释,则注释值优先。requestChannel<int:method/>@Gateway
如果在 XML 中指定了无参数网关,并且接口方法同时具有 a 和 annotation(元素中带有 a 或 a),则忽略该值。@Payload@GatewaypayloadExpressionpayload-expression<int:method/>@Payload
不可序列化。 如果以后序列化邮件,则带有表达式 的标头将丢失。 因此,在这些情况下,您可能希望使用 or。 该方法提供方法的表示形式,包括参数和返回类型。java.reflect.Methodmethodmethod.namemethod.toString()toString()String

将方法参数映射到消息

使用上一节中的配置技术可以控制方法参数如何映射到消息元素(有效负载和标头)。 如果未使用显式配置,则使用某些约定来执行映射。 在某些情况下,这些约定无法确定哪个参数是有效负载,哪些参数应映射到标头。 请看以下示例:

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是 ),第二个参数的内容成为标头。Map

在第二种情况下(或第一种情况下,当参数的参数为 时),框架无法确定哪个参数应该是有效负载。 因此,映射失败。 这通常可以使用 、 注释 或 注释来解决。thing1Mappayload-expression@Payload@Headers

或者(每当约定中断时),您可以承担将方法调用映射到消息的全部责任。 为此,请使用该属性实现 并将其提供给 。 映射器映射 ,这是一个包装实例和包含参数的简单类。 提供自定义映射器时,网关上不允许使用属性和元素。 同样,任何元素都不允许使用属性和元素。MethodArgsMessageMapper<gateway/>mapperMethodArgsHolderjava.reflect.MethodObject[]default-payload-expression<default-header/>payload-expression<header/><method/>

映射方法参数

以下示例演示如何将方法参数映射到消息,并显示一些无效配置的示例:

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("args[0] + args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  (1)

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
1 请注意,在此示例中,SpEL 变量 ,指的是参数 — 在本例中为 的值。#thiss

XML 等效项看起来略有不同,因为没有方法参数的上下文。 但是,表达式可以通过使用根对象的属性来引用方法参数(有关详细信息,请参阅表达式和“全局”标头),如以下示例所示:#thisargsMethodArgsHolder

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
  <int:method name="send1" payload-expression="args[0] + 'thing2'"/>
  <int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
  <int:method name="send3" payload-expression="method"/>
  <int:method name="send4">
    <int:header name="thing1" expression="args[2].toUpperCase()"/>
  </int:method>
</int:gateway>
1 请注意,在此示例中,SpEL 变量 ,指的是参数 — 在本例中为 的值。#thiss

@MessagingGateway注解

从版本 4.0 开始,网关服务接口可以用注释进行标记,而不需要定义 xml 元素进行配置。 以下一对示例比较了配置同一网关的两种方法:@MessagingGateway<gateway />

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}
与XML版本类似,当Spring Integration在组件扫描期间发现这些注释时,它会使用其消息传递基础结构创建实现。 若要执行此扫描并在应用程序上下文中注册,请将批注添加到类中。 标准基础结构不处理接口。 因此,我们引入了自定义逻辑来查找接口上的注释并为它们注册实例。 另请参阅注释支持proxyBeanDefinition@IntegrationComponentScan@Configuration@ComponentScan@IntegrationComponentScan@MessagingGatewayGatewayProxyFactoryBean

除了注释之外,您还可以使用注释标记服务接口,以避免在此类配置文件未处于活动状态时创建 Bean。@MessagingGateway@Profile

从 6.0 版开始,与 的接口也可以用相应配置逻辑的注释来标记,就像任何 Spring 定义一样。@MessagingGateway@Primary@Component

从版本 6.0 开始,可以在标准 Spring 配置中使用接口。 这可以用作 或 手动 Bean 定义的替代方法。@MessagingGateway@Import@IntegrationComponentScanAnnotationGatewayProxyFactoryBean

该属性使用 since 版本进行元注释,并且该属性本质上是 . 这样,网关代理的 Bean 名称生成策略将与扫描和导入组件的标准 Spring 注释配置重新对齐。 默认值可以通过 or 作为属性全局覆盖。@MessagingGateway@MessageEndpoint6.0name()@Compnent.value()AnnotationBeanNameGeneratorAnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR@IntegrationComponentScan.nameGenerator()

如果没有 XML 配置,则至少一个类需要注释。 有关详细信息,请参阅配置和@EnableIntegration@EnableIntegration@Configuration
与XML版本类似,当Spring Integration在组件扫描期间发现这些注释时,它会使用其消息传递基础结构创建实现。 若要执行此扫描并在应用程序上下文中注册,请将批注添加到类中。 标准基础结构不处理接口。 因此,我们引入了自定义逻辑来查找接口上的注释并为它们注册实例。 另请参阅注释支持proxyBeanDefinition@IntegrationComponentScan@Configuration@ComponentScan@IntegrationComponentScan@MessagingGatewayGatewayProxyFactoryBean
如果没有 XML 配置,则至少一个类需要注释。 有关详细信息,请参阅配置和@EnableIntegration@EnableIntegration@Configuration

调用无参数方法

在没有任何参数的网关接口上调用方法时,默认行为是从 .MessagePollableChannel

但是,有时您可能希望触发无参数方法,以便可以与不需要用户提供的参数的下游其他组件进行交互,例如触发无参数 SQL 调用或存储过程。

若要实现发送和接收语义,必须提供有效负载。 要生成有效负载,不需要接口上的方法参数。 您可以在元素上使用 XML 中的批注或属性。 以下列表包括有效负载可能是什么的几个示例:@Payloadpayload-expressionmethod

  • 文本字符串

  • #gatewayMethod.name

  • 新的java.util.Date()

  • @someBean.someMethod() 的返回值

以下示例演示如何使用批注:@Payload

public interface Cafe {

    @Payload("new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

您也可以使用批注。@Gateway

public interface Cafe {

    @Gateway(payloadExpression = "new java.util.Date()")
    List<Order> retrieveOpenOrders();

}
如果两个注释都存在(并且提供了注释),则获胜。payloadExpression@Gateway

如果方法没有参数和返回值,但包含有效负载表达式,则将其视为仅发送操作。

如果两个注释都存在(并且提供了注释),则获胜。payloadExpression@Gateway

调用方法default

网关代理的接口也可能有方法,从版本 5.3 开始,框架将一个注入到代理中,以便使用方法而不是代理来调用方法。 JDK 中的接口(如 )仍可用于网关代理,但由于内部 Java 安全原因,无法调用它们的方法,以便针对 JDK 类进行实例化。 还可以使用方法上的显式注释或注释或 XML 组件来代理这些方法(丢失其实现逻辑,同时恢复以前的网关代理行为)。defaultDefaultMethodInvokingMethodInterceptordefaultjava.lang.invoke.MethodHandlejava.util.function.FunctiondefaultMethodHandles.Lookup@GatewayproxyDefaultMethods@MessagingGateway<gateway>

错误处理

网关调用可能会导致错误。 默认情况下,下游发生的任何错误都会在网关的方法调用时“按原样”重新抛出。 例如,请考虑以下简单流程:

gateway -> service-activator

例如,如果服务激活器调用的服务抛出 (),则框架会将其包装在 中,并将传递给属性中的服务激活器的消息附加。 因此,框架执行的任何日志记录都包含失败的完整上下文。 默认情况下,当网关捕获异常时,将解包并抛给调用方。 您可以在网关方法声明上配置子句,以匹配原因链中的特定异常类型。 例如,如果要捕获包含下游错误原因的所有消息传递信息的整体,则应具有类似于以下内容的网关方法:MyExceptionMessagingExceptionfailedMessageMyExceptionthrowsMessagingException

public interface MyGateway {

    void performProcess() throws MessagingException;

}

由于我们鼓励 POJO 编程,因此您可能不希望将呼叫者暴露给消息传递基础结构。

如果网关方法没有子句,则网关将遍历原因树,查找不是 . 如果未找到任何内容,则框架将抛出 . 如果前面讨论中的原因为 和 您的方法 ,网关将进一步解包并将其抛给调用方。throwsRuntimeExceptionMessagingExceptionMessagingExceptionMyExceptionSomeOtherExceptionthrows SomeOtherException

当网关声明为 no 时,将使用内部框架接口。service-interfaceRequestReplyExchanger

请看以下示例:

public interface RequestReplyExchanger {

	Message<?> exchange(Message<?> request) throws MessagingException;

}

在版本 5.0 之前,此方法没有子句,因此,异常被解包。 如果您使用此接口并希望恢复以前的解包行为,请改用自定义或访问您自己的。exchangethrowsservice-interfacecauseMessagingException

但是,您可能希望记录错误而不是传播它,或者您可能希望将异常视为有效回复(通过将其映射到符合调用方理解的某些“错误消息”协定的消息)。 为此,网关通过包含对属性的支持来提供专用于错误的消息通道的支持。 在以下示例中,“transformer”从以下位置创建回复:error-channelMessageException

<int:gateway id="sampleGateway"
    default-request-channel="gatewayChannel"
    service-interface="foo.bar.SimpleGateway"
    error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。 这将成为发送回调用方的有效负载。 如有必要,您可以在这样的“错误流”中做更多更复杂的事情。 它可能涉及路由器(包括 Spring Integration)、过滤器等。 然而,大多数时候,一个简单的“变压器”就足够了。exceptionTransformerErrorMessageExceptionTypeRouter

或者,您可能只想记录异常(或将其异步发送到某个位置)。 如果提供单向流,则不会将任何内容发送回调用方。 如果要完全禁止异常,可以提供对全局的引用(本质上是一种方法)。 最后,如上所述,如果定义了 no,则异常将照常传播。nullChannel/dev/nullerror-channel

使用注释时(请参阅@MessagingGateway的注释属性。@MessagingGateway), you can use an `errorChannel

从 V5.0 开始,当您使用具有返回类型(单向流)的网关方法时,引用(如果提供)将填充在每条已发送消息的标准标头中。 此功能允许基于标准配置(或 )的下游异步流覆盖默认的全局异常发送行为。 以前,您必须手动指定带有注释或元素的标题。 对于具有异步流的方法,将忽略该属性。 相反,错误消息被发送到默认的 .voiderror-channelerrorChannelExecutorChannelQueueChannelerrorChannelerrorChannel@GatewayHeader<header>error-channelvoiderrorChannel

通过简单的 POJI 网关公开消息传递系统会带来好处,但“隐藏”底层消息传递系统的现实确实是有代价的,因此您应该考虑某些事情。 我们希望我们的 Java 方法尽可能快地返回,而不是在调用方等待它返回时无限期挂起(无论是 void、返回值还是抛出的异常)。 当常规方法用作消息传递系统前面的代理时,我们必须考虑底层消息传递的潜在异步性质。 这意味着,由网关发起的消息可能会被过滤器丢弃,并且永远不会到达负责生成回复的组件。 某些服务激活器方法可能会导致异常,从而不提供任何回复(因为我们不生成 null 消息)。 换言之,多种情况都可能导致回复消息永远不会出现。 这在消息传递系统中是完全自然的。 但是,请考虑对网关方法的影响。 网关的方法输入参数被合并到消息中并发送到下游。 回复消息将转换为网关方法的返回值。 因此,您可能希望确保对于每个网关调用,始终有一条应答消息。 否则,如果设置为负值,网关方法可能永远不会返回并无限期挂起。 处理这种情况的一种方法是使用异步网关(本节稍后将介绍)。 处理它的另一种方法是依赖默认值作为秒。 这样,网关挂起的时间不会超过 指定的时间,如果超时确实过去了,则返回“null”。 最后,您可能需要考虑在服务激活器上设置下游标志,例如“requires-reply”或在筛选器上设置“throw-exceptions-on-rejection”。 本章最后一节将更详细地讨论这些选项。reply-timeoutreply-timeout30reply-timeout
如果下游流返回 ,则其 (a) 被视为常规下游错误。 如果已配置,则将其发送到错误流。 否则,有效负载将抛给网关的调用方。 同样,如果 上的错误流返回 ,则其有效负载将抛给调用方。 这同样适用于任何具有有效负载的消息。 这在异步情况下非常有用,当您需要将 直接传播到调用方时。 为此,您可以返回一个(作为来自某个服务)或抛出它。 通常,即使使用异步流,框架也会负责将下游流引发的异常传播回网关。 TCP 客户端-服务器多路复用示例演示了将异常返回给调用方的两种技术。 它通过使用 with(请参阅聚合器和组超时)和对丢弃流的回复来模拟等待线程的套接字 IO 错误。ErrorMessagepayloadThrowableerror-channelerror-channelErrorMessageThrowableExceptionExceptionreplyaggregatorgroup-timeoutMessagingTimeoutException
通过简单的 POJI 网关公开消息传递系统会带来好处,但“隐藏”底层消息传递系统的现实确实是有代价的,因此您应该考虑某些事情。 我们希望我们的 Java 方法尽可能快地返回,而不是在调用方等待它返回时无限期挂起(无论是 void、返回值还是抛出的异常)。 当常规方法用作消息传递系统前面的代理时,我们必须考虑底层消息传递的潜在异步性质。 这意味着,由网关发起的消息可能会被过滤器丢弃,并且永远不会到达负责生成回复的组件。 某些服务激活器方法可能会导致异常,从而不提供任何回复(因为我们不生成 null 消息)。 换言之,多种情况都可能导致回复消息永远不会出现。 这在消息传递系统中是完全自然的。 但是,请考虑对网关方法的影响。 网关的方法输入参数被合并到消息中并发送到下游。 回复消息将转换为网关方法的返回值。 因此,您可能希望确保对于每个网关调用,始终有一条应答消息。 否则,如果设置为负值,网关方法可能永远不会返回并无限期挂起。 处理这种情况的一种方法是使用异步网关(本节稍后将介绍)。 处理它的另一种方法是依赖默认值作为秒。 这样,网关挂起的时间不会超过 指定的时间,如果超时确实过去了,则返回“null”。 最后,您可能需要考虑在服务激活器上设置下游标志,例如“requires-reply”或在筛选器上设置“throw-exceptions-on-rejection”。 本章最后一节将更详细地讨论这些选项。reply-timeoutreply-timeout30reply-timeout
如果下游流返回 ,则其 (a) 被视为常规下游错误。 如果已配置,则将其发送到错误流。 否则,有效负载将抛给网关的调用方。 同样,如果 上的错误流返回 ,则其有效负载将抛给调用方。 这同样适用于任何具有有效负载的消息。 这在异步情况下非常有用,当您需要将 直接传播到调用方时。 为此,您可以返回一个(作为来自某个服务)或抛出它。 通常,即使使用异步流,框架也会负责将下游流引发的异常传播回网关。 TCP 客户端-服务器多路复用示例演示了将异常返回给调用方的两种技术。 它通过使用 with(请参阅聚合器和组超时)和对丢弃流的回复来模拟等待线程的套接字 IO 错误。ErrorMessagepayloadThrowableerror-channelerror-channelErrorMessageThrowableExceptionExceptionreplyaggregatorgroup-timeoutMessagingTimeoutException

网关超时

网关有两个超时属性:和 。 仅当通道可以阻塞(例如,已满的边界)时,请求超时才适用。 该值是网关等待回复或返回的时间。 它默认为无穷大。requestTimeoutreplyTimeoutQueueChannelreplyTimeoutnull

超时可以设置为网关( 和 )或接口注释上所有方法的默认值。 单个方法可以覆盖这些默认值(在子元素中)或注释上。defaultRequestTimeoutdefaultReplyTimeoutMessagingGateway<method/>@Gateway

从 V5.0 开始,可以将超时定义为表达式,如以下示例所示:

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

评估上下文具有 (用于引用其他 Bean),并且对象中的数组属性可用。 有关此根对象的详细信息,请参阅表达式和“全局”标头。 使用 XML 进行配置时,timeout 属性可以是长整型值或 SpEL 表达式,如以下示例所示:BeanResolver@someBeanargs#root

<method name="someMethod" request-channel="someRequestChannel"
                      payload-expression="args[0]"
                      request-timeout="1000"
                      reply-timeout="args[1]">
</method>

异步网关

作为一种模式,消息传递网关提供了一种隐藏特定于消息传递的代码的好方法,同时仍然公开消息传递系统的全部功能。 如前所述,它提供了一种通过服务接口公开代理的便捷方法,使您能够基于 POJO 访问消息传递系统(基于您自己的域中的对象、基元/字符串或其他对象)。 但是,当网关通过返回值的简单 POJO 方法公开时,这意味着对于每个请求消息(在调用方法时生成),必须有一个回复消息(在方法返回时生成)。 由于消息传递系统是天生异步的,因此您可能并不总是能够保证“对于每个请求,总会有一个回复”的契约。Spring Integration 2.0 引入了对异步网关的支持,当您可能不知道是否需要回复或回复到达需要多长时间时,它提供了一种方便的方式来启动流。GatewayProxyFactoryBean

为了处理这些类型的场景,Spring Integration 使用实例来支持异步网关。java.util.concurrent.Future

在 XML 配置中,没有任何变化,并且您仍然以与定义常规网关相同的方式定义异步网关,如以下示例所示:

<int:gateway id="mathService"
     service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
     default-request-channel="requestChannel"/>

但是,网关接口(服务接口)略有不同,如下所示:

public interface MathServiceGateway {

  Future<Integer> multiplyByTwo(int i);

}

如前面的示例所示,网关方法的返回类型是 . 当看到网关方法的返回类型为 时,它会立即使用 . 这就是差异的程度。 对此类方法的调用始终立即返回实例。 然后,您可以按照自己的节奏与 进行交互以获得结果、取消等。 此外,与实例的任何其他使用一样,调用可能会显示超时、执行异常等。 以下示例演示如何使用从异步网关返回的 It:FutureGatewayProxyFactoryBeanFutureAsyncTaskExecutorFutureFutureFutureget()Future

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult =  result.get(1000, TimeUnit.SECONDS);

有关更详细的示例,请参阅 Spring Integration 示例中的 async-gateway 示例。

AsyncTaskExecutor

默认情况下,在提交返回类型为 . 但是,元素配置中的属性允许您提供对 Spring 应用程序上下文中可用的任何实现的引用。GatewayProxyFactoryBeanorg.springframework.core.task.SimpleAsyncTaskExecutorAsyncInvocationTaskFutureasync-executor<gateway/>java.util.concurrent.Executor

(默认)支持两种类型和返回类型。 请参阅 CompletableFuture。 即使存在默认执行程序,提供外部执行程序通常也很有用,以便您可以在日志中标识其线程(使用 XML 时,线程名称基于执行程序的 Bean 名称),如以下示例所示:SimpleAsyncTaskExecutorFutureCompletableFuture

@Bean
public AsyncTaskExecutor exec() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
    return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

如果要返回其他实现,可以提供自定义执行程序或完全禁用执行程序,并从下游流返回回复消息中的有效负载。 要禁用执行程序,请将其设置为 (通过使用 )。 使用 XML 配置网关时,请使用 . 使用批注进行配置时,请使用类似于以下内容的代码:FutureFuturenullGatewayProxyFactoryBeansetAsyncTaskExecutor(null)async-executor=""@MessagingGateway

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}
如果返回类型是特定的具体实现或配置的执行程序不支持的其他子接口,则流在调用方的线程上运行,并且流必须在回复消息有效负载中返回所需的类型。Future

CompletableFuture

从版本 4.2 开始,网关方法现在可以返回 。 返回此类型时有两种操作模式:CompletableFuture<?>

  • 当提供异步执行器并且返回类型完全(不是子类)时,框架会在执行器上运行任务,并立即向调用方返回 a。 用来创造未来。CompletableFutureCompletableFutureCompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)

  • 当异步执行程序显式设置为 且返回类型为 或返回类型为 的子类时,将在调用方的线程上调用流。 在此方案中,下游流应返回适当类型的 a。nullCompletableFutureCompletableFutureCompletableFuture

从 Spring Framework 开始已弃用。 现在建议迁移到提供类似处理功能的org.springframework.util.concurrent.ListenableFuture6.0CompletableFuture

使用场景

在以下方案中,调用方线程立即返回 ,并在下游流回复网关(使用对象)时完成。CompletableFuture<Invoice>Invoice

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

在以下方案中,当下游流将其作为对网关的应答的有效负载提供时,调用方线程将返回 a。 当发票准备就绪时,必须完成其他一些过程。CompletableFuture<Invoice>

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
    async-executor="" />

在以下方案中,当下游流将其作为对网关的应答的有效负载提供时,调用方线程将返回 a。 当发票准备就绪时,必须完成其他一些过程。 如果启用了日志记录,则会发出一个日志条目,指示异步执行程序不能用于此方案。CompletableFuture<Invoice>DEBUG

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

CompletableFuture实例可用于对回复执行其他操作,如以下示例所示:

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

反应器Mono

从版本 5.0 开始,允许使用 Mono<T> 返回类型将 Project Reactor 与网关接口方法一起使用。 内部包裹在 .GatewayProxyFactoryBeanAsyncInvocationTaskMono.fromCallable()

A 可用于稍后检索结果(类似于 ),或者您可以通过在结果返回到网关时调用 your 来与调度程序一起使用它。MonoFuture<?>Consumer

框架不会立即刷新。 因此,在网关方法返回之前,基础消息流不会启动(就像任务一样)。 订阅后,流开始。 或者,当 与整个 . 以下示例演示如何使用 Project Reactor 创建网关:MonoFuture<?>ExecutorMonoMonosubscribe()Flux
@MessagingGateway
public interface TestGateway {

    @Gateway(requestChannel = "multiplyChannel")
    Mono<Integer> multiply(Integer value);

}

@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
    return value * 2;
}

其中这样的网关可以在处理数据的某些服务中使用:Flux

@Autowired
TestGateway testGateway;

public void hadnleFlux() {
    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(System.out::println);
}

使用 Project Reactor 的另一个示例是一个简单的回调方案,如以下示例所示:

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

调用线程将继续,并在流完成时被调用。handleInvoice()

有关更多信息,另请参阅 Kotlin 协程

返回异步类型的下游流

如上面的 AsyncTaskExecutor 部分所述,如果希望某些下游组件返回具有异步有效负载(、 和其他有效负载)的消息,则必须将异步执行程序显式设置为(或在使用 XML 配置时)。 然后,在调用方线程上调用流,稍后可以检索结果。FutureMononull""

异步返回类型void

消息传递网关方法可以这样声明:

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = "sendAsyncChannel")
    @Async
    void sendAsync(String payload);

}

但下游异常不会传播回调用方。 为了确保下游流调用和异常传播到调用方的异步行为,从 V6.0 开始,框架提供了对 和 返回类型的支持。 该用例类似于之前针对普通返回类型描述的发送和忘记行为,但不同之处在于流执行是异步发生的,并且根据操作结果异常返回(或)是完整的。Future<Void>Mono<Void>voidFutureMononullsend

如果 is exact 下游流应答,则网关的选项必须设置为 null(对于配置),并且该部分在生产者线程上执行。 回复取决于下游流配置。 这样,就可以正确生成回复。 用例已经超出了框架线程控制,因此设置为 null 没有意义。 由于请求-应答,网关操作必须配置为网关方法的返回类型。Future<Void>asyncExecutorAnnotationConstants.NULL@MessagingGatewaysendFuture<Void>MonoasyncExecutorMono<Void>Mono<?>
如果返回类型是特定的具体实现或配置的执行程序不支持的其他子接口,则流在调用方的线程上运行,并且流必须在回复消息有效负载中返回所需的类型。Future
从 Spring Framework 开始已弃用。 现在建议迁移到提供类似处理功能的org.springframework.util.concurrent.ListenableFuture6.0CompletableFuture
框架不会立即刷新。 因此,在网关方法返回之前,基础消息流不会启动(就像任务一样)。 订阅后,流开始。 或者,当 与整个 . 以下示例演示如何使用 Project Reactor 创建网关:MonoFuture<?>ExecutorMonoMonosubscribe()Flux
如果 is exact 下游流应答,则网关的选项必须设置为 null(对于配置),并且该部分在生产者线程上执行。 回复取决于下游流配置。 这样,就可以正确生成回复。 用例已经超出了框架线程控制,因此设置为 null 没有意义。 由于请求-应答,网关操作必须配置为网关方法的返回类型。Future<Void>asyncExecutorAnnotationConstants.NULL@MessagingGatewaysendFuture<Void>MonoasyncExecutorMono<Void>Mono<?>

未响应到达时的网关行为

如前所述,网关提供了一种通过 POJO 方法调用与消息传递系统交互的便捷方式。 但是,通常期望始终返回的典型方法调用(即使有异常)可能并不总是一对一地映射到消息交换(例如,回复消息可能不会到达,这等效于方法不返回)。

本部分的其余部分介绍各种方案,以及如何使网关的行为更具可预测性。 可以配置某些属性以使同步网关行为更可预测,但其中一些属性可能并不总是像预期的那样工作。 其中之一是(在方法级别或网关级别)。 我们检查该属性,以了解它在各种情况下如何影响同步网关的行为。 我们研究了单线程方案(下游的所有组件都通过直接通道连接)和多线程方案(例如,在下游的某个地方,你可能有一个打破单线程边界的可轮询通道或执行程序通道)。reply-timeoutdefault-reply-timeoutreply-timeout

长时间运行的流程下游

Sync Gateway,单线程

如果下游组件仍在运行(可能是由于无限循环或服务速度较慢),则设置 a 不起作用,并且网关方法调用不会返回,直到下游服务退出(通过返回或引发异常)。reply-timeout

Sync Gateway,多线程

如果下游组件仍在多线程消息流中运行(可能是由于无限循环或服务速度慢),那么设置 将允许网关方法调用在达到超时后返回,从而生效,因为应答通道上的轮询会等待消息,直到超时到期。 但是,如果在生成实际回复之前已达到超时,则可能会导致网关方法返回“null”。 您应该了解,在网关方法调用可能返回后,应答消息(如果生成)将发送到应答通道,因此您必须意识到这一点,并在设计流时牢记这一点。reply-timeoutGatewayProxyFactoryBean

另请参阅在发生超时时要抛出 a 而不是返回的属性。errorOnTimeoutMessageTimeoutExceptionnull

下游组件返回“null”

Sync Gateway — 单线程

如果下游组件返回“null”,并且已配置为负值,则网关方法调用将无限期挂起,除非已在下游组件(例如,服务激活器)上设置了可能返回“null”的属性。 在这种情况下,将引发异常并将其传播到网关。reply-timeoutrequires-reply

同步网关 — 多线程

该行为与前一种情况相同。

下游组件返回签名为“void”,而网关方法签名为非 void

Sync Gateway — 单线程

如果下游组件返回“void”,并且已配置为负值,则网关方法调用将无限期挂起。reply-timeout

同步网关 — 多线程

该行为与前一种情况相同。

下游组件导致运行时异常

Sync Gateway — 单线程

如果下游组件引发运行时异常,则该异常将通过错误消息传播回网关并重新引发。

同步网关 — 多线程

该行为与前一种情况相同。

您应该了解,默认情况下,它是无界的。 因此,如果将 设置为负值,则网关方法调用可能会无限期挂起。 因此,为了确保分析您的流,如果发生这些情况之一的可能性很小,您应该将属性设置为“'safe'”值。 默认为秒。 更好的是,您可以将下游组件的属性设置为“true”,以确保及时响应,因为一旦下游组件在内部返回 null,就会引发异常。 但是,您还应该意识到,在某些情况下(请参阅第一个场景)无济于事。 这意味着分析消息流并决定何时使用同步网关而不是异步网关也很重要。 如前所述,后一种情况是定义返回实例的网关方法的问题。 然后,可以保证收到该返回值,并且可以更精细地控制调用结果。 此外,在处理路由器时,您应该记住,如果路由器无法解析特定通道,则将属性设置为“true”会导致路由器引发异常。 同样,在处理过滤器时,您可以设置属性。 在这两种情况下,生成的流的行为就像它包含一个具有“requires-reply”属性的服务激活器一样。 换言之,它有助于确保网关方法调用的及时响应。reply-timeoutreply-timeoutreply-timeout30requires-replyreply-timeoutFutureresolution-requiredthrow-exception-on-rejection
您应该了解,计时器在线程返回到网关时启动,即当流完成或消息传递到另一个线程时。 此时,调用线程开始等待回复。 如果流程是完全同步的,则立即提供回复。 对于异步流,线程最多等待此时间。

从版本 6.2 开始,在 和 上公开了 的内部扩展的属性。 此选项的含义与“终结点摘要”一章末尾所述的任何入站网关的含义完全相同。 换言之,将此选项设置为 将导致从发送和接收网关操作中抛出,而不是在接收超时耗尽时返回。errorOnTimeoutMethodInvocationGatewayMessagingGatewaySupport@MessagingGatewayGatewayEndpointSpectrueMessageTimeoutExceptionnull

请参阅 Java DSL 一章中的 IntegrationFlow as Gateway,了解通过 定义网关的选项。IntegrationFlow

您应该了解,默认情况下,它是无界的。 因此,如果将 设置为负值,则网关方法调用可能会无限期挂起。 因此,为了确保分析您的流,如果发生这些情况之一的可能性很小,您应该将属性设置为“'safe'”值。 默认为秒。 更好的是,您可以将下游组件的属性设置为“true”,以确保及时响应,因为一旦下游组件在内部返回 null,就会引发异常。 但是,您还应该意识到,在某些情况下(请参阅第一个场景)无济于事。 这意味着分析消息流并决定何时使用同步网关而不是异步网关也很重要。 如前所述,后一种情况是定义返回实例的网关方法的问题。 然后,可以保证收到该返回值,并且可以更精细地控制调用结果。 此外,在处理路由器时,您应该记住,如果路由器无法解析特定通道,则将属性设置为“true”会导致路由器引发异常。 同样,在处理过滤器时,您可以设置属性。 在这两种情况下,生成的流的行为就像它包含一个具有“requires-reply”属性的服务激活器一样。 换言之,它有助于确保网关方法调用的及时响应。reply-timeoutreply-timeoutreply-timeout30requires-replyreply-timeoutFutureresolution-requiredthrow-exception-on-rejection
您应该了解,计时器在线程返回到网关时启动,即当流完成或消息传递到另一个线程时。 此时,调用线程开始等待回复。 如果流程是完全同步的,则立即提供回复。 对于异步流,线程最多等待此时间。