消息转换
消息转换
转换器
消息转换器在实现消息生成者和消息使用者的松散耦合方面起着非常重要的作用。
您可以在这些组件之间添加转换器,而不是要求每个消息生成组件都知道下一个使用者期望的类型。
泛型转换器,例如将String
转换为 XML 文档,也可以高度重用。
对于某些系统,最好提供规范的数据模型,但 Spring 集成的一般理念是不需要任何特定的格式。 相反,为了获得最大的灵活性, Spring 集成旨在提供最简单的扩展模型。 与其他端点类型一样,在 XML 或 Java 注释中使用声明性配置使简单的 POJO 能够适应消息转换器的角色。 本章的其余部分介绍了这些配置选项。
为了最大限度地提高灵活性, Spring 不需要基于 XML 的消息有效负载。 尽管如此,如果这确实是您的应用程序的正确选择,那么该框架确实提供了一些方便的转换器来处理基于 XML 的有效负载。 有关这些转换器的更多信息,请参见 XML 支持 - 处理 XML 负载。 |
使用 XML 配置 Transformer
这<transformer>
元素用于创建消息转换终端节点。
除了input-channel
和output-channel
属性,它需要一个ref
属性。
这ref
可能指向包含@Transformer
注释(请参阅使用注释配置转换器),或者它可以与method
属性。
<int:transformer id="testTransformer" ref="testTransformerBean" input-channel="inChannel"
method="transform" output-channel="outChannel"/>
<beans:bean id="testTransformerBean" class="org.foo.TestTransformer" />
使用ref
属性,如果自定义转换器处理程序实现可以在其他<transformer>
定义。
但是,如果自定义转换器处理程序实现的范围应限定为<transformer>
,你可以定义一个内部 Bean 定义,如下例所示:
<int:transformer id="testTransformer" input-channel="inChannel" method="transform"
output-channel="outChannel">
<beans:bean class="org.foo.TestTransformer"/>
</transformer>
同时使用ref attribute 和内部处理程序定义位于同一<transformer> 不允许配置,因为它会产生不明确的条件并导致引发异常。 |
如果ref 属性引用一个扩展AbstractMessageProducingHandler (例如框架本身提供的 transformers),通过将 output channel 直接注入到 handler 中来优化配置。
在这种情况下,每个ref 必须发送到单独的 bean 实例(或prototype -scoped bean)或使用内部的<bean/> 配置类型。
如果您无意中从多个 bean 引用了相同的消息处理程序,则会收到配置异常。 |
使用 POJO 时,用于转换的方法可能需要Message
type 或入站消息的有效负载类型。
它还可以单独接受消息头值,或者通过使用@Header
和@Headers
parameter 注解。
该方法的返回值可以是任何类型。
如果返回值本身是一个Message
,该通道将传递到 transformer 的 output 通道。
从 Spring Integration 2.0 开始,消息转换器的转换方法不能再返回null
.
返回null
会导致异常,因为应始终期望 Message Transformer 将每个源消息转换为有效的目标消息。
换句话说,消息转换器不应该用作消息过滤器,因为有一个专用的<filter>
选项。
但是,如果您确实需要这种类型的行为(组件可能会返回null
这不应被视为错误),您可以使用 Service Activator 。
其requires-reply
value 为false
默认情况下,但可以设置为true
为了引发null
返回值,就像 transformer 一样。
转换器和 Spring 表达式语言 (SpEL)
与路由器、聚合器和其他组件一样,从 Spring Integration 2.0 开始,只要转换逻辑相对简单,转换器也可以从 SPEL 支持中受益。 下面的示例展示了如何使用 SPEL 表达式:
<int:transformer input-channel="inChannel"
output-channel="outChannel"
expression="payload.toUpperCase() + '- [' + T(System).currentTimeMillis() + ']'"/>
前面的示例在不编写自定义转换器的情况下转换有效负载。
我们的 payload(假设为String
) 是大写的,与当前时间戳连接,并应用了一些格式。
共 电 转换器
Spring 集成提供了一些 transformer 实现。
对象到字符串转换器
因为使用toString()
表示Object
中,Spring Integration 提供了一个ObjectToStringTransformer
其输出为Message
替换为 Stringpayload
.
那String
是调用toString()
作。
下面的示例演示如何声明 object-to-string 转换器的实例:
<int:object-to-string-transformer input-channel="in" output-channel="out"/>
这个转换器的一个潜在用途是向file
Namespace。
而该 channel 适配器仅支持String
、byte-array 或java.io.File
payloads 中,在 Adapter 处理必要的转换之前立即添加此转换器。
只要toString()
call 是你想要写入文件的内容。
否则,您可以使用前面显示的通用 'transformer' 元素来提供基于 POJO 的自定义 transformer。
调试时,通常不需要此转换器,因为logging-channel-adapter 能够记录消息负载。
有关更多详细信息,请参见 Wire Tap。 |
对象到字符串转换器非常简单。
它调用
对于更复杂的作(例如在运行时动态选择字符集),可以改用基于 SPEL 表达式的转换器,如下例所示:
|
如果您需要序列化Object
转换为字节数组,或将字节数组反序列化回Object
,Spring Integration 提供了对称的序列化转换器。
默认情况下,这些使用标准的 Java 序列化,但你可以提供 Spring 的实现Serializer
或Deserializer
策略serializer
和deserializer
属性。
下面的示例展示了如何使用 Spring 的序列化器和反序列化器:
<int:payload-serializing-transformer input-channel="objectsIn" output-channel="bytesOut"/>
<int:payload-deserializing-transformer input-channel="bytesIn" output-channel="objectsOut"
allow-list="com.mycom.*,com.yourcom.*"/>
当反序列化来自不受信任来源的数据时,您应该考虑添加allow-list 的包和类模式。
默认情况下,所有类都被反序列化。 |
Object
-自-Map
和Map
-自-Object
变形金刚
Spring 集成还提供Object
-自-Map
和Map
-自-Object
transformers,它使用 JSON 来序列化和反序列化对象图。
对象层次结构被内省到最原始的类型 (String
,int
等)。
此类型的路径由 SPEL 描述,它将成为key
在变换的Map
.
基元类型变为值。
请考虑以下示例:
public class Parent{
private Child child;
private String name;
// setters and getters are omitted
}
public class Child{
private String name;
private List<String> nickNames;
// setters and getters are omitted
}
前面示例中的两个类将转换为以下内容Map
:
{person.name=George, person.child.name=Jenna, person.child.nickNames[0]=Jen ...}
基于 JSON 的Map
允许您在不共享实际类型的情况下描述对象结构,这样,只要您保持结构,就可以将对象图还原并重新构建为不同类型的对象图。
例如,可以使用Map
-自-Object
转换器:
public class Father {
private Kid child;
private String name;
// setters and getters are omitted
}
public class Kid {
private String name;
private List<String> nickNames;
// setters and getters are omitted
}
如果您需要创建“结构化”映射,则可以提供flatten
属性。
默认值为 'true'。
如果将其设置为 'false',则结构为Map
之Map
对象。
请考虑以下示例:
public class Parent {
private Child child;
private String name;
// setters and getters are omitted
}
public class Child {
private String name;
private List<String> nickNames;
// setters and getters are omitted
}
前面示例中的两个类将转换为以下内容Map
:
{name=George, child={name=Jenna, nickNames=[Bimbo, ...]}}
为了配置这些转换器, Spring 集成为 Object-to-Map 提供了名称空间支持,如下例所示:
<int:object-to-map-transformer input-channel="directInput" output-channel="output"/>
您还可以设置flatten
属性设置为 false,如下所示:
<int:object-to-map-transformer input-channel="directInput" output-channel="output" flatten="false"/>
Spring 集成为 Map-to-Object 提供名称空间支持,如下例所示:
<int:map-to-object-transformer input-channel="input"
output-channel="output"
type="org.something.Person"/>
或者,您可以使用ref
属性和原型范围的 bean,如下例所示:
<int:map-to-object-transformer input-channel="inputA"
output-channel="outputA"
ref="person"/>
<bean id="person" class="org.something.Person" scope="prototype"/>
'ref' 和 'type' 属性是互斥的。
此外,如果使用 'ref' 属性,则必须指向 'prototype' 范围的 bean。
否则,一个BeanCreationException 被抛出。 |
从版本 5.0 开始,您可以提供ObjectToMapTransformer
使用自定义的JsonObjectMapper
— 当您需要特殊格式的日期或空集合的 null(和其他用途)时。
有关更多信息,请参阅 JSON 转换器JsonObjectMapper
实现。
Stream Transformer
这StreamTransformer
变换InputStream
payloads 添加到byte[]
( 或String
如果charset
)。
以下示例演示如何使用stream-transformer
元素:
<int:stream-transformer input-channel="directInput" output-channel="output"/> <!-- byte[] -->
<int:stream-transformer id="withCharset" charset="UTF-8"
input-channel="charsetChannel" output-channel="output"/> <!-- String -->
以下示例演示如何使用StreamTransformer
类和@Transformer
注解在 Java 中配置流转换器:
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public StreamTransformer streamToBytes() {
return new StreamTransformer(); // transforms to byte[]
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public StreamTransformer streamToString() {
return new StreamTransformer("UTF-8"); // transforms to String
}
JSON 转换器
Spring 集成提供了 Object-to-JSON 和 JSON to-Object 转换器。 下面的一对示例演示如何在 XML 中声明它们:
<int:object-to-json-transformer input-channel="objectMapperInput"/>
<int:json-to-object-transformer input-channel="objectMapperInput"
type="foo.MyDomainObject"/>
默认情况下,前面清单中的 transformer 使用 vanillaJsonObjectMapper
.
它基于 Classpath 中的实现。
您可以提供自己的自定义JsonObjectMapper
implementation 或基于所需的库(例如 GSON),如下例所示:
<int:json-to-object-transformer input-channel="objectMapperInput"
type="something.MyDomainObject" object-mapper="customObjectMapper"/>
从版本 3.0 开始, |
您可能希望考虑使用FactoryBean
或工厂方法来创建JsonObjectMapper
具有所需的特性。
下面的示例展示了如何使用这样的工厂:
public class ObjectMapperFactory {
public static Jackson2JsonObjectMapper getMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
return new Jackson2JsonObjectMapper(mapper);
}
}
以下示例演示如何在 XML 中执行相同的作
<bean id="customObjectMapper" class="something.ObjectMapperFactory"
factory-method="getMapper"/>
从版本 2.2 开始, 如果您希望将 |
从版本 3.0 开始,ObjectToJsonTransformer
将反映 Source 类型的 Headers 添加到消息中。
同样,JsonToObjectTransformer
在将 JSON 转换为对象时可以使用这些类型标头。
这些 Headers 在 AMQP 适配器中映射,因此它们与 Spring-AMQP 完全兼容JsonMessageConverter
.
这使以无需任何特殊配置即可工作:
-
…→amqp-outbound-adapter---→
-
---→amqp-inbound-adapter→json-to-object-transformer→…
其中,出站适配器配置了
JsonMessageConverter
入站适配器使用默认的SimpleMessageConverter
. -
…→object-to-json-transformer→amqp-outbound-adapter---→
-
---→amqp-inbound-adapter→…
其中,出站适配器配置了
SimpleMessageConverter
入站适配器使用默认的JsonMessageConverter
. -
…→object-to-json-transformer→amqp-outbound-adapter---→
-
---→amqp-inbound-adapter→json-to-object-transformer→
其中,两个适配器都配置了
SimpleMessageConverter
.
使用 Headers 确定类型时,不应提供class 属性,因为它优先于标头。 |
除了 JSON 转换器之外, Spring 集成还提供了一个内置的#jsonPath
用于表达式的 SPEL 函数。
有关更多信息,请参阅 Spring 表达式语言 (SpEL)。
从 3.0 版本开始, Spring 集成还提供了一个内置的#xpath
用于表达式的 SPEL 函数。
有关更多信息,请参阅 #xpath SPEL 函数。
从版本 4.0 开始,ObjectToJsonTransformer
支持resultType
属性,以指定节点 JSON 表示形式。
结果节点树表示取决于提供的JsonObjectMapper
.
默认情况下,ObjectToJsonTransformer
使用Jackson2JsonObjectMapper
并将对象到节点树的转换委托给ObjectMapper#valueToTree
方法。
节点 JSON 表示形式为使用JsonPropertyAccessor
当下游消息流使用具有 JSON 数据属性访问权限的 SpEL 表达式时。
有关更多信息,请参见 Property Accessors 。
从版本 5.1 开始,resultType
可以配置为BYTES
生成一条消息,并使用byte[]
payload 的 Payload作,以便在使用使用此数据类型的下游处理程序时方便使用。
从版本 5.2 开始,JsonToObjectTransformer
可以配置ResolvableType
以支持在使用目标 JSON 处理器进行反序列化期间使用泛型。
此外,此组件现在首先查询请求消息标头,以了解是否存在JsonHeaders.RESOLVABLE_TYPE
或JsonHeaders.TYPE_ID
否则,将回退到配置的类型。
这ObjectToJsonTransformer
现在还会填充JsonHeaders.RESOLVABLE_TYPE
标头。
从版本 5.2.6 开始,JsonToObjectTransformer
可随附valueTypeExpression
要解析ResolvableType
以便在运行时根据请求消息从 JSON 转换的有效负载。
默认情况下,它会咨询JsonHeaders
在请求消息中。
如果此表达式返回null
或ResolvableType
building 会抛出一个ClassNotFoundException
,transformer 将回退到提供的targetType
.
此逻辑以表达式的形式存在,因为JsonHeaders
可能没有真正的类值,而是一些类型 ID,这些 ID 必须根据某些外部注册表映射到目标类。
Apache Avro 转换器
版本 5.2 添加了简单的转换器,用于与 Apache Avro 进行转换。
它们并不复杂,因为没有 schema 注册表;转换器只使用嵌入在SpecificRecord
从 Avro 架构生成的实现。
发送到SimpleToAvroTransformer
必须具有实现SpecificRecord
;transformer 可以处理多种类型。
这SimpleFromAvroTransformer
必须配置SpecificRecord
class 用作反序列化的默认类型。
您还可以指定 SPEL 表达式来确定要使用setTypeExpression
方法。
默认的 SPEL 表达式是headers[avro_type]
(AvroHeaders.TYPE
),默认情况下,它由SimpleToAvroTransformer
替换为源类的完全限定类名。
如果表达式返回null
这defaultType
被使用。
这SimpleToAvroTransformer
还有一个setTypeExpression
方法。
这允许生产者和使用者分离,发送者可以将标头设置为表示类型的某个令牌,然后使用者将该令牌映射到类型。
配置带有注释的转换器
您可以添加@Transformer
注解添加到需要Message
type 或消息负载类型。
返回值的处理方式与前面描述的完全相同在描述<transformer>
元素.
以下示例演示如何使用@Transformer
注解来转换String
转换为Order
:
@Transformer
Order generateOrder(String productId) {
return new Order(productId);
}
Transformer 方法也可以接受@Header
和@Headers
注解,如Annotation Support
.
以下示例演示如何使用@Header
注解:
@Transformer
Order generateOrder(String productId, @Header("customerName") String customer) {
return new Order(productId, customer);
}
另请参阅使用注释通知终端节点。
标头筛选器
有时,您的转换使用案例可能就像删除几个标头一样简单。 对于这样的用例, Spring 集成提供了一个 Headers 过滤器,它允许你指定应该从输出消息中删除的某些 Headers 名称(例如,出于安全原因删除 Headers 或仅临时需要的值)。 基本上,Header 过滤器与 Header Enricher 相反。 后者在 Header Enricher 中讨论。 以下示例定义了一个标头过滤器:
<int:header-filter input-channel="inputChannel"
output-channel="outputChannel" header-names="lastName, state"/>
如您所见,报头过滤器的配置非常简单。
它是一个典型的端点,具有输入和输出通道以及header-names
属性。
该属性接受需要删除的 Headers 的名称(如果有多个,则用逗号分隔)。
因此,在前面的示例中,出站消息中不存在名为 'lastName' 和 'state' 的标头。
基于编解码器的转换器
请参阅 编解码器。
内容丰富器
有时,您可能需要使用比目标系统提供的信息更多的信息来增强请求。 数据扩充器模式描述了各种方案以及允许您满足此类需求的组件 (Enricher)。
Spring 集成Core
模块包括两个扩充器:
它还包括三个特定于适配器的标头扩充器:
请参阅本参考手册中特定于适配器的部分,以了解有关这些适配器的更多信息。
有关表达式支持的更多信息,请参阅 Spring 表达式语言 (SpEL)。
标头扩充器
如果您只需要向消息添加 Headers,并且 Headers 不是由消息内容动态确定的,则引用转换器的自定义实现可能有点矫枉过正。
出于这个原因, Spring 集成提供了对 Headers Enricher 模式的支持。
它通过<header-enricher>
元素。
以下示例演示如何使用它:
<int:header-enricher input-channel="in" output-channel="out">
<int:header name="foo" value="123"/>
<int:header name="bar" ref="someBean"/>
</int:header-enricher>
Header Enricher 还提供了有用的子元素来设置众所周知的 Headers 名称,如下例所示:
<int:header-enricher input-channel="in" output-channel="out">
<int:error-channel ref="applicationErrorChannel"/>
<int:reply-channel ref="quoteReplyChannel"/>
<int:correlation-id value="123"/>
<int:priority value="HIGHEST"/>
<routing-slip value="channel1; routingSlipRoutingStrategy; request.headers[myRoutingSlipChannel]"/>
<int:header name="bar" ref="someBean"/>
</int:header-enricher>
上述配置表明,对于众所周知的 Headers(例如errorChannel
,correlationId
,priority
,replyChannel
,routing-slip
等),而不是使用泛型<header>
子元素中,你必须同时提供标题 'name' 和 'value',你可以使用方便的子元素直接设置这些值。
从版本 4.1 开始,标头丰富器提供了routing-slip
sub-元素。
有关更多信息,请参阅路由单。
POJO 支持
通常,标头值不能静态定义,必须根据消息中的某些内容动态确定。
这就是为什么标头丰富器还允许您通过使用ref
和method
属性。
指定的方法计算 header 值。
考虑以下配置和一个 Bean,该方法修改了String
:
<int:header-enricher input-channel="in" output-channel="out">
<int:header name="something" method="computeValue" ref="myBean"/>
</int:header-enricher>
<bean id="myBean" class="thing1.thing2.MyBean"/>
public class MyBean {
public String computeValue(String payload){
return payload.toUpperCase() + "_US";
}
}
你还可以将 POJO 配置为内部 Bean,如下例所示:
<int:header-enricher input-channel="inputChannel" output-channel="outputChannel">
<int:header name="some_header">
<bean class="org.MyEnricher"/>
</int:header>
</int:header-enricher>
同样,您可以指向 Groovy 脚本,如下例所示:
<int:header-enricher input-channel="inputChannel" output-channel="outputChannel">
<int:header name="some_header">
<int-groovy:script location="org/SampleGroovyHeaderEnricher.groovy"/>
</int:header>
</int:header-enricher>
SPEL 支持
在 Spring Integration 2.0 中,我们引入了 Spring 表达式语言 (SpEL) 的便利性,以帮助配置许多不同的组件。 header enricher 就是其中之一。 再看一下前面显示的 POJO 示例。 你可以看到,确定 header 值的计算逻辑非常简单。 一个自然而然的问题是:“有没有更简单的方法可以做到这一点? 这就是 SpEL 展示其真正力量的地方。 请考虑以下示例:
<int:header-enricher input-channel="in" output-channel="out">
<int:header name="foo" expression="payload.toUpperCase() + '_US'"/>
</int:header-enricher>
通过将 SPEL 用于此类简单情况,您不再需要提供单独的类并在应用程序上下文中对其进行配置。
您需要做的就是配置expression
属性。
'payload'和'headers'变量绑定到 SPEL 评估上下文,为您提供对传入消息的完全访问权限。
使用 Java 配置配置 Header Enricher
以下两个示例显示了如何将 Java 配置用于 Headers 扩充器:
@Bean
@Transformer(inputChannel = "enrichHeadersChannel", outputChannel = "emailChannel")
public HeaderEnricher enrichHeaders() {
Map<String, ? extends HeaderValueMessageProcessor<?>> headersToAdd =
Collections.singletonMap("emailUrl",
new StaticHeaderValueMessageProcessor<>(this.imapUrl));
HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
return enricher;
}
@Bean
@Transformer(inputChannel="enrichHeadersChannel", outputChannel="emailChannel")
public HeaderEnricher enrichHeaders() {
Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
headersToAdd.put("emailUrl", new StaticHeaderValueMessageProcessor<String>(this.imapUrl));
Expression expression = new SpelExpressionParser().parseExpression("payload.from[0].toString()");
headersToAdd.put("from",
new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
return enricher;
}
第一个示例添加单个文本标头。 第二个示例添加了两个 Headers,一个 Literal Headers 和一个基于 SpEL 表达式。
使用 Java DSL 配置头文件丰富器
以下示例显示了标头扩充器的 Java DSL 配置:
@Bean
public IntegrationFlow enrichHeadersInFlow() {
return f -> f
...
.enrichHeaders(h -> h.header("emailUrl", this.emailUrl)
.headerExpression("from", "payload.from[0].toString()"))
.handle(...);
}
Header Channel Registry
从 Spring Integration 3.0 开始,一个新的子元素<int:header-channels-to-string/>
可用。
它没有属性。
这个新的子元素将现有的replyChannel
和errorChannel
headers(当它们是MessageChannel
) 转换为String
并将频道存储在注册表中,以便以后在发送回复或处理错误时进行解析。
这在标头可能丢失的情况下非常有用,例如,在将消息序列化到消息存储中或通过 JMS 传输消息时。
如果标头尚不存在或不是MessageChannel
,则不会进行任何更改。
使用此功能需要存在HeaderChannelRegistry
豆。
默认情况下,框架会创建一个DefaultHeaderChannelRegistry
默认到期时间(60 秒)。
在此时间之后,将从注册表中删除通道。
要更改此行为,请使用id
之integrationHeaderChannelRegistry
并使用 constructor 参数(以毫秒为单位)配置所需的默认延迟。
从版本 4.1 开始,您可以设置一个名为removeOnGet
自true
在<bean/>
定义,并且 Mapping 条目会在首次使用时立即删除。
这在高容量环境中以及当 channel 只使用一次时可能很有用,而不是等待 reaper 将其删除。
这HeaderChannelRegistry
具有size()
方法来确定注册表的当前大小。
这runReaper()
method 取消当前计划任务并立即运行 reaper。
然后,根据当前延迟计划任务再次运行。
可以通过获取对注册表的引用来直接调用这些方法,也可以将包含以下内容的消息发送到控制总线:
"@integrationHeaderChannelRegistry.runReaper()"
此 sub-element 是一种方便,相当于指定以下配置:
<int:reply-channel
expression="@integrationHeaderChannelRegistry.channelToChannelName(headers.replyChannel)"
overwrite="true" />
<int:error-channel
expression="@integrationHeaderChannelRegistry.channelToChannelName(headers.errorChannel)"
overwrite="true" />
从版本 4.1 开始,您现在可以覆盖 registry 配置的 reaper delay,以便 channel mapping 至少保留指定的时间,而不管 reaper 延迟如何。 以下示例显示了如何执行此作:
<int:header-enricher input-channel="inputTtl" output-channel="next">
<int:header-channels-to-string time-to-live-expression="120000" />
</int:header-enricher>
<int:header-enricher input-channel="inputCustomTtl" output-channel="next">
<int:header-channels-to-string
time-to-live-expression="headers['channelTTL'] ?: 120000" />
</int:header-enricher>
在第一种情况下,每个标头通道映射的生存时间为 2 分钟。 在第二种情况下,生存时间在消息报头中指定,如果没有报头,则使用 Elvis 运算符使用两分钟。
Payload Enricher
在某些情况下,如前所述,标头扩充器可能不够,有效负载本身可能必须使用其他信息进行扩充。 例如,进入 Spring Integration 消息传递系统的订单消息必须根据提供的客户编号查找订单的客户,然后用该信息丰富原始有效负载。
Spring Integration 2.1 引入了有效负载丰富器。
有效负载扩充器定义了一个终端节点,该终端节点将Message
发送到公开的请求通道,然后期望收到回复消息。
然后,回复消息成为用于评估表达式以丰富目标负载的根对象。
有效负载扩充器通过enricher
元素。
为了发送请求消息,有效负载扩充器具有request-channel
属性,该属性允许您将消息分派到请求通道。
基本上,通过定义请求通道,有效负载丰富器充当网关,等待发送到请求通道的消息返回。 然后,扩充器使用回复消息提供的数据来扩充消息的有效负载。
在向请求通道发送消息时,您还可以选择使用request-payload-expression
属性。
有效负载的丰富是通过 SPEL 表达式配置的,从而提供了最大程度的灵活性。
因此,您不仅可以使用回复通道的Message
,但您可以使用 SPEL 表达式从该消息中提取子集或应用其他内联转换,从而进一步作数据。
如果您只需要使用静态值来丰富有效负载,则无需提供request-channel
属性。
Enrichers 是 transformer 的一种变体。 在许多情况下,您可以使用有效负载扩充器或通用转换器实现将其他数据添加到消息有效负载中。 你应该熟悉 Spring Integration 提供的所有支持转换的组件,并仔细选择在语义上最适合你的业务案例的实现。 |
配置
以下示例显示了有效负载扩充器的所有可用配置选项:
<int:enricher request-channel="" (1)
auto-startup="true" (2)
id="" (3)
order="" (4)
output-channel="" (5)
request-payload-expression="" (6)
reply-channel="" (7)
error-channel="" (8)
send-timeout="" (9)
should-clone-payload="false"> (10)
<int:poller></int:poller> (11)
<int:property name="" expression="" null-result-expression="'Could not determine the name'"/> (12)
<int:property name="" value="23" type="java.lang.Integer" null-result-expression="'0'"/>
<int:header name="" expression="" null-result-expression=""/> (13)
<int:header name="" value="" overwrite="" type="" null-result-expression=""/>
</int:enricher>
1 | 将消息发送到的通道,以获取用于扩充的数据。 自选。 |
2 | 生命周期属性指示是否应在应用程序上下文启动期间启动此组件。 默认为 true。 自选。 |
3 | 底层 Bean 定义的 ID,即EventDrivenConsumer 或PollingConsumer .
自选。 |
4 | 指定此终端节点作为订阅者连接到通道时的调用顺序。 当该通道使用 “failover” 分派策略时,这一点尤其重要。 当此终端节点本身是具有队列的通道的轮询使用者时,它不起作用。 自选。 |
5 | 标识此终端节点处理消息后将消息发送到的消息通道。 自选。 |
6 | 默认情况下,原始消息的有效负载用作发送到request-channel .
通过将 SpEL 表达式指定为request-payload-expression 属性,您可以使用原始有效负载的子集、标头值或任何其他可解析的 SPEL 表达式作为发送到请求通道的有效负载的基础。
对于表达式计算,完整消息可用作 'root object'。
例如,以下 SpEL 表达式(以及其他表达式)是可能的:payload.something ,headers.something ,new java.util.Date() ,'thing1' + 'thing2' |
7 | 需要回复消息的频道。 这是可选的。 通常,自动生成的临时回复通道就足够了。 自选。 |
8 | 一个ErrorMessage 如果发送了Exception 发生在request-channel .
这使您能够返回用于扩充的替代对象。
如果未设置,则Exception 被抛出给调用方。
自选。 |
9 | 如果通道可能阻塞,则向通道发送消息时要等待的最长时间(以毫秒为单位)。
例如,如果已达到其最大容量,则队列通道可以阻塞,直到有可用空间为止。
在内部,send() timeout 在MessagingTemplate 并最终应用于MessageChannel .
默认情况下,send() timeout is set to '-1', which can cause the send operation on the `MessageChannel 来无限期阻止。
自选。 |
10 | 布尔值,指示是否实现Cloneable 应在将消息发送到请求通道以获取丰富数据之前进行克隆。
克隆的版本将用作最终回复的目标负载。
默认值为false .
自选。 |
11 | 允许您配置消息轮询器(如果此端点是轮询使用者)。 自选。 |
12 | 每property sub-元素提供属性的名称(通过强制的name 属性)。
该属性应在目标负载实例上设置。
恰好是value 或expression 还必须提供属性 - 前者用于设置文字值,后者用于要评估的 SPEL 表达式。
评估上下文的根对象是从此扩充器启动的流返回的消息 — 如果没有请求通道或应用程序上下文(使用@<beanName>.<beanProperty> SPEL 语法)。
从版本 4.0 开始,在指定value 属性,您还可以指定可选的type 属性。
当目标是类型化 setter 方法时,框架会适当地强制该值(只要PropertyEditor ) 来处理转换。
但是,如果目标有效负载是Map ,则条目将填充值而不进行转换。
这type 属性允许您将String 包含数字到Integer 值。
从版本 4.1 开始,您还可以指定可选的null-result-expression 属性。
当enricher 返回 null,则对其进行评估,并返回评估的输出。 |
13 | 每header sub-元素提供消息标头的名称(通过强制的name 属性)。
恰好是value 或expression 还必须提供 attributes — 前者用于设置文字值,后者用于要计算的 SPEL 表达式。
评估上下文的根对象是从此丰富器启动的流返回的消息 — 如果没有请求通道或应用程序上下文(使用 '@<beanName>.<beanProperty>' SPEL 语法,则为输入消息)。
请注意,与<header-enricher> 这<enricher> 元素的header 元素具有type 和overwrite 属性。
但是,一个关键的区别在于,使用<enricher> 这overwrite attribute 为true 默认情况下,为了与<enricher> 元素的<property> sub-元素。
从版本 4.1 开始,您还可以指定可选的null-result-expression 属性。
当enricher 返回 null,则对其进行评估,并返回评估的输出。 |
例子
本节包含在各种情况下使用有效负载扩充器的几个示例。
此处显示的代码示例是 Spring 集成示例项目的一部分。 参见 Spring 集成示例。 |
在以下示例中,User
对象作为Message
:
<int:enricher id="findUserEnricher"
input-channel="findUserEnricherChannel"
request-channel="findUserServiceChannel">
<int:property name="email" expression="payload.email"/>
<int:property name="password" expression="payload.password"/>
</int:enricher>
这User
具有多个属性,但只有username
的初始设置。
增富剂的request-channel
属性配置为传递User
到findUserServiceChannel
.
通过隐式设置reply-channel
一个User
对象,并使用property
sub-元素中,将提取 Reply 中的属性并用于丰富原始有效负载。
如何仅将数据子集传递给请求通道?
当使用request-payload-expression
属性,则可以将有效负载的单个属性而不是完整消息传递给请求通道。
在以下示例中,将 username 属性传递给请求通道:
<int:enricher id="findUserByUsernameEnricher"
input-channel="findUserByUsernameEnricherChannel"
request-channel="findUserByUsernameServiceChannel"
request-payload-expression="payload.username">
<int:property name="email" expression="payload.email"/>
<int:property name="password" expression="payload.password"/>
</int:enricher>
请记住,尽管仅传递了用户名,但发送到请求通道的结果消息包含完整的MessageHeaders
.
如何丰富包含集合数据的有效负载?
在以下示例中,不是User
object、Map
传入:
<int:enricher id="findUserWithMapEnricher"
input-channel="findUserWithMapEnricherChannel"
request-channel="findUserByUsernameServiceChannel"
request-payload-expression="payload.username">
<int:property name="user" expression="payload"/>
</int:enricher>
这Map
包含username
map 键。
只有username
将传递到请求通道。
回复包含一个完整的User
object,它最终会添加到Map
在user
钥匙。
如何在不使用请求通道的情况下使用静态信息丰富有效负载?
以下示例根本不使用请求通道,而仅使用静态值丰富消息的有效负载:
<int:enricher id="userEnricher"
input-channel="input">
<int:property name="user.updateDate" expression="new java.util.Date()"/>
<int:property name="user.firstName" value="William"/>
<int:property name="user.lastName" value="Shakespeare"/>
<int:property name="user.age" value="42"/>
</int:enricher>
请注意,“static”一词在这里使用得很松散。 您仍然可以使用 SPEL 表达式来设置这些值。
索赔检查
在前面的部分中,我们介绍了几个内容扩充器组件,这些组件可以帮助您处理消息缺少一条数据的情况。 我们还讨论了内容筛选,它允许您从消息中删除数据项。 但是,有时我们想暂时隐藏数据。 例如,在分布式系统中,我们可能会收到一条 payload 非常大的消息。 一些间歇性的消息处理步骤可能不需要访问这个 payload,有些可能只需要访问某些 header,因此在每个处理步骤中携带大消息 payload 可能会导致性能下降,可能会产生安全风险,并可能使调试更加困难。
存储在库中(或声明检查)模式描述了一种机制,该机制允许您将数据存储在众所周知的位置,同时仅维护指向该数据所在位置的指针(声明检查)。 您可以将该指针作为新消息的有效负载传递,从而让消息流中的任何组件在需要时立即获取实际数据。 这种方法与认证邮件流程非常相似,在认证邮件流程中,您会在邮箱中获得索赔支票,然后必须前往邮局领取您的实际包裹。 这与航班后或酒店领取行李的想法相同。
Spring 集成提供了两种类型的索赔检查转换器:
-
Incoming Claim Check Transformer
-
传出索赔检查转换器
可以使用方便的基于命名空间的机制来配置它们。
Incoming Claim Check Transformer
传入声明检查转换器通过将传入消息存储在由其message-store
属性。
以下示例定义了传入的索赔检查转换器:
<int:claim-check-in id="checkin"
input-channel="checkinChannel"
message-store="testMessageStore"
output-channel="output"/>
在前面的配置中,在input-channel
将持久化到使用message-store
属性,并使用生成的 ID 编制索引。
该 ID 是该邮件的声明检查。
声明检查也成为发送到output-channel
.
现在,假设您在某个时候确实需要访问实际消息。 您可以手动访问邮件存储并获取邮件的内容,也可以使用相同的方法(创建转换器),只不过现在您使用传出索赔检查转换器将索赔检查转换为实际邮件。
以下清单概述了 incoming claim check transformer 的所有可用参数:
<int:claim-check-in auto-startup="true" (1)
id="" (2)
input-channel="" (3)
message-store="messageStore" (4)
order="" (5)
output-channel="" (6)
send-timeout=""> (7)
<int:poller></int:poller> (8)
</int:claim-check-in>
1 | 生命周期属性指示是否应在应用程序上下文启动期间启动此组件。
它默认为true .
此属性在Chain 元素。
自选。 |
2 | 标识底层 Bean 定义的 ID (MessageTransformingHandler ).
此属性在Chain 元素。
自选。 |
3 | 此终端节点的接收消息通道。
此属性在Chain 元素。
自选。 |
4 | 对MessageStore 供此索赔检查转换器使用。
如果未指定,则默认引用指向名为messageStore .
自选。 |
5 | 指定此终端节点作为订阅者连接到通道时的调用顺序。
当该通道使用failover 调度策略。
当此终端节点本身是具有队列的通道的轮询使用者时,它不起作用。
此属性在Chain 元素。
自选。 |
6 | 标识消息在此终端节点处理后发送到的消息通道。
此属性在Chain 元素。
自选。 |
7 | 指定向输出通道发送回复消息时要等待的最长时间(以毫秒为单位)。
默认为-1 — 无限期阻止。
此属性在Chain 元素。
自选。 |
8 | 定义 poller。
此元素在Chain 元素。
自选。 |
传出索赔检查转换器
传出声明检查转换器允许您将具有声明检查负载的消息转换为以原始内容作为其负载的消息。
<int:claim-check-out id="checkout"
input-channel="checkoutChannel"
message-store="testMessageStore"
output-channel="output"/>
在前面的配置中,在input-channel
应将 claim check 作为其有效负载。
传出声明检查转换器通过查询消息存储以查找由提供的声明检查标识的消息,将其转换为具有原始负载的消息。
然后,它将新签出的消息发送到output-channel
.
下面的清单提供了传出索赔检查转换器的所有可用参数的概述:
<int:claim-check-out auto-startup="true" (1)
id="" (2)
input-channel="" (3)
message-store="messageStore" (4)
order="" (5)
output-channel="" (6)
remove-message="false" (7)
send-timeout=""> (8)
<int:poller></int:poller> (9)
</int:claim-check-out>
1 | 生命周期属性指示是否应在应用程序上下文启动期间启动此组件。
它默认为true .
此属性在Chain 元素。
自选。 |
2 | 标识底层 Bean 定义的 ID (MessageTransformingHandler ).
此属性在Chain 元素。
自选。 |
3 | 此终端节点的接收消息通道。
此属性在Chain 元素。
自选。 |
4 | 对MessageStore 供此索赔检查转换器使用。
如果未指定,则默认引用指向名为messageStore .
自选。 |
5 | 指定此终端节点作为订阅者连接到通道时的调用顺序。
当该通道使用failover 调度策略。
当此终端节点本身是具有队列的通道的轮询使用者时,它不起作用。
此属性在Chain 元素。
自选。 |
6 | 标识消息在此终端节点处理后发送到的消息通道。
此属性在Chain 元素。
自选。 |
7 | 如果设置为true ,该消息将从MessageStore 通过这个转换器。
当 Message 只能被 “认领” 一次时,此设置非常有用。
它默认为false .
自选。 |
8 | 指定向输出通道发送回复消息时要等待的最长时间(以毫秒为单位)。
它默认为-1 — 无限期阻止。
此属性在Chain 元素。
自选。 |
9 | 定义 poller。
此元素在Chain 元素。
自选。 |
领取一次
有时,特定消息只能声明一次。
打个比方,考虑处理飞机行李的过程。
您在出发时托运行李,并在抵达时领取行李。
行李一旦被认领,如果不先重新办理托运,就不能再次领取行李。
为了适应这种情况,我们引入了remove-message
boolean 属性的claim-check-out
转换器。
此属性设置为false
默认情况下。
但是,如果设置为true
,则已声明的消息将从MessageStore
这样就不能再次领取了。
此功能对存储空间有影响,尤其是在内存中的情况下Map
-基于SimpleMessageStore
,其中未能删除消息最终可能会导致OutOfMemoryException
.
因此,如果您不希望进行多次索赔,我们建议您将remove-message
attribute 的值设置为true
.
以下示例演示如何使用remove-message
属性:
<int:claim-check-out id="checkout"
input-channel="checkoutChannel"
message-store="testMessageStore"
output-channel="output"
remove-message="true"/>
Codec
Spring Integration 的 4.2 版本引入了Codec
抽象化。
编解码器对对象进行编码和解码byte[]
.
它们提供了 Java 序列化的替代方案。
一个优点是,通常情况下,对象不需要实现Serializable
.
我们提供了一个使用 Kryo 进行序列化的实现,但您可以提供自己的实现,以便在以下任何组件中使用:
-
EncodingPayloadTransformer
-
DecodingTransformer
-
CodecMessageConverter
DecodingTransformer
此转换器将byte[]
通过使用编解码器。
它需要使用Class
对象应解码为该对象(或解析为Class
).
如果生成的对象是Message<?>
,则不会保留入站标头。
有关更多信息,请参阅 Javadoc。
CodecMessageConverter
某些终端节点(如 TCP 和 Redis)没有消息标头的概念。
它们支持使用MessageConverter
和CodecMessageConverter
可用于将消息转换为byte[]
用于传输。
有关更多信息,请参阅 Javadoc。
克良
目前,这是Codec
,它提供两种Codec
:
-
PojoCodec
: 用于转换器 -
MessageCodec
:用于CodecMessageConverter
该框架提供了几个自定义序列化程序:
-
FileSerializer
-
MessageHeadersSerializer
-
MutableMessageHeadersSerializer
第一个可以和PojoCodec
通过使用FileKryoRegistrar
.
第二个和第三个与MessageCodec
,它使用MessageKryoRegistrar
.
自定义 Kryo
默认情况下,Kryo 将未知的 Java 类型委托给其FieldSerializer
.
Kryo 还为每个原始类型注册了默认序列化器,以及String
,Collection
和Map
.FieldSerializer
使用 Reflection 导航对象图。
更有效的方法是实现一个自定义序列化器,该序列化器知道对象的结构,并且可以直接序列化选定的基元字段。
以下示例显示了这样的序列化程序:
public class AddressSerializer extends Serializer<Address> {
@Override
public void write(Kryo kryo, Output output, Address address) {
output.writeString(address.getStreet());
output.writeString(address.getCity());
output.writeString(address.getCountry());
}
@Override
public Address read(Kryo kryo, Input input, Class<Address> type) {
return new Address(input.readString(), input.readString(), input.readString());
}
}
这Serializer
接口公开Kryo
,Input
和Output
,这些设置可以完全控制包含哪些字段和其他内部设置,如 Kryo 文档中所述。
注册自定义序列化程序时,您需要注册 ID。 注册 ID 是任意的。 但是,在我们的例子中,必须显式定义 ID,因为分布式应用程序中的每个 Kryo 实例都必须使用相同的 ID。 Kryo 建议使用小的正整数,并保留一些 id(值< 10)。 Spring 集成目前默认使用 40、41 和 42(对于前面提到的文件和消息头序列化器)。 我们建议您从 60 开始,以便在框架中进行扩展。 您可以通过配置前面提到的 registrar 来覆盖这些框架默认值。 |
使用自定义 Kryo 序列化器
如果您需要自定义序列化,请参阅 Kryo 文档,因为您需要使用本机 API 进行自定义。
有关示例,请参阅MessageCodec
实现。
实现 KryoSerializable
如果你有write
访问 Domain Object 源代码,您可以实现KryoSerializable
如此处所述。
在这种情况下,该类本身提供序列化方法,无需进一步配置。
但是,基准测试表明,这不如显式注册自定义序列化程序有效。
以下示例显示了一个自定义 Kryo 序列化器:
public class Address implements KryoSerializable {
...
@Override
public void write(Kryo kryo, Output output) {
output.writeString(this.street);
output.writeString(this.city);
output.writeString(this.country);
}
@Override
public void read(Kryo kryo, Input input) {
this.street = input.readString();
this.city = input.readString();
this.country = input.readString();
}
}
您还可以使用此技术包装 Kryo 以外的序列化库。
使用@DefaultSerializer
注解
Kryo 还提供了一个@DefaultSerializer
注释,如此处所述。
@DefaultSerializer(SomeClassSerializer.class)
public class SomeClass {
// ...
}
如果你有write
访问 Domain 对象,这可能是指定自定义序列化程序的更简单方法。
请注意,这不会使用 ID 注册类,这可能会使该技术在某些情况下没有帮助。