TCP 连接工厂
概述
对于 TCP,底层连接的配置是使用连接工厂提供的。 提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。 客户端连接工厂建立传出连接。 服务器连接工厂侦听传入连接。
出站通道适配器使用客户端连接工厂,但您也可以提供对入站通道适配器的客户端连接工厂的引用。 该适配器接收在出站适配器创建的连接上收到的任何传入消息。
入站通道适配器或网关使用服务器连接工厂。 (事实上,没有一个,连接工厂就无法运行)。 您还可以提供对出站适配器的服务器连接工厂的引用。 然后,您可以使用该适配器向同一连接上的传入邮件发送回复。
仅当回复包含ip_connectionId 由 Connection Factory 插入到原始消息中的 Header。 |
这是在入站和出站适配器之间共享连接工厂时执行的消息关联的范围。 这种共享允许通过 TCP 进行异步双向通信。 默认情况下,仅使用 TCP 传输负载信息。 因此,任何消息关联都必须由下游组件(如聚合器或其他终端节点)执行。 版本 3.0 中引入了对传输所选标头的支持。 有关更多信息,请参阅 TCP 消息关联。 |
您可以为每种类型的最多一个适配器提供对连接工厂的引用。
Spring 集成提供了使用java.net.Socket
和java.nio.channel.SocketChannel
.
以下示例显示了一个简单的服务器连接工厂,该工厂使用java.net.Socket
连接:
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"/>
以下示例显示了一个简单的服务器连接工厂,该工厂使用java.nio.channel.SocketChannel
连接:
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
using-nio="true"/>
从 Spring 集成版本 4.2 开始,如果服务器配置为侦听随机端口(通过将端口设置为0 ),您可以使用getPort() .
也getServerSocketAddress() 让您获得完整的SocketAddress .
请参阅Javadoc 的TcpServerConnectionFactory 接口了解更多信息。 |
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"/>
以下示例显示了一个客户端连接工厂,该工厂使用java.net.Socket
connections 并为每个消息创建一个新连接:
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"
using-nio=true/>
从版本 5.2 开始,客户端连接工厂支持属性connectTimeout
,以秒为单位指定,默认为 60。
消息划分(序列化器和反序列化器)
TCP 是一种流协议。 这意味着必须为通过 TCP 传输的数据提供一些结构,以便接收方可以将数据划分为离散的消息。 连接工厂配置为使用序列化器和反序列化器在消息有效负载和通过 TCP 发送的位之间进行转换。 这是通过分别为入站和出站消息提供反序列化器和序列化器来实现的。 Spring 集成提供了许多标准的序列化器和反序列化器。
ByteArrayCrlfSerializer
*将字节数组转换为字节流,后跟回车符和换行符 (\r\n
).
这是默认的序列化程序(和反序列化程序),例如,可以与 telnet 一起用作 client。
这ByteArraySingleTerminatorSerializer
*将字节数组转换为字节流,后跟单个终止字符(默认值为0x00
).
这ByteArrayLfSerializer
*将字节数组转换为字节流,后跟单个换行符 (0x0a
).
这ByteArrayStxEtxSerializer
*将字节数组转换为前面带有 STX (0x02
) ,后跟一个 ETX (0x03
).
这ByteArrayLengthHeaderSerializer
将字节数组转换为字节流,前面是按网络字节顺序 (Big Endian) 排列的二进制长度。
这是一个高效的反序列化器,因为它不必解析每个字节来查找终止字符序列。
它还可用于包含二进制数据的有效负载。
前面的序列化程序仅支持有效负载中的文本。
长度标头的默认大小为 4 个字节(一个整数),允许消息最大为 (2^31 - 1) 字节。
但是,length
报头可以是单个字节 (unsigned) (最大 255 字节的消息),也可以是无符号短 (2 字节) (最大 (2^16 - 1) 字节的消息)。
如果你需要任何其他格式的 header ,你可以子类化ByteArrayLengthHeaderSerializer
并为readHeader
和writeHeader
方法。
绝对最大数据大小为 (2^31 - 1) 字节。
从版本 5.2 开始,除了有效负载之外,标头值还可以包括 标头的长度。
将inclusive
属性来启用该机制(对于生产者和消费者,必须将其设置为相同)。
这ByteArrayRawSerializer
*,将字节数组转换为字节流,并且不添加其他消息划分数据。
使用此序列化程序(和反序列化程序),消息的结尾由 Client 端按顺序关闭套接字来指示。
使用此序列化程序时,消息接收将挂起,直到客户端关闭套接字或发生超时。
超时不会导致消息。
当使用此序列化程序并且客户端是 Spring 集成应用程序时,客户端必须使用配置了single-use="true"
.
这样做会导致适配器在发送消息后关闭套接字。
序列化程序本身不会关闭连接。
你应该只将此序列化器与通道适配器(而不是网关)使用的连接工厂一起使用,并且连接工厂应该由入站或出站适配器使用,但不能同时由两者使用。
另请参阅ByteArrayElasticRawDeserializer
,在本节后面部分。
但是,从 5.2 版本开始,出站网关有一个新属性closeStreamAfterSend
;这允许使用 Raw serializers/deserializers,因为 EOF 被发送到服务器,同时保持连接打开以接收回复。
在 4.2.2 版本之前,当使用非阻塞 I/O (NIO) 时,此序列化器将超时(读取期间)视为文件结束,到目前为止读取的数据将作为消息发出。
这是不可靠的,不应用于分隔消息。
现在,它将此类条件视为异常。
万一您以这种方式使用它,您可以通过设置treatTimeoutAsEndOfMessage constructor 参数设置为true . |
它们中的每一个都是AbstractByteArraySerializer
,它实现了org.springframework.core.serializer.Serializer
和org.springframework.core.serializer.Deserializer
.
为了向后兼容,使用任何AbstractByteArraySerializer
对于序列化,还接受String
首先转换为 Byte 数组。
这些序列化器和反序列化器中的每一个都将包含相应格式的 input 流转换为字节数组有效负载。
为了避免由于行为不良的客户端(不遵守已配置的序列化程序的协议)而导致的内存耗尽,这些序列化程序会施加最大消息大小。
如果传入消息超过此大小,则会引发异常。
默认最大消息大小为 2048 字节。
您可以通过设置maxMessageSize
财产。
如果使用默认的序列化器或反序列化器,并希望增加最大消息大小,则必须使用maxMessageSize
属性设置并配置连接工厂以使用该 Bean。
本节前面标记的类使用中间缓冲区,并将解码后的数据复制到正确大小的最终缓冲区。
从版本 4.3 开始,您可以通过设置*poolSize
属性来重用这些原始缓冲区,而不是为每条消息分配和丢弃这些缓冲区,这是默认行为。
将该属性设置为负值将创建一个没有边界的池。
如果存储池是有界的,您还可以设置poolWaitTimeout
属性(以毫秒为单位),之后如果没有缓冲区可用,则会引发异常。
它默认为无穷大。
此类异常会导致套接字关闭。
如果你想在自定义反序列化器中使用相同的机制,你可以扩展AbstractPooledBufferByteArraySerializer
(而不是它的超类AbstractByteArraySerializer
) 并实施doDeserialize()
而不是deserialize()
.
缓冲区会自动返回到池中。AbstractPooledBufferByteArraySerializer
还提供了一种方便的 Utility 方法:copyToSizedArray()
.
版本 5.0 添加了ByteArrayElasticRawDeserializer
.
这类似于ByteArrayRawSerializer
上,除了没有必要设置maxMessageSize
.
在内部,它使用ByteArrayOutputStream
这样,缓冲区就会根据需要增长。
客户端必须有序地关闭套接字以发出消息结束的信号。
仅当 peer 受信任时,才应使用此反序列化器;由于内存不足情况,它容易受到 DoS 连接的影响。 |
这MapJsonSerializer
使用 JacksonObjectMapper
要在Map
和 JSON 的 JSON 中。
您可以将此序列化程序与MessageConvertingTcpMessageMapper
以及MapMessageConverter
以 JSON 格式传输选定的标头和有效负载。
Jackson酒店ObjectMapper 无法在流中划分消息。
因此,MapJsonSerializer 需要委托给另一个序列化器或反序列化器来处理消息划分。
默认情况下,ByteArrayLfSerializer ,则生成格式为<json><LF> ,但您可以将其配置为使用其他
(下一个示例演示如何执行此作。 |
最终的标准序列化程序是org.springframework.core.serializer.DefaultSerializer
,可用于通过 Java 序列化转换可序列化对象。org.springframework.core.serializer.DefaultDeserializer
用于包含可序列化对象的流的入站反序列化。
如果您不希望使用默认的序列化器和反序列化器 (ByteArrayCrLfSerializer
),则必须设置serializer
和deserializer
Attributes 的 Interfaces 属性。
以下示例显示了如何执行此作:
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer" />
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
使用java.net.Socket
connections 并在网络上使用 Java 序列化。
有关连接工厂上可用属性的完整详细信息,请参阅本节末尾的参考。
默认情况下,不对入站数据包执行反向 DNS 查找:在未配置 DNS 的环境中(例如 Docker 容器),这可能会导致连接延迟。
要将 IP 地址转换为主机名以用于邮件报头,可以通过设置lookup-host
属性设置为true
.
您还可以修改 sockets 和 socket factories 的属性。 有关更多信息,请参阅 SSL/TLS 支持。 如前所述,如果使用 SSL,则进行此类修改是可能的。 |
房东验证
从版本 5.1.0 开始,默认情况下启用主机验证以增强安全性。 此功能可确保在 TCP 连接期间验证服务器的身份。
如果遇到需要关闭主机校验的场景(不推荐),可以在 tcp-connection-factory 中配置 socket-support 属性。
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="0"
socket-support="customSocketSupport"
single-use="true"
so-timeout="10000"/>
<bean id="customSocketSupport" class="org.springframework.integration.ip.tcp.connection.DefaultTcpSocketSupport">
<constructor-arg value="false" />
</bean>
自定义序列化器和反序列化器
如果您的数据不是标准反序列化器之一支持的格式,则可以实现自己的格式;您还可以实现自定义序列化程序。
要实现自定义序列化器和反序列化器对,请实现org.springframework.core.serializer.Deserializer
和org.springframework.core.serializer.Serializer
接口。
当反序列化器检测到消息之间关闭的输入流时,它必须抛出一个SoftEndOfStreamException
;这是向框架发出的信号,表明关闭是 “正常 ”的。
如果在解码消息时关闭了流,则应改为引发其他异常。
从版本 5.2 开始,SoftEndOfStreamException
现在是RuntimeException
而不是扩展IOException
.
TCP 故障转移客户端连接工厂
您可以配置支持故障转移到一个或多个其他服务器的 TCP 连接工厂。 发送消息时,工厂会遍历其所有已配置的工厂,直到可以发送消息或找不到连接。 最初,使用已配置列表中的第一个 factory。 如果连接随后失败,则下一个工厂将成为当前工厂。 以下示例说明如何配置故障转移客户端连接工厂:
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
<constructor-arg>
<list>
<ref bean="clientFactory1"/>
<ref bean="clientFactory2"/>
</list>
</constructor-arg>
</bean>
使用故障转移连接工厂时,singleUse 属性必须在工厂本身和它配置为使用的工厂列表之间保持一致。 |
当与共享连接一起使用时,连接工厂具有两个与故障回复相关的属性 (singleUse=false
):
-
refreshSharedInterval
-
closeOnRefresh
根据上述配置,考虑以下场景:
假设clientFactory1
无法建立连接,但clientFactory2
能。
当failCF
getConnection()
method 在refreshSharedInterval
已通过clientFactory1
;如果成功,则连接到clientFactory2
将关闭。
如果closeOnRefresh
是false
,则“旧”连接将保持打开状态,如果第一个工厂再次失败,将来可能会重新使用。
设置refreshSharedInterval
仅在该时间到期后尝试重新连接第一家工厂;将其设置为Long.MAX_VALUE
(默认)如果您只想在当前连接失败时故障回复到第一个工厂。
设置closeOnRefresh
在刷新后关闭 “旧” 连接实际上会创建一个新连接。
如果任何委托工厂是CachingClientConnectionFactory 因为连接缓存是在那里处理的;在这种情况下,将始终查阅 Connection Factory 列表以获取 CONNECTION。 |
从版本 5.3 开始,这些默认为Long.MAX_VALUE
和true
因此,工厂仅在当前连接失败时尝试故障回复。
要恢复到以前版本的默认行为,请将它们设置为0
和false
.
另请参阅 测试连接。
TCP 线程关联连接工厂
Spring 集成版本 5.0 引入了这个连接工厂。
它将连接绑定到调用线程,并且每次该线程发送消息时都会重复使用相同的连接。
这种情况一直持续到连接关闭(由服务器或网络)或直到线程调用releaseConnection()
方法。
连接本身由另一个 Client 端工厂实现提供,该实现必须配置为提供非共享(一次性)连接,以便每个线程都获得一个连接。
以下示例说明如何配置 TCP 线程关联连接工厂:
@Bean
public TcpNetClientConnectionFactory cf() {
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
Integer.parseInt(System.getProperty(PORT)));
cf.setSingleUse(true);
return cf;
}
@Bean
public ThreadAffinityClientConnectionFactory tacf() {
return new ThreadAffinityClientConnectionFactory(cf());
}
@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(tacf());
outGate.setReplyChannelName("toString");
return outGate;
}