消息网关
网关隐藏了 Spring 集成提供的消息传递 API。 它允许你的应用程序的业务逻辑不知道 Spring 集成 API。 通过使用通用 Gateway,您的代码仅与一个简单的接互。
输入GatewayProxyFactoryBean
如前所述,不依赖于 Spring 集成 API (包括 gateway 类)就太好了。
因此, Spring 集成提供了GatewayProxyFactoryBean
,它为任何接口生成代理,并在内部调用如下所示的网关方法。
通过使用依赖关系注入,您可以向业务方法公开接口。
下面的示例展示了一个可用于与 Spring 集成交互的接口:
public interface Cafe {
void placeOrder(Order order);
}
Gateway XML 命名空间支持
还提供了命名空间支持。 它允许您将接口配置为服务,如下例所示:
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
定义此配置后,cafeService
现在可以注入到其他 bean 中,并且调用该代理实例的Cafe
接口不知道 Spring 集成 API。
请参阅“示例”附录,了解使用gateway
元素(在 Cafe 演示中)。
上述配置中的默认值将应用于网关接口上的所有方法。 如果未指定回复超时,则调用线程将等待回复 30 秒。 请参阅没有响应到达时的网关行为。
可以覆盖单个方法的默认值。 请参阅带有注释和 XML 的网关配置。
设置默认回复通道
通常,您无需指定default-reply-channel
,因为 Gateway 会自动创建一个临时的匿名回复通道,并在其中侦听回复。
但是,某些情况可能会提示您定义default-reply-channel
(或reply-channel
使用适配器网关,例如 HTTP、JMS 等)。
对于一些背景,我们简要讨论了 gateway 的一些内部工作原理。
网关创建一个临时的点对点回复通道。
它是匿名的,并添加到消息标头中,名称为replyChannel
.
当提供显式的default-reply-channel
(reply-channel
使用远程适配器网关),您可以指向 publish-subscribe 通道,之所以这样命名,是因为您可以向其添加多个订阅者。
在内部, Spring 集成在临时replyChannel
和显式定义的default-reply-channel
.
假设您希望您的回复不仅发送到网关,还发送到其他某个使用者。 在这种情况下,您需要两样东西:
-
您可以订阅的命名频道
-
该通道将成为 publish-subscribe-channel
网关使用的默认策略不能满足这些需求,因为添加到 Headers 的回复通道是匿名的和点对点的。
这意味着其他订户无法获得该消息的句柄,即使可以,该通道也具有点对点行为,因此只有一个订户可以获取该消息。
通过定义default-reply-channel
您可以指向您选择的频道。
在本例中,这是一个publish-subscribe-channel
.
网关会创建一个桥,从它到存储在 header 中的临时匿名回复通道。
您可能还希望显式提供一个回复通道,以便通过侦听器(例如,wiretap)进行监视或审计。 要配置通道拦截器,您需要一个命名通道。
从版本 5.4 开始,当网关方法返回类型为void ,框架会填充一个replyChannel header 作为nullChannel bean 引用(如果未明确提供此类标头)。
这允许丢弃来自下游流的任何可能的回复,从而满足单向网关协定。 |
带有注释和 XML 的网关配置
请考虑以下示例,该示例在前面的Cafe
interface 示例,方法是添加@Gateway
注解:
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
这@Header
annotation 允许您添加解释为消息标头的值,如下例所示:
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
如果您更喜欢使用 XML 方法来配置网关方法,则可以添加method
元素添加到网关配置中,如下例所示:
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
您还可以使用 XML 为每个方法调用提供单独的标头。
如果要设置的 Headers 本质上是静态的,并且您不想通过使用@Header
附注。
例如,在贷款经纪人示例中,我们希望根据发起的请求类型(单个报价或所有报价)来影响贷款报价的聚合方式。
通过评估调用的网关方法来确定请求的类型,尽管可能,但会违反关注点分离范式(该方法是一个 Java 工件)。
但是,在消息标头中表达您的意图(元信息)在消息传递架构中是很自然的。
以下示例演示如何为两种方法中的每一种方法添加不同的消息标头:
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
在前面的示例中,根据网关的方法为 'RESPONSE_TYPE' 标头设置了不同的值。
例如,如果您指定requestChannel 在<int:method/> 以及在@Gateway annotation 时,annotation 值优先。 |
如果在 XML 中指定了无参数网关,并且接口方法同时具有@Payload 和@Gateway 注解(带有payloadExpression 或payload-expression 在<int:method/> 元素)、@Payload 值被忽略。 |
表达式和 “global” 标头
这<header/>
元件支架expression
作为value
.
计算 SPEL 表达式以确定 header 的值。
从版本 5.2 开始,#root
object 是MethodArgsHolder
跟getMethod()
和getArgs()
访问。
例如,如果你希望对简单的方法名称进行路由,你可以添加一个带有以下表达式的 Headers:method.name
.
这java.reflect.Method 不可序列化。
表达式为method 如果稍后序列化消息,则会丢失。
因此,您可能希望使用method.name 或method.toString() 在那些情况下。
这toString() method 提供了一个String 方法的表示形式,包括 parameter 和 return 类型。 |
从 3.0 版本开始,<default-header/>
元素以将 Headers 添加到网关生成的所有消息中,而不管调用的方法如何。
为方法定义的特定标头优先于默认标头。
在此处为方法定义的特定标头将覆盖任何@Header
注释。
但是,默认标头不会覆盖任何@Header
注释。
网关现在还支持default-payload-expression
,该函数适用于所有方法(除非被覆盖)。
将方法参数映射到消息
使用上一节中的配置技术可以控制如何将方法参数映射到消息元素(有效负载和 Headers)。 如果未使用显式配置,则使用某些约定来执行映射。 在某些情况下,这些约定无法确定哪个参数是有效负载,哪些参数应该映射到 headers。 请考虑以下示例:
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是Map
),第二个参数的内容变为 headers。
在第二种情况下(或第一种情况下,当参数 for 参数thing1
是一个Map
),框架无法确定哪个参数应该是有效负载。
因此,映射失败。
这通常可以使用payload-expression
一个@Payload
注释或@Headers
注解。
或者(每当约定被打破时),您可以承担将方法调用映射到消息的全部责任。
为此,请实现MethodArgsMessageMapper
并将其提供给<gateway/>
通过使用mapper
属性。
映射器将MethodArgsHolder
,这是一个简单的类,它将java.reflect.Method
instance 和Object[]
包含参数。
提供自定义映射器时,default-payload-expression
attribute 和<default-header/>
元素。
同样,payload-expression
attribute 和<header/>
元素不允许在任何<method/>
元素。
映射方法参数
以下示例显示了如何将方法参数映射到消息,并显示了无效配置的一些示例:
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1)
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
1 | 请注意,在此示例中,SPEL 变量#this 引用参数 — 在本例中,s . |
XML 等效项看起来略有不同,因为没有#this
method 参数的 context 获取。
但是,表达式可以使用args
属性的MethodArgsHolder
root 对象(有关更多信息,请参阅表达式和“全局”标头),如下例所示:
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>
@MessagingGateway
注解
从版本 4.0 开始,网关服务接口可以用@MessagingGateway
注解,而不是要求定义<gateway />
xml 元素进行配置。
以下一对示例比较了配置同一网关的两种方法:
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
与 XML 版本类似,当 Spring 集成在组件扫描期间发现这些注释时,它会创建proxy 实施。
要执行此扫描并注册BeanDefinition 在应用程序上下文中,添加@IntegrationComponentScan 注解添加到@Configuration 类。
标准@ComponentScan infrastructure 不处理接口。
因此,我们引入了@IntegrationComponentScan 逻辑来查找@MessagingGateway 接口和寄存器上的注释GatewayProxyFactoryBean 实例。
另请参阅 注释支持。 |
与@MessagingGateway
注解中,您可以使用@Profile
注解以避免创建 Bean(如果此类配置文件未处于活动状态)。
从版本 6.0 开始,带有@MessagingGateway
也可以标有@Primary
对于任何 Spring 都尽可能地进行相应配置逻辑的注释@Component
定义。
从版本 6.0 开始,@MessagingGateway
接口可用于标准 Spring@Import
配置。
这可以用作@IntegrationComponentScan
或手动AnnotationGatewayProxyFactoryBean
bean 定义。
这@MessagingGateway
使用@MessageEndpoint
自版本6.0
和name()
属性本质上是别名为@Compnent.value()
.
这样,网关代理的 bean 名称生成策略将与扫描和导入组件的标准 Spring 注释配置重新保持一致。
默认的AnnotationBeanNameGenerator
可以通过AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR
或作为@IntegrationComponentScan.nameGenerator()
属性。
如果您没有 XML 配置,则@EnableIntegration 至少需要一个 Comments@Configuration 类。
看Configuration 和@EnableIntegration 了解更多信息。 |
调用无参数方法
当在没有任何参数的 Gateway 接口上调用方法时,默认行为是接收Message
从PollableChannel
.
但是,有时您可能希望触发无参数方法,以便可以与下游不需要用户提供的参数的其他组件进行交互,例如触发无参数 SQL 调用或存储过程。
要实现发送和接收语义,您必须提供有效负载。
要生成有效负载,接口上的方法参数不是必需的。
您可以使用@Payload
annotation 或payload-expression
XML 中method
元素。
以下列表包括有效负载的几个示例:
-
文本字符串
-
#gatewayMethod.name
-
新的 java.util.Date()
-
@someBean.someMethod() 的返回值
以下示例演示如何使用@Payload
注解:
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
您还可以使用@Gateway
注解。
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
}
如果两个注解都存在(并且payloadExpression 提供)@Gateway 赢了。 |
另请参阅使用注释和 XML 的网关配置。
如果方法没有参数和返回值,但包含有效负载表达式,则将其视为仅发送作。
调用default
方法
网关代理的接口可能具有default
方法,从版本 5.3 开始,框架会注入一个DefaultMethodInvokingMethodInterceptor
转换为代理来调用default
方法使用java.lang.invoke.MethodHandle
方法而不是代理。
JDK 中的接口(例如java.util.function.Function
仍然可用于网关代理,但是它们的default
方法无法调用,因为MethodHandles.Lookup
针对 JDK 类的实例化。
这些方法也可以使用显式@Gateway
注解,或者proxyDefaultMethods
在@MessagingGateway
annotation 或<gateway>
XML 组件。
错误处理
网关调用可能会导致错误。 默认情况下,在网关的方法调用时,下游发生的任何错误都会“按原样”重新引发。 例如,请考虑以下简单流程:
gateway -> service-activator
如果服务激活器调用的服务抛出MyException
(例如),框架会将其包装在MessagingException
并将传递给 Service Activator 的消息附加到failedMessage
财产。
因此,框架执行的任何日志记录都具有完整的失败上下文。
默认情况下,当网关捕获到异常时,MyException
被解包并抛给调用方。
您可以配置throws
子句来匹配原因链中的特定异常类型。
例如,如果您想捕获整个MessagingException
有了 Reason of downstream error 的所有消息收发信息,您应该有一个类似于以下内容的网关方法:
public interface MyGateway {
void performProcess() throws MessagingException;
}
由于我们鼓励 POJO 编程,因此您可能不希望将调用方暴露给消息传递基础设施。
如果您的网关方法没有throws
子句中,网关会遍历 Cause Tree,查找RuntimeException
那不是MessagingException
.
如果未找到,框架将抛出MessagingException
.
如果MyException
在前面的讨论中,有一个原因为SomeOtherException
和您的方法throws SomeOtherException
,网关会进一步解包该 API 并将其抛给调用方。
当网关声明时没有service-interface
、内部框架界面RequestReplyExchanger
被使用。
请考虑以下示例:
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
在 5.0 版本之前,此exchange
方法没有throws
子句,因此,异常被解包。
如果你使用这个接口,并且想要恢复之前的 unwrap 行为,请使用自定义的service-interface
或访问cause
的MessagingException
你自己。
但是,您可能希望记录错误而不是传播错误,或者您可能希望将异常视为有效回复(通过将其映射到符合调用方理解的某个“错误消息”协定的消息)。
为了实现这一点,网关通过包含对error-channel
属性。
在以下示例中,“transformer”创建一个回复Message
从Exception
:
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
这exceptionTransformer
可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。
这将成为发送回给调用方的有效负载。
如有必要,您可以在这样的 “错误流” 中做很多更复杂的事情。
它可能涉及路由器(包括 Spring Integration 的ErrorMessageExceptionTypeRouter
)、过滤器等。
然而,大多数时候,一个简单的 “transformer” 就足够了。
或者,您可能希望仅记录异常(或将其异步发送到某个位置)。
如果您提供单向流,则不会将任何内容发送回调用方。
如果要完全禁止显示异常,则可以提供对全局nullChannel
(本质上是一个/dev/null
方法)。
最后,如上所述,如果没有error-channel
,则异常会照常传播。
当您使用@MessagingGateway
annotation (请参阅 @MessagingGateway' 注释), you can use an `errorChannel
属性。
从版本 5.0 开始,当您使用带有void
return 类型(单向流)时,该error-channel
reference (如果提供) 填充到标准errorChannel
标头。
此功能允许下游异步流,基于标准ExecutorChannel
配置(或QueueChannel
) 覆盖默认的全局变量errorChannel
异常发送行为。
以前,您必须手动指定errorChannel
标头,其中@GatewayHeader
annotation 或<header>
元素。
这error-channel
的 属性被忽略void
方法。
相反,错误消息被发送到默认的errorChannel
.
通过简单的 POJI 网关公开消息传递系统是有好处的,但“隐藏”底层消息传递系统的现实确实是有代价的,因此您应该考虑某些事项。
我们希望我们的 Java 方法尽快返回,而不是在调用者等待它返回时无限期挂起(无论是 void、返回值还是引发的 Exception)。
当常规方法用作消息传递系统前面的代理时,我们必须考虑底层消息传递的潜在异步性质。
这意味着由网关启动的消息有可能被过滤器丢弃,并且永远不会到达负责生成回复的组件。
某些服务激活器方法可能会导致异常,因此不提供回复(因为我们不生成 null 消息)。
换句话说,多种情况都可能导致回复消息永远不会出现。
这在消息传递系统中是非常自然的。
但是,请考虑对网关方法的含义。
网关的方法输入参数被合并到消息中并发送到下游。
回复消息将转换为网关方法的返回值。
因此,您可能希望确保对于每个网关呼叫,始终有一条回复消息。
否则,如果满足以下条件,则网关方法可能永远不会返回并无限期挂起reply-timeout 设置为负值。
处理这种情况的一种方法是使用异步网关(本节稍后将介绍)。
另一种处理方法是依赖默认的reply-timeout 作为30 秒。
这样,网关的挂起时间不会超过由reply-timeout 如果超时已过,则返回 'null'。
最后,你可能要考虑在服务激活器上设置下游标志,例如 'requires-reply' 或过滤器上的 'throw-exceptions-on-rejection'。
本章的最后一节将更详细地讨论这些选项。 |
如果下游流返回ErrorMessage 其payload (一个Throwable ) 被视为常规下游错误。
如果存在error-channel 配置,则会将其发送到 Error 流。
否则,有效负载将抛给网关的调用方。
同样,如果error-channel 返回一个ErrorMessage ,则其有效负载将抛出给调用方。
这同样适用于带有Throwable 有效载荷。
这在异步情况下非常有用,当您需要将Exception 直接发送给调用方。
为此,您可以返回Exception (作为reply 来自某些服务)或扔掉它。
通常,即使使用异步流,框架也会负责将下游流引发的异常传播回网关。
TCP 客户端-服务器多路复用示例演示了将异常返回给调用方的两种技术。
它通过使用aggregator 跟group-timeout (请参阅 Aggregator 和 Group Timeout )和MessagingTimeoutException reply on the discard 流。 |
网关超时
网关有两个超时属性:requestTimeout
和replyTimeout
.
仅当通道可以阻塞(例如,有界的QueueChannel
那是满的)。
这replyTimeout
value 是网关等待回复或返回的时间null
.
它默认为无穷大。
超时可以设置为网关上所有方法的默认值 (defaultRequestTimeout
和defaultReplyTimeout
) 或在MessagingGateway
interface 注解。
单个方法可以覆盖这些默认值(在<method/>
子元素)或@Gateway
注解。
从版本 5.0 开始,超时可以定义为表达式,如下例所示:
@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
评估上下文有一个BeanResolver
(使用@someBean
引用其他 bean),并将args
array 属性#root
对象可用。
有关此根对象的更多信息,请参阅表达式和 “Global” Headers。
使用 XML 进行配置时,超时属性可以是长值或 SPEL 表达式,如下例所示:
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>
异步网关
作为一种模式,消息传递网关提供了一种很好的方法来隐藏特定于消息传递的代码,同时仍然公开消息传递系统的全部功能。
如前所述,GatewayProxyFactoryBean
提供了一种通过服务接口公开代理的便捷方法,使您可以基于 POJO 访问消息传递系统(基于您自己的域中的对象、原语/字符串或其他对象)。
但是,当网关通过返回值的简单 POJO 方法公开时,这意味着对于每个请求消息(在调用方法时生成),必须有一个回复消息(在方法返回时生成)。
由于消息传递系统本质上是异步的,因此您可能无法始终保证 “对于每个请求,总会有一个回复” 的合同。Spring Integration 2.0 引入了对异步网关的支持,当您可能不知道是否需要回复或回复到达需要多长时间时,它提供了一种方便的方式来启动流。
为了处理这些类型的场景, Spring 集成使用java.util.concurrent.Future
实例来支持异步网关。
在 XML 配置中,没有任何变化,您仍然以与定义常规网关相同的方式定义异步网关,如下例所示:
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
但是,网关接口(服务接口)略有不同,如下所示:
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
如前面的示例所示,网关方法的返回类型是Future
.
什么时候GatewayProxyFactoryBean
看到 gateway 方法的 return type 是一个Future
,它会立即使用AsyncTaskExecutor
.
这就是差异的程度。
对此类方法的调用总是立即返回Future
实例。
然后,您可以与Future
按照自己的节奏获取结果、取消等。
此外,与Future
实例, 调用get()
可能会显示 Timeout、Execution 异常等。
以下示例演示如何使用Future
从异步网关返回:
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
有关更详细的示例,请参阅 Spring 集成示例中的 async-gateway 示例。
AsyncTaskExecutor
默认情况下,GatewayProxyFactoryBean
使用org.springframework.core.task.SimpleAsyncTaskExecutor
提交内部时AsyncInvocationTask
返回类型为Future
.
但是,async-executor
属性中的<gateway/>
元素的配置允许您提供对java.util.concurrent.Executor
在 Spring 应用程序上下文中可用。
的 (默认)SimpleAsyncTaskExecutor
同时支持Future
和CompletableFuture
return 类型。
看CompletableFuture
.
即使有默认的执行程序,提供外部执行程序通常也很有用,这样你就可以在日志中识别其线程(当使用 XML 时,线程名称基于执行程序的 Bean 名称),如下例所示:
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果您希望退回不同的Future
实现中,您可以提供自定义 Executor 或完全禁用 Executor 并返回Future
在 reply message payload 中。
要禁用执行程序,请将其设置为null
在GatewayProxyFactoryBean
(通过使用setAsyncTaskExecutor(null)
).
使用 XML 配置网关时,请使用async-executor=""
.
使用@MessagingGateway
annotation 中,请使用类似于以下内容的代码:
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果返回类型是特定的 concreteFuture implementation 或配置的执行程序不支持的其他一些子接口,则流将在调用方的线程上运行,并且流必须在回复消息有效负载中返回所需的类型。 |
CompletableFuture
从版本 4.2 开始,网关方法现在可以返回CompletableFuture<?>
.
返回此类型时有两种作模式:
-
当提供了异步执行程序且返回类型恰好为
CompletableFuture
(不是子类)时,框架会在 Executor 上运行任务,并立即返回CompletableFuture
给调用方。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
用于创造未来。 -
当异步执行程序显式设置为
null
返回类型为CompletableFuture
或者返回类型是CompletableFuture
,则在调用方的线程上调用流。 在这种情况下,下游流应返回CompletableFuture
的适当类型。
使用场景
在下面的场景中,调用方线程立即返回CompletableFuture<Invoice>
,当下游流回复网关(使用Invoice
对象)。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
在下面的场景中,调用方线程返回一个CompletableFuture<Invoice>
当下游流将其作为回复的负载提供给网关时。
当发票准备好时,必须完成其他一些流程。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
在下面的场景中,调用方线程返回一个CompletableFuture<Invoice>
当下游流将其作为回复的负载提供给网关时。
当发票准备好时,必须完成其他一些流程。
如果DEBUG
logging 后,会发出一个日志条目,指示异步执行程序不能用于此方案。
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
CompletableFuture
实例可用于对回复执行其他作,如下例所示:
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
反应器Mono
从版本 5.0 开始,GatewayProxyFactoryBean
允许将 Project Reactor 与网关接口方法一起使用,使用Mono<T>
return 类型。
内部的AsyncInvocationTask
包装在Mono.fromCallable()
.
一个Mono
可用于稍后检索结果(类似于Future<?>
),或者您可以通过调用Consumer
当结果返回到网关时。
这Mono 不会立即被框架刷新。
因此,底层消息流不会在网关方法返回之前启动(就像使用Future<?> Executor 任务)。
流程在Mono 已订阅。
或者,Mono (作为“可组合项”)可能是 Reactor 流的一部分,当subscribe() 与整个Flux .
以下示例显示了如何使用 Project Reactor 创建网关: |
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);
}
@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
}
其中,这样的网关可以用于处理Flux
数据:
@Autowired
TestGateway testGateway;
public void hadnleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}
使用 Project Reactor 的另一个示例是一个简单的回调场景,如下例所示:
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
调用线程继续,其中handleInvoice()
在流完成时被调用。
如需了解详情,另请参阅 Kotlin 协程。
返回异步类型的下游流
如AsyncTaskExecutor
部分,如果您希望某个下游组件返回带有异步有效负载 (Future
,Mono
等),则必须将异步执行程序显式设置为null
(或使用 XML 配置时)。
然后,在调用方线程上调用该流,稍后可以检索结果。""
异步void
返回类型
消息网关方法可以按如下方式声明:
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
但是下游异常不会传播回调用方。
为了确保下游流调用和异常传播到调用方的异步行为,从版本 6.0 开始,框架提供了对Future<Void>
和Mono<Void>
return 类型。
该用例类似于前面描述的普通 send-and-forget 行为void
return 类型,但不同的是,流执行是异步发生的,并且返回Future
(或Mono
) 以null
或根据send
作结果。
如果Future<Void> 是精确的下游流回复,则asyncExecutor 选项必须设置为 null (AnnotationConstants.NULL 对于@MessagingGateway configuration) 和send part 在 producer 线程上执行。
回复 1 取决于下游流配置。
这样,目标应用程序就可以生成Future<Void> 正确回复。
这Mono use-case 已经超出了框架线程控制范围,因此将asyncExecutor 设置为 null 没有意义。
那里Mono<Void> 作为 request-reply 网关作的结果,必须配置为Mono<?> 返回 Gateway 方法的类型。 |
未到达响应时的网关行为
如前所述,网关提供了一种通过 POJO 方法调用与消息传递系统交互的便捷方式。 但是,通常预期始终返回的典型方法调用(即使有 Exception)可能并不总是一对一地映射到消息交换(例如,回复消息可能未到达 — 相当于方法未返回)。
本节的其余部分介绍各种场景以及如何使网关的行为更具可预测性。
可以配置某些属性以使同步网关行为更具可预测性,但其中一些属性可能并不总是像您预期的那样工作。
其中之一是reply-timeout
(在方法级别或default-reply-timeout
在网关级别)。
我们检查reply-timeout
属性来了解它在各种场景中如何能够和不能影响同步网关的行为。
我们研究了单线程场景(下游的所有组件都通过直接通道连接)和多线程场景(例如,在下游的某个地方,你可能有一个打破单线程边界的 pollable 或 executor 通道)。
长时间运行的流程下游
- Sync Gateway,单线程
-
如果下游组件仍在运行(可能是由于无限循环或服务缓慢),则设置
reply-timeout
不起作用,并且 Gateway 方法调用不会返回,直到下游服务退出(通过返回或引发异常)。 - Sync Gateway,多线程
-
如果下游组件仍在多线程消息流中运行(可能是由于无限循环或服务缓慢),则设置
reply-timeout
通过允许网关方法调用在达到超时后返回来发挥作用,因为GatewayProxyFactoryBean
轮询 Reply 通道,等待消息,直到超时到期。 但是,如果在生成实际回复之前已达到超时,则可能导致网关方法返回 'null'。 您应该了解,回复消息(如果生成)是在网关方法调用可能返回后发送到回复通道的,因此您必须了解这一点并在设计流程时牢记这一点。
另请参阅errorOnTimeout
属性来抛出MessageTimeoutException
而不是返回null
,当发生超时时。
下游组件返回 'null'
- Sync Gateway — 单线程
-
如果下游组件返回 'null' 并且
reply-timeout
已配置为负值,则网关方法调用将无限期挂起,除非requires-reply
属性,该属性已在可能返回 'null' 的下游组件(例如,服务激活器)上设置。 在这种情况下,将引发异常并将其传播到网关。 - Sync Gateway — 多线程
-
该行为与前一种情况相同。
下游组件返回签名为“void”,而网关方法签名为非 void
- Sync Gateway — 单线程
-
如果下游组件返回 'void' 并且
reply-timeout
已配置为负值,则网关方法调用将无限期挂起。 - Sync Gateway — 多线程
-
该行为与前一种情况相同。
下游组件导致运行时异常
- Sync Gateway — 单线程
-
如果组件下游引发运行时异常,则异常将通过错误消息传播回网关并重新引发。
- Sync Gateway — 多线程
-
该行为与前一种情况相同。
您应该了解,默认情况下,reply-timeout 是无界的。
因此,如果您将reply-timeout 设置为负值时,您的 Gateway 方法调用可能会无限期挂起。
因此,为了确保您分析了流程,并且如果这些场景之一发生的可能性很小,您应该设置reply-timeout 属性设置为 “'safe'” 值。
是的30 秒。
更好的是,您可以将requires-reply 属性设置为 'true' 以确保及时响应,因为一旦下游组件在内部返回 null,就会引发异常。
但是,您还应该意识到,在某些情况下(请参阅第一个),其中reply-timeout 没有帮助。
这意味着分析您的消息流并决定何时使用同步网关而不是异步网关也很重要。
如前所述,后一种情况是定义返回Future 实例。
然后,您可以保证收到该返回值,并且您可以对调用结果进行更精细的控制。
此外,在处理路由器时,您应该记住设置resolution-required 属性设置为 'true' 会导致路由器在无法解析特定通道时抛出异常。
同样,在处理 Filter 时,你可以设置throw-exception-on-rejection 属性。
在这两种情况下,生成的流的行为类似于它包含具有 'requires-reply' 属性的服务激活器。
换句话说,它有助于确保网关方法调用的及时响应。 |
您应该了解,计时器在线程返回到网关时启动,即当流完成或将消息传递给另一个线程时。 此时,调用线程开始等待回复。 如果流是完全同步的,则回复将立即可用。 对于异步流,线程将等待 this time。 |
从版本 6.2 开始,errorOnTimeout
内部属性MethodInvocationGateway
扩展MessagingGatewaySupport
在@MessagingGateway
和GatewayEndpointSpec
.
此选项的含义与 Endpoint Summary 一章末尾解释的任何入站网关的含义完全相同。
换句话说,将此选项设置为true
,将导致MessageTimeoutException
从 Send and-receive 网关作中引发,而不是返回null
当接收超时用尽时。
看IntegrationFlow
作为 网关在 Java DSL 一章中,了解通过IntegrationFlow
.