此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
本节介绍轮询在 Spring Integration 中的工作原理。
轮询消费者
当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下实例之一:
实际实现取决于这些终结点连接到的通道类型。
连接到实现 org.springframework.messaging.SubscribableChannel
接口的通道的通道适配器会生成 .
另一方面,连接到实现 org.springframework.messaging.PollableChannel
接口的通道的通道适配器(例如 )会生成 .EventDrivenConsumer
QueueChannel
PollingConsumer
轮询消费者让 Spring Integration 组件主动轮询消息,而不是以事件驱动的方式处理消息。
在许多消息传递方案中,它们代表了一个关键的跨领域问题。 在 Spring Integration 中,轮询使用者基于同名模式,这在 Gregor Hohpe 和 Bobby Woolf 所著的 Enterprise Integration Patterns 一书中进行了描述。 您可以在该书的网站上找到该模式的描述。
有关轮询使用者配置的更多信息,请参阅消息终结点。
可轮询消息源
Spring Integration 提供了轮询使用者模式的第二种变体。
使用入站通道适配器时,这些适配器通常由 .
例如,从远程 FTP 服务器位置检索邮件时,FTP 入站通道适配器中描述的适配器配置有轮询器,以定期检索邮件。
因此,当组件配置了轮询器时,生成的实例属于以下类型之一:SourcePollingChannelAdapter
这意味着轮询器用于入站和出站消息传递方案。 以下是使用轮询器的一些用例:
-
轮询某些外部系统,如 FTP 服务器、数据库和 Web 服务
-
轮询内部(可轮询)消息通道
-
轮询内部服务(例如在 Java 类上重复执行方法)
AOP 建议类可以应用于轮询者,例如开始交易的交易建议。
从版本 4.1 开始,提供了 a。
轮询器使用触发器来确定下一次轮询的时间。
可用于禁止(跳过)轮询,可能是因为存在某些下游条件会阻止正在处理的消息。
要使用此建议,您必须为其提供 .
从版本 4.2.5 开始,提供了 a。
要使用它,您可以将实例作为 Bean 添加到应用程序上下文中,将其注入 ,然后将其添加到轮询器的建议链中。
要跳过轮询,请调用 。
要恢复轮询,请致电 。
版本 4.2 在这方面增加了更多的灵活性。
请参阅条件轮询器。advice-chain PollSkipAdvice PollSkipAdvice PollSkipStrategy SimplePollSkipStrategy PollSkipAdvice skipPolls() reset() |
AOP 建议类可以应用于轮询者,例如开始交易的交易建议。
从版本 4.1 开始,提供了 a。
轮询器使用触发器来确定下一次轮询的时间。
可用于禁止(跳过)轮询,可能是因为存在某些下游条件会阻止正在处理的消息。
要使用此建议,您必须为其提供 .
从版本 4.2.5 开始,提供了 a。
要使用它,您可以将实例作为 Bean 添加到应用程序上下文中,将其注入 ,然后将其添加到轮询器的建议链中。
要跳过轮询,请调用 。
要恢复轮询,请致电 。
版本 4.2 在这方面增加了更多的灵活性。
请参阅条件轮询器。advice-chain PollSkipAdvice PollSkipAdvice PollSkipStrategy SimplePollSkipStrategy PollSkipAdvice skipPolls() reset() |
延迟确认可轮询消息源
从版本 5.0.1 开始,某些模块提供了支持延迟确认的实现,直到下游流完成(或将消息移交给另一个线程)。
这目前仅限于 和 .MessageSource
AmqpMessageSource
KafkaMessageSource
使用这些消息源,标头(请参阅 MessageHeaderAccessor
API)将添加到消息中。
与可轮询消息源一起使用时,标头的值是 的实例,如以下示例所示:IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
AcknowledgmentCallback
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如,一个 )都支持该状态。
它的处理方式与 相同。KafkaMessageSource
REJECT
ACCEPT
应用程序可以随时确认消息,如以下示例所示:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果 被连接到 ,当轮询器线程在下游流完成后返回到适配器时,适配器将检查确认是否已被确认,如果没有,则将其状态设置为它(或者流是否引发异常)。
状态值在 AcknowledgmentCallback.Status
枚举中定义。MessageSource
SourcePollingChannelAdapter
ACCEPT
REJECT
Spring Integration 提供对 .
这也负责设置或回调返回(或引发异常)的时间。
以下示例演示如何使用 :MessageSourcePollingTemplate
MessageSource
ACCEPT
REJECT
AcknowledgmentCallback
MessageHandler
MessageSourcePollingTemplate
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在这两种情况下 ( 和 ),您都可以通过调用回调来禁用自动 ack/nack。
如果您将消息传递给另一个线程并希望稍后确认,则可以执行此操作。
并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。SourcePollingChannelAdapter
MessageSourcePollingTemplate
noAutoAck()
消息源的条件轮询器
本节介绍如何使用条件轮询器。
背景
Advice
对象,在轮询器中,建议整个轮询任务(消息检索和处理)。
这些“围绕建议”的方法无法访问民意调查的任何上下文,只能访问民意调查本身。
如前所述,这对于诸如使任务事务化或由于某些外部条件而跳过轮询之类的要求来说很好。
如果我们希望根据轮询部分的结果采取一些行动,或者如果我们想根据条件调整轮询器,该怎么办?对于这些情况,Spring Integration 提供“智能”轮询。advice-chain
receive
“智能”轮询
版本 5.3 引入了该接口。
中实现此接口的任何对象都仅应用于操作 - 和 。
因此,它们只能应用于 或 。
此类实现以下方法:ReceiveMessageAdvice
Advice
advice-chain
receive()
MessageSource.receive()
PollableChannel.receive(timeout)
SourcePollingChannelAdapter
PollingConsumer
-
beforeReceive(Object source)
此方法在方法之前调用。 它允许您检查和重新配置源。 返回将取消此投票(类似于前面提到的)。Object.receive()
false
PollSkipAdvice
-
Message<?> afterReceive(Message<?> result, Object source)
此方法在方法之后调用。 同样,您可以重新配置源或执行任何操作(可能取决于结果,如果源没有创建消息,则可以这样做)。 您甚至可以返回不同的消息receive()
null
螺纹安全
如果 an 更改了源,则不应使用 .
如果突变源,则此类突变不是线程安全的,并且可能会导致意外结果,尤其是对于高频轮询器。
如果需要并发处理轮询结果,请考虑使用下游,而不是向轮询器添加执行程序。 |
建议链订购
您应该了解在初始化期间如何处理建议链。 未实现的对象将应用于整个轮询过程,并且所有对象都首先被调用,顺序是,在任何 .
然后围绕源方法按顺序调用对象。
例如,如果具有 objects ,则 where 和 are ,则按以下顺序应用这些对象:。
此外,如果源已经是 ,则在任何现有对象之后调用 。
如果您想更改订单,您必须自己连接代理。 |
SimpleActiveIdleReceiveMessageAdvice
此建议是 的简单实现。
当与 结合使用时,它会根据上一次轮询是否产生消息来调整轮询频率。
轮询器还必须具有对相同 的引用。ReceiveMessageAdvice
DynamicPeriodicTrigger
DynamicPeriodicTrigger
重要:异步切换
SimpleActiveIdleReceiveMessageAdvice 根据结果修改触发器。
仅当在轮询器线程上调用建议时,这才有效。
如果轮询器具有 .
若要在轮询结果后使用异步操作,请稍后执行异步切换,可能使用 .receive() task-executor ExecutorChannel |
CompoundTriggerAdvice
此建议允许根据投票是否返回消息来选择两个触发器之一。
考虑一个使用 . 实例是不可变的,因此一旦构造就无法更改。
考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但如果未收到任何消息,则每分钟轮询一次,并在检索到消息时恢复使用 cron 表达式。CronTrigger
CronTrigger
建议(和轮询器)为此目的使用 a。
触发器的触发器可以是 .
当建议检测到未收到任何消息时,它会将辅助触发器添加到 .
调用实例的方法时,它会委托给辅助触发器(如果存在)。
否则,它将委托给主触发器。CompoundTrigger
primary
CronTrigger
CompoundTrigger
CompoundTrigger
nextExecutionTime
轮询器还必须具有对相同 的引用。CompoundTrigger
以下示例显示了每小时 cron 表达式的配置,并回退到每分钟一次:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
重要:异步切换
CompoundTriggerAdvice 根据结果修改触发器。
仅当在轮询器线程上调用建议时,这才有效。
如果轮询器具有 .
若要在轮询结果后使用异步操作,请稍后执行异步切换,可能使用 .receive() task-executor ExecutorChannel |
仅限 MessageSource 的建议
有些建议可能只适用于 ,而对 .
为此,接口( 的扩展)仍然存在。
有关详细信息,请参阅入站通道适配器:轮询多个服务器和目录。MessageSource.receive()
PollableChannel
MessageSourceMutator
ReceiveMessageAdvice
螺纹安全
如果 an 更改了源,则不应使用 .
如果突变源,则此类突变不是线程安全的,并且可能会导致意外结果,尤其是对于高频轮询器。
如果需要并发处理轮询结果,请考虑使用下游,而不是向轮询器添加执行程序。 |
建议链订购
您应该了解在初始化期间如何处理建议链。 未实现的对象将应用于整个轮询过程,并且所有对象都首先被调用,顺序是,在任何 .
然后围绕源方法按顺序调用对象。
例如,如果具有 objects ,则 where 和 are ,则按以下顺序应用这些对象:。
此外,如果源已经是 ,则在任何现有对象之后调用 。
如果您想更改订单,您必须自己连接代理。 |
重要:异步切换
SimpleActiveIdleReceiveMessageAdvice 根据结果修改触发器。
仅当在轮询器线程上调用建议时,这才有效。
如果轮询器具有 .
若要在轮询结果后使用异步操作,请稍后执行异步切换,可能使用 .receive() task-executor ExecutorChannel |
重要:异步切换
CompoundTriggerAdvice 根据结果修改触发器。
仅当在轮询器线程上调用建议时,这才有效。
如果轮询器具有 .
若要在轮询结果后使用异步操作,请稍后执行异步切换,可能使用 .receive() task-executor ExecutorChannel |