连接和资源管理
虽然我们在上一节中描述的 AMQP 模型是通用的,适用于所有实现,但当我们进入资源管理时,细节是特定于代理实现的。 因此,在本节中,我们重点介绍仅存在于 “spring-rabbit” 模块中的代码,因为此时 RabbitMQ 是唯一受支持的实现。
用于管理与 RabbitMQ 代理的连接的中心组件是ConnectionFactory
接口。
责任ConnectionFactory
implementation 是提供org.springframework.amqp.rabbit.connection.Connection
,它是com.rabbitmq.client.Connection
.
选择连接工厂
有三种连接工厂可供选择
-
PooledChannelConnectionFactory
-
ThreadChannelConnectionFactory
-
CachingConnectionFactory
前两个是在 2.3 版本中添加的。
对于大多数使用案例,CachingConnectionFactory
应该使用。
这ThreadChannelConnectionFactory
如果您想确保严格的消息排序而无需使用 Scoped Operations,则可以使用。
这PooledChannelConnectionFactory
类似于CachingConnectionFactory
因为它使用单个连接和通道池。
它的实现更简单,但它不支持相关的发布者确认。
这三个工厂都支持简单的发布者确认。
配置RabbitTemplate
要使用单独的连接,您现在可以从版本 2.3.2 开始,将 Publishing Connection Factory 配置为不同的类型。
默认情况下,发布工厂的类型相同,并且在主工厂上设置的任何属性也会传播到发布工厂。
从版本 3.1 开始,AbstractConnectionFactory
包括connectionCreatingBackOff
属性,该属性支持 Connection 模块中的 Backoff 策略。
目前,支持createChannel()
要处理在channelMax
limit 时,实施基于尝试次数和间隔的回退策略。
PooledChannelConnectionFactory
此工厂基于 Apache Pool2 管理单个连接和两个通道池。
一个池用于事务通道,另一个池用于非事务通道。
池是GenericObjectPool
s 的默认配置;提供回调以配置池;有关更多信息,请参阅 Apache 文档。
阿帕奇commons-pool2
jar 必须位于 class path 上才能使用此工厂。
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
此工厂管理单个连接和两个ThreadLocal
s,一个用于事务通道,另一个用于非事务通道。
此工厂确保同一线程上的所有作都使用相同的通道(只要它保持打开状态)。
这有助于对消息进行严格的排序,而无需 Scoped Operations。
为避免内存泄漏,如果您的应用程序使用许多短期线程,则必须调用工厂的closeThreadChannel()
释放频道资源。
从版本 2.3.7 开始,线程可以将其通道传输给另一个线程。
有关更多信息,请参见多线程环境中的严格消息排序。
CachingConnectionFactory
提供的第三个实现是CachingConnectionFactory
,默认情况下,该代理会建立可由应用程序共享的单个连接代理。
共享连接是可能的,因为使用 AMQP 进行消息传递的“工作单元”实际上是一个“通道”(在某些方面,这类似于 JMS 中连接和会话之间的关系)。
connection 实例提供了一个createChannel
方法。
这CachingConnectionFactory
implementation 支持这些通道的缓存,并且它根据通道是否为事务性通道维护单独的缓存。
创建CachingConnectionFactory
,您可以通过构造函数提供 'hostname'。
您还应该提供 'username' 和 'password' 属性。
要配置通道缓存的大小(默认值为 25),您可以调用setChannelCacheSize()
方法。
从版本 1.3 开始,您可以配置CachingConnectionFactory
以缓存连接以及仅缓存通道。
在这种情况下,每次对createConnection()
创建新连接(或从缓存中检索空闲连接)。
关闭连接会将其返回到缓存中(如果尚未达到缓存大小)。
在此类连接上创建的通道也会被缓存。
在某些环境中,例如从 HA 集群使用,使用单独的连接可能很有用。
与负载均衡器结合使用,以连接到不同的集群成员等。
要缓存连接,请将cacheMode
自CacheMode.CONNECTION
.
这不会限制连接数。 相反,它指定允许的空闲打开连接数。 |
从版本 1.5.5 开始,一个名为connectionLimit
。
设置此属性后,它将限制允许的连接总数。
设置后,如果达到限制,则channelCheckoutTimeLimit
用于等待连接变为空闲状态。
如果超过该时间,则AmqpTimeoutException
被抛出。
当缓存模式为 此外,在撰写本文时, |
重要的是要了解缓存大小(默认情况下)不是一个限制,而只是可以缓存的通道数。 如果缓存大小为 10,则实际上可以使用任意数量的通道。 如果使用的通道超过 10 个,并且它们都返回到缓存中,则 10 个通道进入缓存。 其余的都是实体关闭的。
从版本 1.6 开始,默认通道缓存大小已从 1 增加到 25。 在高容量、多线程环境中,小缓存意味着以高速率创建和关闭通道。 增加默认缓存大小可以避免此开销。 您应该通过 RabbitMQ Admin UI 监控正在使用的通道,并考虑进一步增加缓存大小,如果您 查看正在创建和关闭的许多频道。 缓存仅按需增长(以满足应用程序的并发要求),因此此更改不会 影响现有的低容量应用程序。
从版本 1.4.2 开始,CachingConnectionFactory
具有一个名为channelCheckoutTimeout
.
当此属性大于零时,channelCacheSize
成为可在连接上创建的通道数的限制。
如果达到限制,则调用线程会阻塞,直到通道可用或达到此超时,在这种情况下,会触发AmqpTimeoutException
被抛出。
框架内使用的通道(例如RabbitTemplate ) 可靠地返回到缓存中。
如果您在框架之外创建通道(例如
通过直接访问连接并调用createChannel() ),则必须可靠地(通过关闭)返回它们,也许是在finally 块,以避免通道用完。 |
以下示例显示了如何创建新的connection
:
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
使用 XML 时,配置可能类似于以下示例:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
还有一个SingleConnectionFactory 仅在框架的单元测试代码中可用的实现。
它比CachingConnectionFactory ,因为它不缓存通道,但由于缺乏性能和弹性,它不打算用于简单测试之外的实际使用。
如果您需要实施自己的ConnectionFactory 出于某种原因,AbstractConnectionFactory 基类可能提供了一个很好的起点。 |
一个ConnectionFactory
可以使用 Rabbit 命名空间快速方便地创建,如下所示:
<rabbit:connection-factory id="connectionFactory"/>
在大多数情况下,此方法更可取,因为框架可以为您选择最佳默认值。
创建的实例是一个CachingConnectionFactory
.
请记住,通道的默认缓存大小为 25。
如果要缓存更多通道,请通过设置 'channelCacheSize' 属性来设置更大的值。
在 XML 中,它如下所示:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
此外,使用命名空间,您可以添加 'channel-cache-size' 属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
默认缓存模式为CHANNEL
,但您可以将其配置为缓存连接。
在下面的示例中,我们使用connection-cache-size
:
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
您可以使用命名空间提供 host 和 port 属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果在群集环境中运行,则可以使用 addresses 属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>
请参阅 连接到集群 以了解有关address-shuffle-mode
.
以下示例中有一个自定义线程工厂,该工厂在线程名称前面加上rabbitmq-
:
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
命名连接
从版本 1.7 开始,ConnectionNameStrategy
用于注入到AbstractionConnectionFactory
.
生成的名称用于目标 RabbitMQ 连接的应用程序特定标识。
如果 RabbitMQ 服务器支持连接名称,则连接名称将显示在管理 UI 中。
此值不必是唯一的,并且不能用作连接标识符,例如,在 HTTP API 请求中。
此值应该是人类可读的,并且是ClientProperties
在connection_name
钥匙。
您可以使用简单的 Lambda,如下所示:
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
这ConnectionFactory
argument 可以通过一些逻辑来区分目标连接名称。
默认情况下,beanName
的AbstractConnectionFactory
、表示对象的十六进制字符串和内部计数器用于生成connection_name
.
这<rabbit:connection-factory>
namespace 组件还随connection-name-strategy
属性。
的实现SimplePropertyValueConnectionNameStrategy
将连接名称设置为 Application 属性。
您可以将其声明为@Bean
并将其注入到 Connection Factory 中,如下例所示:
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
该属性必须存在于应用程序上下文的Environment
.
当使用 Spring Boot 及其自动配置的连接工厂时,你只需要声明ConnectionNameStrategy @Bean .
Boot 会自动检测 bean 并将其连接到工厂。 |
阻塞的连接和资源限制
该连接可能被阻止,无法与对应于 Memory Alarm 的代理进行交互。
从版本 2.0 开始,org.springframework.amqp.rabbit.connection.Connection
可随com.rabbitmq.client.BlockedListener
要通知连接已阻止和未阻止事件的实例。
此外,AbstractConnectionFactory
发出一个ConnectionBlockedEvent
和ConnectionUnblockedEvent
,分别通过其内部BlockedListener
实现。
这些允许您提供应用程序逻辑来对 broker 上的问题做出适当的反应,并(例如)采取一些纠正措施。
当应用程序配置了单个CachingConnectionFactory ,因为默认情况下使用 Spring Boot 自动配置,当连接被 Broker 阻止时,应用程序将停止工作。
当它被 Broker 阻止时,它的任何客户端都会停止工作。
如果我们在同一个应用程序中有 Producer 和 Consumer,那么当 Producer 阻止连接(因为 Broker 上不再有资源)并且 Consumer 无法释放它们(因为连接被阻止)时,我们最终可能会遇到死锁。
为了缓解这个问题,我们建议再有一个单独的CachingConnectionFactory instance 具有相同的选项 — 一个用于生产者,一个用于使用者。
单独的CachingConnectionFactory 对于在使用者线程上执行的事务性生产者来说是不可能的,因为它们应该重用Channel 与消费者交易相关联。 |
从版本 2.0.2 开始,RabbitTemplate
具有一个配置选项,可以自动使用第二个连接工厂,除非正在使用事务。
有关更多信息,请参阅使用单独的连接。
这ConnectionNameStrategy
,因为发布者连接与主要策略相同,其中.publisher
附加到调用方法的结果中。
从版本 1.7.7 开始,AmqpResourceNotAvailableException
,该SimpleConnection.createChannel()
无法创建Channel
(例如,由于channelMax
达到 limit 并且缓存中没有可用的通道)。
您可以在RetryPolicy
在一些回退后恢复作。
配置底层客户端连接工厂
这CachingConnectionFactory
使用 Rabbit 客户端的实例ConnectionFactory
.
许多配置属性通过 (host
,port
,userName
,password
,requestedHeartBeat
和connectionTimeout
例如),在CachingConnectionFactory
.
要设置其他属性 (clientProperties
),您可以定义 Rabbit 工厂的实例,并使用CachingConnectionFactory
.
使用命名空间时(如前所述),您需要在connection-factory
属性。
为方便起见,提供了一个工厂 Bean 来帮助在 Spring 应用程序上下文中配置连接工厂,如下一节所述。
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
默认情况下,4.0.x 客户端启用自动恢复。
虽然与此功能兼容,但 Spring AMQP 有自己的恢复机制,通常不需要 Client 端恢复功能。
我们建议禁用amqp-client 自动恢复,以避免AutoRecoverConnectionNotCurrentlyOpenException 代理可用但连接尚未恢复的实例。
您可能会注意到此异常,例如,当RetryTemplate 在RabbitTemplate ,即使故障转移到集群中的其他代理时也是如此。
由于自动恢复连接在计时器上恢复,因此可以使用 Spring AMQP 的恢复机制更快地恢复连接。
从版本 1.7.1 开始, Spring AMQP 禁用amqp-client 自动恢复,除非你显式创建自己的 RabbitMQ 连接工厂并将其提供给CachingConnectionFactory .
RabbitMQ 函数ConnectionFactory 由RabbitConnectionFactoryBean 默认情况下,也禁用了该选项。 |
RabbitConnectionFactoryBean
和配置 SSL
从版本 1.4 开始,一个方便的RabbitConnectionFactoryBean
是为了通过使用依赖关系注入在底层客户端连接工厂上方便地配置 SSL 属性。
其他 setter 委托给底层工厂。
以前,您必须以编程方式配置 SSL 选项。
以下示例显示如何配置RabbitConnectionFactoryBean
:
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
<rabbit:connection-factory id="rabbitConnectionFactory"
connection-factory="clientConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
<bean id="clientConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
有关配置 SSL 的信息,请参阅 RabbitMQ 文档。
省略keyStore
和trustStore
配置以通过 SSL 进行连接,而无需证书验证。
下一个示例显示如何提供密钥和信任存储配置。
这sslPropertiesLocation
property 是一个 SpringResource
指向包含以下键的属性文件:
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
这keyStore
和truststore
是 SpringResources
指向商店。
通常,此属性文件由作系统保护,应用程序具有读取访问权限。
从 Spring AMQP 版本 1.5 开始,您可以直接在工厂 bean 上设置这些属性。
如果 discrete properties 和sslPropertiesLocation
时,后者中的属性会覆盖
discrete 值。
从版本 2.0 开始,默认情况下会验证服务器证书,因为它更安全。
如果出于某种原因希望跳过此验证,请将工厂 Bean 的skipServerCertificateValidation property 设置为true .
从版本 2.1 开始,RabbitConnectionFactoryBean 现在调用enableHostnameVerification() 默认情况下。
要恢复到之前的行为,请将enableHostnameVerification property 设置为false . |
从版本 2.2.5 开始,默认情况下,工厂 Bean 将始终使用 TLS v1.2;以前,它在某些情况下使用 v1.1,而在其他情况下使用 v1.2(取决于其他属性)。
如果出于某种原因需要使用 v1.1,请将sslAlgorithm 财产:setSslAlgorithm("TLSv1.1") . |
连接到集群
要连接到集群,请配置addresses
属性CachingConnectionFactory
:
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
从版本 3.0 开始,每当建立新连接时,底层连接工厂将尝试通过选择随机地址连接到主机。
要恢复到之前尝试从第一个到最后一个连接的行为,请将addressShuffleMode
property 设置为AddressShuffleMode.NONE
.
从版本 2.3 开始,INORDER
添加了 shuffle 模式,这意味着在创建连接后,第一个地址将移动到末尾。
您可能希望将此模式与 RabbitMQ 分片插件一起使用,其中CacheMode.CONNECTION
以及合适的并发(如果您希望从所有节点上的所有分片中使用)。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
路由连接工厂
从版本 1.3 开始,AbstractRoutingConnectionFactory
已引入。
此工厂提供了一种机制,用于为多个ConnectionFactories
并确定目标ConnectionFactory
由一些人lookupKey
在运行时。
通常,该实现会检查线程绑定的上下文。
为方便起见, Spring AMQP 提供了SimpleRoutingConnectionFactory
,它获取当前线程绑定的lookupKey
从SimpleResourceHolder
.
以下示例说明如何配置SimpleRoutingConnectionFactory
在 XML 和 Java 中:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
使用后取消绑定资源很重要。
有关更多信息,请参阅 JavaDocAbstractRoutingConnectionFactory
.
从版本 1.4 开始,RabbitTemplate
支持 SPELsendConnectionFactorySelectorExpression
和receiveConnectionFactorySelectorExpression
属性,这些属性在每个 AMQP 协议交互作 (send
,sendAndReceive
,receive
或receiveAndReply
),解析为lookupKey
值AbstractRoutingConnectionFactory
.
您可以使用 Bean 引用,例如@vHostResolver.getVHost(#root)
在表达式中。
为send
作时,需要发送的消息是根 Evaluation 对象。
为receive
作中,该queueName
是根评估对象。
路由算法如下:如果 selector 表达式为null
or 的计算结果为null
或提供的ConnectionFactory
不是AbstractRoutingConnectionFactory
,一切都像以前一样工作,依赖于提供的ConnectionFactory
实现。
如果评估结果不是null
,但没有目标ConnectionFactory
为了那个lookupKey
和AbstractRoutingConnectionFactory
配置了lenientFallback = true
.
在AbstractRoutingConnectionFactory
,它会回退到其routing
实现基于determineCurrentLookupKey()
.
但是,如果lenientFallback = false
一IllegalStateException
被抛出。
命名空间支持还提供send-connection-factory-selector-expression
和receive-connection-factory-selector-expression
attributes 上的<rabbit:template>
元件。
此外,从版本 1.4 开始,您可以在侦听器容器中配置路由连接工厂。
在这种情况下,队列名称列表将用作查找键。
例如,如果将容器配置为setQueueNames("thing1", "thing2")
,则查找键为[thing1,thing]"
(请注意,键中没有空格)。
从版本 1.6.9 开始,您可以使用setLookupKeyQualifier
在 Listener 容器上。
例如,这样做可以侦听具有相同名称但在不同虚拟主机中的队列(每个虚拟主机都有一个连接工厂)。
例如,使用 lookup key 限定符thing1
以及一个监听 queue 的容器thing2
,您可以注册目标连接工厂的查找键可以是thing1[thing2]
.
目标(如果提供,则为默认)连接工厂必须具有相同的发布者确认和返回设置。 请参阅 发布者确认并返回。 |
从版本 2.4.4 开始,可以禁用此验证。
如果您遇到 confirms 和 returns 之间的值需要不相等的情况,则可以使用AbstractRoutingConnectionFactory#setConsistentConfirmsReturns
以关闭验证。
请注意,添加到AbstractRoutingConnectionFactory
将确定confirms
和returns
.
如果您遇到某些消息需要检查确认/返回而其他消息不确认/返回的情况,这可能会很有用。 例如:
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
这样,带有标头的消息x-use-publisher-confirms: true
会通过 Caching 连接发送,可以保证消息送达。
有关确保邮件送达的更多信息,请参阅 Publisher Confirms and Returns 。
Queue Affinity 和LocalizedQueueConnectionFactory
在集群中使用 HA 队列时,为了获得最佳性能,您可能需要连接到物理代理
lead 队列所在的位置。
这CachingConnectionFactory
可以配置多个 broker 地址。
这是为了进行故障转移,客户端会尝试根据配置的AddressShuffleMode
次序。
这LocalizedQueueConnectionFactory
使用管理插件提供的 REST API 来确定哪个节点是队列的潜在客户。
然后,它会创建(或从缓存中检索)一个CachingConnectionFactory
,它只连接到该节点。
如果连接失败,则确定新的前导节点,并使用方连接到该节点。
这LocalizedQueueConnectionFactory
配置了默认连接工厂,以防无法确定队列的物理位置,在这种情况下,它会正常连接到集群。
这LocalizedQueueConnectionFactory
是一个RoutingConnectionFactory
和SimpleMessageListenerContainer
使用队列名称作为查找键,如上面的 Routing Connection Factory 中所述。
因此(使用队列名称进行查找),则LocalizedQueueConnectionFactory 仅当容器配置为侦听单个队列时,才能使用。 |
必须在每个节点上启用 RabbitMQ 管理插件。 |
此连接工厂适用于长期连接,例如SimpleMessageListenerContainer .
它不适用于短连接使用,例如使用RabbitTemplate 因为在建立连接之前调用 REST API 的开销。
此外,对于发布作,队列是未知的,并且消息无论如何都会发布到所有集群成员,因此查找节点的逻辑几乎没有价值。 |
以下示例配置显示了如何配置工厂:
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
请注意,前三个参数是addresses
,adminUris
和nodes
.
这些是位置性的,因为当容器尝试连接到队列时,它使用 admin API 来确定哪个节点是队列的引线,并连接到与该节点位于同一数组位置的地址。
从版本 3.0 开始,RabbitMQhttp-client 不再用于访问 Rest API。
相反,默认情况下,WebClient 如果spring-webflux 位于 class 路径上;否则RestTemplate 被使用。 |
添加WebFlux
添加到类路径中:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
compile 'org.springframework.amqp:spring-rabbit'
您还可以通过实现LocalizedQueueConnectionFactory.NodeLocator
并覆盖其createClient, ``restCall
和可选的close
方法。
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public HashMap<String, Object> restCall(MyClient client, URI uri) {
...
});
});
该框架提供了WebFluxNodeLocator
和RestTemplateNodeLocator
,默认值如上所述。
发布者确认并返回
通过设置CachingConnectionFactory
财产publisherConfirmType
自ConfirmType.CORRELATED
和publisherReturns
property 设置为 'true'。
设置这些选项后,Channel
工厂创建的实例包装在PublisherCallbackChannel
,用于促进回调。
当获得这样的通道时,客户端可以注册一个PublisherCallbackChannel.Listener
使用Channel
.
这PublisherCallbackChannel
implementation 包含用于将 Confirm 或 return 路由到相应侦听器的 logic 。
这些功能将在以下各节中进一步说明。
另请参阅 相关发布者确认和返回 和simplePublisherConfirms
在 Scoped Operations 中。
有关更多背景信息,请参阅 RabbitMQ 团队的博客文章,标题为 Introducing Publisher Confirms。 |
连接侦听器和通道侦听器
连接工厂支持注册ConnectionListener
和ChannelListener
实现。
这允许您接收连接和通道相关事件的通知。
(一个ConnectionListener
由RabbitAdmin
在建立连接时执行声明 - 有关更多信息,请参阅 自动声明交换、队列和绑定 )。
下面的清单显示了ConnectionListener
接口定义:
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
从版本 2.0 开始,org.springframework.amqp.rabbit.connection.Connection
Object 可以随com.rabbitmq.client.BlockedListener
要通知连接已阻止和未阻止事件的实例。
以下示例显示了 ChannelListener 接口定义:
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
请参阅发布是异步的 — 如何检测成功和失败,了解您可能希望注册ChannelListener
.
记录通道关闭事件
版本 1.5 引入了一种机制,使用户能够控制日志记录级别。
这AbstractConnectionFactory
使用默认策略来记录 Channel Closure,如下所示:
-
正常通道关闭 (200 OK) 不会被记录。
-
如果通道由于被动队列声明失败而关闭,则将其记录在 DEBUG 级别。
-
如果通道因
basic.consume
由于排他性消费者条件而被拒绝,则会记录在 DEBUG 级别(自 3.1 起,以前为 INFO)。 -
所有其他记录都为 ERROR 级别。
要修改此行为,您可以注入自定义ConditionalExceptionLogger
到CachingConnectionFactory
在其closeExceptionLogger
财产。
此外,AbstractConnectionFactory.DefaultChannelCloseLogger
现在是 public,允许对其进行子类化。
另请参阅 Consumer Events。
运行时缓存属性
以 1.6 版本为起点,CachingConnectionFactory
现在通过getCacheProperties()
方法。
这些统计信息可用于优化缓存,以便在生产环境中对其进行优化。
例如,高水位线可用于确定是否应增加缓存大小。
如果它等于缓存大小,则可能需要考虑进一步增加。
下表描述了CacheMode.CHANNEL
性能:
财产 | 意义 |
---|---|
connectionName |
由 |
channelCacheSize |
当前配置的允许空闲的最大通道数。 |
localPort |
连接的本地端口(如果可用)。 这可用于与 RabbitMQ Admin UI 上的连接和通道相关联。 |
idleChannelsTx |
当前处于空闲 (缓存) 状态的事务通道数。 |
idleChannelsNotTx |
当前处于空闲 (缓存) 状态的非事务性通道数。 |
idleChannelsTxHighWater |
已同时空闲(缓存)的事务通道的最大数量。 |
idleChannelsNotTxHighWater |
非事务性通道的最大数量已同时空闲(缓存)。 |
下表描述了CacheMode.CONNECTION
性能:
财产 | 意义 |
---|---|
connectionName:<localPort> |
由 |
openConnections |
表示与 broker 的连接的连接对象的数目。 |
channelCacheSize |
当前配置的允许空闲的最大通道数。 |
connectionCacheSize |
当前配置的最大允许空闲连接数。 |
idleConnections |
当前空闲的连接数。 |
idleConnectionsHighWater |
当前空闲的最大连接数。 |
idleChannelsTx:<localPort> |
此连接当前处于空闲 (缓存) 状态的事务通道数。
您可以使用 |
idleChannelsNotTx:<localPort> |
此连接当前处于空闲 (缓存) 状态的非事务性通道数。
这 |
idleChannelsTxHighWater:<localPort> |
已同时空闲(缓存)的事务通道的最大数量。 属性名称的 localPort 部分可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。 |
idleChannelsNotTxHighWater:<localPort> |
非事务性通道的最大数量已同时空闲(缓存)。
您可以使用 |
这cacheMode
属性 (CHANNEL
或CONNECTION
) 也包括在内。

RabbitMQ 自动连接 / 拓扑恢复
自 Spring AMQP 的第一个版本以来,该框架在代理发生故障时提供了自己的连接和通道恢复。
此外,如 配置 Broker 中所述,RabbitAdmin
重新建立连接时,重新声明任何基础结构 Bean(队列和其他)。
因此,它不依赖于现在由amqp-client
图书馆。
这amqp-client
默认启用自动恢复。
两种恢复机制之间存在一些不兼容之处,因此,默认情况下, Spring 会将automaticRecoveryEnabled
底层RabbitMQ connectionFactory
自false
.
即使属性是true
,Spring 通过立即关闭任何已恢复的连接来有效地禁用它。
默认情况下,只有定义为 bean 的元素(queues、exchanges、bindings)才会在连接失败后被重新声明。 有关如何更改该行为,请参阅恢复自动删除声明。 |