使用 NIO(请参阅 IP 配置属性)可避免专用线程从每个套接字读取。
对于少量套接字,您可能会发现不使用 NIO 以及异步切换(例如 到 )的性能与使用 NIO 一样好或更好。using-nio
QueueChannel
在处理大量连接时,应考虑使用 NIO。 然而,蔚来汽车的使用还有其他一些影响。 线程池(在任务执行器中)在所有套接字之间共享。 每条传入消息都会被组合并发送到配置的通道,作为从该池中选择的线程上的单独工作单元。 到达同一套接字的两个顺序消息可能由不同的线程处理。 这意味着消息发送到通道的顺序是不确定的。 不对到达套接字的消息进行严格排序。
对于某些应用程序,这不是问题。
对于其他人来说,这是一个问题。
如果需要严格的排序,请考虑设置为并使用异步切换。using-nio
false
或者,您可以在入站端点的下游插入一个重排序器,以将消息返回到其正确的序列。
如果设置为在连接工厂上,则到达 TCP 连接的消息已设置和标头。
重排序器使用这些标头将消息返回到其正确的序列。apply-sequence
true
sequenceNumber
correlationId
从版本 5.1.4 开始,优先接受新连接,而不是从现有连接读取。
一般来说,这应该没有什么影响,除非你有非常高的新传入连接率。
如果要恢复到先前的读取优先级行为,请将 上的属性设置为 。multiAccept TcpNioServerConnectionFactory false |
从版本 5.1.4 开始,优先接受新连接,而不是从现有连接读取。
一般来说,这应该没有什么影响,除非你有非常高的新传入连接率。
如果要恢复到先前的读取优先级行为,请将 上的属性设置为 。multiAccept TcpNioServerConnectionFactory false |
泳池大小
不再使用池大小属性。
以前,当未指定任务执行程序时,它指定默认线程池的大小。
它还用于在服务器套接字上设置连接积压工作。
不再需要第一个函数(请参阅下一段)。
第二个函数替换为属性。backlog
以前,在 NIO 中使用固定线程池任务执行器(默认值)时,可能会出现死锁,处理将停止。 当缓冲区已满,从套接字读取的线程尝试向缓冲区添加更多数据,并且没有线程可用于在缓冲区中腾出空间时,会出现此问题。 这仅在非常小的池大小下发生,但在极端条件下是可能的。 自 2.2 以来,有两个更改消除了这个问题。 首先,默认任务执行程序是缓存的线程池执行程序。 其次,添加了死锁检测逻辑,这样,如果发生线程匮乏,则抛出异常,而不是死锁,从而释放死锁资源。
现在,默认任务执行程序是无限制的,如果消息处理需要较长的时间,则可能会出现内存不足情况,传入消息的速率很高。 如果应用程序表现出这种行为,则应使用具有适当池大小的共用任务执行程序,但请参阅下一节。 |
现在,默认任务执行程序是无限制的,如果消息处理需要较长的时间,则可能会出现内存不足情况,传入消息的速率很高。 如果应用程序表现出这种行为,则应使用具有适当池大小的共用任务执行程序,但请参阅下一节。 |
具有策略的线程池任务执行程序CALLER_RUNS
当您使用固定线程池时,应牢记一些重要的注意事项,当使用命名空间时,队列容量较小。CallerRunsPolicy
CALLER_RUNS
<task/>
如果不使用固定线程池,则以下情况不适用。
使用 NIO 连接,有三种不同的任务类型。 I/O 选择器处理在一个专用线程上执行(检测事件、接受新连接以及使用任务执行程序将 I/O 读取操作分派给其他线程)。 当 I/O 读取器线程(读取操作被调度到该线程)读取数据时,它会移交给另一个线程以组合传入消息。 大型邮件可能需要多次读取才能完成。 这些“汇编程序”线程可能会在等待数据时阻塞。 当发生新的读取事件时,读取器将确定此套接字是否已有汇编程序,如果没有,则运行新的套接字。 组装过程完成后,汇编程序线程将返回到池中。
当池用完、拒绝策略正在使用以及任务队列已满时,这可能会导致死锁。
当池为空且队列中没有空间时,IO 选择器线程将接收事件并使用执行程序调度读取。
队列已满,因此选择器线程本身会启动读取过程。
现在,它检测到此套接字没有汇编程序,并在执行读取之前触发汇编程序。
同样,队列已满,选择器线程成为汇编器。
汇编程序现在被阻止,等待读取数据,这永远不会发生。
连接工厂现在处于死锁状态,因为选择器线程无法处理新事件。CALLER_RUNS
OP_READ
为了避免这种死锁,我们必须避免选择器(或读取器)线程执行组装任务。 我们希望对 IO 和程序集操作使用单独的池。
该框架提供了一个 ,它允许配置两个不同的执行器:一个用于执行 IO 操作,另一个用于消息汇编。
在此环境中,IO 线程永远不会成为汇编程序线程,并且不会发生死锁。CompositeExecutor
此外,任务执行程序应配置为使用 ( 当使用 时)。
当 I/O 任务无法完成时,它会延迟一小段时间,并不断重试,直到可以完成并分配汇编程序。
默认情况下,延迟为 100 毫秒,但可以通过在连接工厂上设置属性来更改它(使用 XML 命名空间进行配置时)。AbortPolicy
ABORT
<task>
readDelay
read-delay
以下三个示例演示如何配置复合执行程序:
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>