消息发布

消息发布

(面向方面编程的)AOP 消息发布功能允许您构造消息并将其作为方法调用的副产品发送。 例如,假设您有一个组件,每次该组件的状态发生变化时,您都希望通过消息收到通知。 发送此类通知的最简单方法是将消息发送到专用通道,但是如何将更改对象状态的方法调用连接到消息发送过程,以及通知消息的结构应该如何? AOP 消息发布功能使用配置驱动的方法处理这些责任。spring-doc.cadn.net.cn

消息发布配置

Spring 集成提供了两种方法:XML 配置和注释驱动(Java)配置。spring-doc.cadn.net.cn

注解驱动的配置,使用@Publisher注解

注释驱动方法允许您使用@Publisher注解来指定 'channel' 属性。 从版本 5.1 开始,要打开此功能,您必须使用@EnablePublisher对某些@Configuration类。 看Configuration 和@EnableIntegration了解更多信息。 该消息是根据方法调用的返回值构造的,并发送到由 'channel' 属性指定的通道。 要进一步管理消息结构,您还可以将两者结合使用@Payload@Header附注。spring-doc.cadn.net.cn

在内部,Spring Integration 的这个消息发布功能通过定义PublisherAnnotationAdvisor和 Spring 表达式语言 (SpEL),为您提供了相当大的灵活性和对Message它发布。spring-doc.cadn.net.cn

PublisherAnnotationAdvisor定义并绑定以下变量:spring-doc.cadn.net.cn

  • #return:绑定到返回值,允许您引用它或其属性(例如#return.something,其中 'something' 是绑定到#return)spring-doc.cadn.net.cn

  • #exception:如果方法调用引发异常,则绑定到异常spring-doc.cadn.net.cn

  • #args:绑定到方法参数,以便您可以按名称提取各个参数(例如#args.fname)spring-doc.cadn.net.cn

请考虑以下示例:spring-doc.cadn.net.cn

@Publisher
public String defaultPayload(String fname, String lname) {
  return fname + " " + lname;
}

在前面的示例中,消息使用以下结构构造:spring-doc.cadn.net.cn

  • 消息有效负载是方法的返回类型和值。 这是默认设置。spring-doc.cadn.net.cn

  • 新构造的消息将发送到配置了 Comments 后处理器的默认 publisher 通道(本节稍后将介绍)。spring-doc.cadn.net.cn

以下示例与前面的示例相同,只是它不使用默认发布渠道:spring-doc.cadn.net.cn

@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
  return fname + " " + lname;
}

我们不是使用默认发布渠道,而是通过设置@Publisher注解。 我们还添加了一个@Header注释,这会导致名为 'last' 的消息标头与 'lname' 方法参数具有相同的值。 该标头将添加到新构造的消息中。spring-doc.cadn.net.cn

以下示例与前面的示例几乎相同:spring-doc.cadn.net.cn

@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
  return fname + " " + lname;
}

唯一的区别是我们使用@Payload注解来显式指定该方法的返回值应该用作消息的有效负载。spring-doc.cadn.net.cn

以下示例通过在@Payload注解进一步指示框架应如何构造消息:spring-doc.cadn.net.cn

@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
  return fname + " " + lname;
}

在前面的示例中,消息是方法调用的返回值和 'lname' 输入参数的串联。 名为 'x' 的 Message 标头的值由 'num' 输入参数确定。 该标头将添加到新构造的消息中。spring-doc.cadn.net.cn

@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
  return fname + " " + lname;
}

在前面的示例中,您会看到@Payload注解。 在这里,我们注释了一个 method 参数,该参数将成为新构造消息的有效负载。spring-doc.cadn.net.cn

与 Spring 中的大多数其他 Comments 驱动功能一样,您需要注册一个后处理器(PublisherAnnotationBeanPostProcessor). 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

对于更简洁的配置,您可以改用 namespace support,如下例所示:spring-doc.cadn.net.cn

<int:annotation-config>
    <int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>

对于 Java 配置,您必须使用@EnablePublisherannotation 中,如下例所示:spring-doc.cadn.net.cn

@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
    ...
}

从版本 5.1.3 开始,<int:enable-publisher>组件,以及@EnablePublisher注解具有proxy-target-classorder用于调整ProxyFactory配置。spring-doc.cadn.net.cn

与其他 Spring 注解 (@Component,@Scheduled等),也可以使用@Publisher作为元注释。 这意味着您可以定义自己的注解,这些注解的处理方式与@Publisher本身。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}

在前面的示例中,我们定义了@Audit注解,它本身用@Publisher. 另请注意,您可以定义channel属性来封装消息在此注释中的发送位置。 现在,您可以使用@Auditannotation 中,如下例所示:spring-doc.cadn.net.cn

@Audit
public String test() {
    return "Hello";
}

在前面的示例中,每次调用test()方法会生成一条消息,其中包含从其返回值创建的有效负载。 每条消息都会发送到名为auditChannel. 此技术的好处之一是,您可以避免在多个 annotation 中重复相同的通道名称。 您还可以在自己的(可能特定于域的)注释和框架提供的注释之间提供间接级别。spring-doc.cadn.net.cn

您还可以对类进行注释,这样您就可以将此注释的属性应用于该类的每个公共方法,如下例所示:spring-doc.cadn.net.cn

@Audit
static class BankingOperationsImpl implements BankingOperations {

  public String debit(String amount) {
     . . .

  }

  public String credit(String amount) {
     . . .
  }

}

基于 XML 的方法与<publishing-interceptor>元素

基于 XML 的方法允许您配置与基于命名空间的MessagePublishingInterceptor. 与注释驱动方法相比,它当然有一些好处,因为它允许您使用 AOP 切入点表达式,因此可能会一次拦截多个方法,或者拦截和发布您没有源代码的方法。spring-doc.cadn.net.cn

要使用 XML 配置消息发布,您只需执行以下两项作:spring-doc.cadn.net.cn

以下示例显示如何配置publishing-interceptor元素:spring-doc.cadn.net.cn

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
  <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" value="something"/>
  </method>
  <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" expression="'something'.toUpperCase()"/>
  </method>
  <method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>

<publishing-interceptor>配置看起来与基于 Comments 的方法非常相似,并且它还使用了 Spring 表达式语言的强大功能。spring-doc.cadn.net.cn

在前面的示例中,echo方法testBean渲染一个Message具有以下结构:spring-doc.cadn.net.cn

第二种方法与第一种方法非常相似。 在这里,每个以 'repl' 开头的方法都会渲染一个Message具有以下结构:spring-doc.cadn.net.cn

第二个方法,映射任何以echoDef生成一个Message具有以下结构:spring-doc.cadn.net.cn

对于简单的映射规则,您可以依赖publisherdefaults,如下例所示:spring-doc.cadn.net.cn

<publishing-interceptor id="anotherInterceptor"/>

前面的示例将匹配切入点表达式的每个方法的返回值映射到有效负载,并发送到default-channel. 如果未指定defaultChannel(前面的示例没有这样做),消息将发送到全局nullChannel(相当于/dev/null).spring-doc.cadn.net.cn

异步发布

发布与组件的执行发生在同一个线程中。 因此,默认情况下,它是同步的。 这意味着整个消息流必须等待发布者的流完成。 但是,开发人员通常希望完全相反:使用此消息发布功能来启动异步流。 例如,您可以托管接收远程请求的服务 (HTTP、WS 等)。 您可能希望在内部将此请求发送到可能需要一段时间的进程中。 但是,您可能还希望立即回复用户。 因此,您可以使用“output-channel”或“replyChannel”标头将简单的类似确认的回复发送回调用者,同时使用 message-publisher 功能启动复杂的流,而不是将入站请求发送到输出通道进行处理(传统方式)。spring-doc.cadn.net.cn

以下示例中的服务接收复杂的有效负载(需要进一步发送以进行处理),但它还需要使用简单的确认来回复调用方:spring-doc.cadn.net.cn

public String echo(Object complexPayload) {
     return "ACK";
}

因此,我们不是将复杂的流挂接到输出通道,而是使用消息发布功能。 我们将其配置为使用 service 方法的 input 参数(如前面的示例所示)创建新消息,并将其发送到 'localProcessChannel'。 为了确保此流是异步的,我们需要做的就是将其发送到任何类型的异步通道 (ExecutorChannel在下一个例子中)。 以下示例演示如何将异步publishing-interceptor:spring-doc.cadn.net.cn

<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleservice" class="test.SampleService"/>

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(sampleservice)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
  <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
    <int:header name="sample_header" expression="'some sample value'"/>
  </int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
  <int:dispatcher task-executor="executor"/>
</int:channel>

<task:executor id="executor" pool-size="5"/>

处理此类情况的另一种方法是使用窃听。 请参见Wire Tapspring-doc.cadn.net.cn

基于计划触发器生成和发布消息

在前面的部分中,我们了解了消息发布功能,该功能将消息作为方法调用的副产品构建和发布。 但是,在这些情况下,您仍然负责调用该方法。 Spring 集成 2.0 添加了对预定消息生产者和发布者的支持,使用了新的expression'inbound-channel-adapter' 元素上的属性。 你可以基于多个触发器进行调度,其中任何一个都可以在 'poller' 元素上配置。 目前,我们支持cron,fixed-rate,fixed-delay以及您实现并由 'trigger' 属性值引用的任何自定义触发器。spring-doc.cadn.net.cn

如前所述,对 scheduled producer 和 publisher 的支持是通过<inbound-channel-adapter>XML 元素。 请考虑以下示例:spring-doc.cadn.net.cn

<int:inbound-channel-adapter id="fixedDelayProducer"
       expression="'fixedDelayTest'"
       channel="fixedDelayChannel">
    <int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>

前面的示例创建了一个入站通道适配器,该适配器构造了一个Message,其 payload 是expression属性。 每次fixed-delay属性发生。spring-doc.cadn.net.cn

以下示例与前面的示例类似,不同之处在于它使用了fixed-rate属性:spring-doc.cadn.net.cn

<int:inbound-channel-adapter id="fixedRateProducer"
       expression="'fixedRateTest'"
       channel="fixedRateChannel">
    <int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>

fixed-rate属性允许您以固定速率发送消息(从每个任务的开始时间开始测量)。spring-doc.cadn.net.cn

以下示例显示了如何应用在cron属性:spring-doc.cadn.net.cn

<int:inbound-channel-adapter id="cronProducer"
       expression="'cronTest'"
       channel="cronChannel">
    <int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>

以下示例显示了如何在消息中插入其他标头:spring-doc.cadn.net.cn

<int:inbound-channel-adapter id="headerExpressionsProducer"
       expression="'headerExpressionsTest'"
       channel="headerExpressionsChannel"
       auto-startup="false">
    <int:poller fixed-delay="5000"/>
    <int:header name="foo" expression="6 * 7"/>
    <int:header name="bar" value="x"/>
</int:inbound-channel-adapter>

额外的消息头可以采用标量值或评估 Spring 表达式的结果。spring-doc.cadn.net.cn

如果您需要实现自己的自定义触发器,可以使用trigger属性来提供对任何 Spring 配置的 bean 的引用,该 bean 实现了org.springframework.scheduling.Trigger接口。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<int:inbound-channel-adapter id="triggerRefProducer"
       expression="'triggerRefTest'" channel="triggerRefChannel">
    <int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>

<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
    <beans:constructor-arg value="9999"/>
</beans:bean>