This version is still in development and is not considered stable yet. For the latest stable version, please use Spring AMQP 3.2.4!
RabbitMQ AMQP 1.0 Support
Version 4.0 introduces the spring-rabbitmq-client module for AMQP 1.0 protocol support on RabbitMQ.
This artifact is based on the com.rabbitmq.client:amqp-client library and therefore works only with RabbitMQ and its AMQP 1.0 protocol support. It cannot be used with an arbitrary AMQP 1.0 broker. For that purpose, a JMS bridge with the respective Spring JMS integration is currently recommended.
This dependency has to be added to the project to be able to interact with RabbitMQ AMQP 1.0 support:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbitmq-client</artifactId>
<version>4.0.0-SNAPSHOT</version>
</dependency>
implementation 'org.springframework.amqp:spring-rabbitmq-client:4.0.0-SNAPSHOT'
The spring-rabbit module (for the AMQP 0.9.1 protocol) comes as a transitive dependency so that this new client can reuse some common API, for example, exceptions and the @RabbitListener support.
It is not necessary to use both sets of functionality in the target project, but RabbitMQ allows AMQP 0.9.1 and 1.0 to co-exist.
For more information about the RabbitMQ AMQP 1.0 Java Client, see its documentation.
RabbitMQ AMQP 1.0 Environment
The com.rabbitmq.client.amqp.Environment is the first thing that has to be added to the project for connection management and other common settings.
It is an entry point to a node or a cluster of nodes.
The environment allows creating connections.
It can contain infrastructure-related configuration settings shared between connections, e.g. pools of threads, metrics and/or observation:
@Bean
Environment environment() {
    return new AmqpEnvironmentBuilder()
            .connectionSettings()
            .port(5672)
            .environmentBuilder()
            .build();
}
The same Environment instance can be used for connecting to different RabbitMQ brokers; in that case, the connection settings must be provided on each specific connection.
See the AMQP Connection Factory section below.
AMQP Connection Factory
The org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory abstraction was introduced to manage a com.rabbitmq.client.amqp.Connection.
Don’t confuse it with the org.springframework.amqp.rabbit.connection.ConnectionFactory, which is only for the AMQP 0.9.1 protocol.
The SingleAmqpConnectionFactory implementation manages one connection and its settings.
The same Connection can be shared between many producers, consumers and management operations.
Multiplexing is handled internally in the AMQP client library by the link abstraction of the AMQP 1.0 protocol implementation.
The Connection has recovery capabilities and also handles topology.
In most cases, it is enough to add this bean to the project:
@Bean
AmqpConnectionFactory connectionFactory(Environment environment) {
    return new SingleAmqpConnectionFactory(environment);
}
See the SingleAmqpConnectionFactory setters for all connection-specific settings.
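As an illustration of sharing one Environment between connections to different brokers, the following sketch defines two connection factories with their own settings. The host names and bean names are hypothetical, and the setHost() and setPort() setters are an assumption about the SingleAmqpConnectionFactory API; check the class for the exact setter names.

```java
import com.rabbitmq.client.amqp.Environment;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
import org.springframework.amqp.rabbitmq.client.SingleAmqpConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class TwoBrokersConfiguration {

    // One shared Environment without connection settings of its own.
    @Bean
    Environment environment() {
        return new AmqpEnvironmentBuilder().build();
    }

    // Connection to a first (hypothetical) broker.
    @Bean
    AmqpConnectionFactory ordersConnectionFactory(Environment environment) {
        SingleAmqpConnectionFactory factory = new SingleAmqpConnectionFactory(environment);
        factory.setHost("orders.example.com"); // assumed setter
        factory.setPort(5672);                 // assumed setter
        return factory;
    }

    // Connection to a second (hypothetical) broker, same Environment.
    @Bean
    AmqpConnectionFactory billingConnectionFactory(Environment environment) {
        SingleAmqpConnectionFactory factory = new SingleAmqpConnectionFactory(environment);
        factory.setHost("billing.example.com"); // assumed setter
        factory.setPort(5672);                  // assumed setter
        return factory;
    }
}
```

With more than one AmqpConnectionFactory bean in the context, other beans that take a connection factory need a qualifier or an explicit reference to the intended one.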
RabbitMQ Topology Management
For topology management (exchanges, queues and the bindings between them) from the application perspective, the RabbitAmqpAdmin is provided, which is an implementation of the existing AmqpAdmin interface:
@Bean
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpAdmin(connectionFactory);
}
The same bean definitions for Exchange, Queue, Binding and Declarables instances as described in Configuring the Broker have to be used to manage topology.
The RabbitAdmin from spring-rabbit can also do that, but it does so against an AMQP 0.9.1 connection. Since RabbitAmqpAdmin is based on the AMQP 1.0 connection, topology recovery is handled smoothly from there, together with publisher and consumer recovery.
The RabbitAmqpAdmin scans for the respective beans in its start() lifecycle callback.
The initialize() method, as well as all other RabbitMQ entity management methods, can be called manually at runtime.
Internally, the RabbitAmqpAdmin uses the com.rabbitmq.client.amqp.Connection.management() API to perform the respective topology manipulations.
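A minimal sketch of manual topology management at runtime, using only methods from the AmqpAdmin interface that RabbitAmqpAdmin implements. The RuntimeTopology class and the audit.* names are hypothetical and exist only for illustration.

```java
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;

// Hypothetical helper that declares and removes a route on demand.
class RuntimeTopology {

    private final AmqpAdmin admin;

    RuntimeTopology(AmqpAdmin admin) {
        this.admin = admin; // a RabbitAmqpAdmin bean can be injected here
    }

    void declareAuditRoute() {
        DirectExchange exchange = new DirectExchange("audit.exchange");
        Queue queue = new Queue("audit.queue");
        Binding binding = BindingBuilder.bind(queue).to(exchange).with("audit");

        // Declare the exchange and queue first, then the binding between them.
        this.admin.declareExchange(exchange);
        this.admin.declareQueue(queue);
        this.admin.declareBinding(binding);
    }

    void removeAuditRoute() {
        this.admin.deleteQueue("audit.queue");
        this.admin.deleteExchange("audit.exchange");
    }
}
```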
RabbitAmqpTemplate
The RabbitAmqpTemplate is an implementation of the AsyncAmqpTemplate and performs various send/receive operations with the AMQP 1.0 protocol.
It requires an AmqpConnectionFactory and can be configured with some defaults.
Even though the com.rabbitmq.client:amqp-client library comes with its own com.rabbitmq.client.amqp.Message, the RabbitAmqpTemplate still exposes an API based on the well-known org.springframework.amqp.core.Message with all the supporting classes, such as MessageProperties and the MessageConverter abstraction.
The conversion to and from com.rabbitmq.client.amqp.Message is done internally in the RabbitAmqpTemplate.
All the methods return a CompletableFuture from which the operation result is eventually obtained.
Operations with plain objects require message body conversion, and SimpleMessageConverter is used by default.
See Message Converters for more information about conversions.
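Because every template operation returns a CompletableFuture, the calling code usually composes on the result instead of blocking. The sketch below uses only the JDK: the SendOutcomes helper is hypothetical, and it assumes the send result is a CompletableFuture<Boolean>. It shows one way to apply a timeout and map success or failure to a status string.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Spring AMQP.
class SendOutcomes {

    // Maps a send future to a human-readable status without blocking the caller.
    static CompletableFuture<String> describe(CompletableFuture<Boolean> sendResult, Duration timeout) {
        return sendResult
                // Fail the future with a TimeoutException if it is not done in time.
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .thenApply(confirmed -> confirmed ? "confirmed" : "not confirmed")
                .exceptionally(ex -> "failed: " + unwrap(ex).getClass().getSimpleName());
    }

    // Dependent stages wrap the original failure in a CompletionException.
    private static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }
}
```

A call such as SendOutcomes.describe(template.convertAndSend("test1"), Duration.ofSeconds(10)) would then yield a future of the status text, assuming the template method returns CompletableFuture<Boolean>.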
Usually, just one bean like this is enough to perform all the possible template pattern operations:
@Bean
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpTemplate(connectionFactory);
}
It can be configured with a default exchange and routing key, or with just a queue.
The RabbitAmqpTemplate has a default queue for receive operations and another default queue for request-reply operations, where a temporary queue is created for the request by the client if one is not present.
Here are some samples of RabbitAmqpTemplate
operations:
@Bean
DirectExchange e1() {
    return new DirectExchange("e1");
}

@Bean
Queue q1() {
    return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
}

@Bean
Binding b1() {
    return BindingBuilder.bind(q1()).to(e1()).with("k1");
}

...

@Test
void defaultExchangeAndRoutingKey() {
    this.rabbitAmqpTemplate.setExchange("e1");
    this.rabbitAmqpTemplate.setRoutingKey("k1");
    this.rabbitAmqpTemplate.setReceiveQueue("q1");

    assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
            .succeedsWithin(Duration.ofSeconds(10));

    assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
            .succeedsWithin(Duration.ofSeconds(10))
            .isEqualTo("test1");
}
Here we declare an e1 exchange and a q1 queue, and bind the queue to that exchange with a k1 routing key.
Then we use the default settings of the RabbitAmqpTemplate to publish messages to that exchange with the respective routing key, and use q1 as the default queue for receive operations.
There are overloaded variants of those methods to send to a specific exchange or queue (for send and receive).
The receiveAndConvert() operations with a ParameterizedTypeReference<T> require a SmartMessageConverter to be injected into the RabbitAmqpTemplate.
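A possible configuration for typed receive operations is sketched below. It assumes that RabbitAmqpTemplate has a setMessageConverter() setter and a receiveAndConvert(ParameterizedTypeReference<T>) overload returning CompletableFuture<T>, and it uses Jackson2JsonMessageConverter as one SmartMessageConverter implementation; any other SmartMessageConverter could take its place. The class and bean names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ParameterizedTypeReference;

@Configuration
class JsonTemplateConfiguration {

    @Bean
    RabbitAmqpTemplate jsonTemplate(AmqpConnectionFactory connectionFactory) {
        RabbitAmqpTemplate template = new RabbitAmqpTemplate(connectionFactory);
        // Assumed setter: a SmartMessageConverter is needed for generic target types.
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        template.setReceiveQueue("q1");
        return template;
    }

    // Receives a JSON array from the default receive queue as a List<String>.
    static CompletableFuture<List<String>> receiveNames(RabbitAmqpTemplate template) {
        return template.receiveAndConvert(new ParameterizedTypeReference<List<String>>() { });
    }
}
```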
The next example demonstrates an RPC implementation with RabbitAmqpTemplate (assuming the same RabbitMQ objects as in the previous example):
@Test
void verifyRpc() {
    String testRequest = "rpc-request";
    String testReply = "rpc-reply";

    // Client side: send the request and wait for the reply asynchronously.
    CompletableFuture<Object> rpcClientResult =
            this.rabbitAmqpTemplate.convertSendAndReceive("e1", "k1", testRequest);

    AtomicReference<String> receivedRequest = new AtomicReference<>();

    // Server side: receive the request from q1 and reply to it.
    CompletableFuture<Boolean> rpcServerResult =
            this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
                    payload -> {
                        receivedRequest.set(payload);
                        return testReply;
                    });

    assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
    assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
    assertThat(receivedRequest.get()).isEqualTo(testRequest);
}
The correlation and replyTo
queue are managed internally.
The server side can be implemented with a @RabbitListener
POJO method described below.
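For comparison, a server side based on @RabbitListener could look like the sketch below. The RpcServer class is hypothetical; the example relies on the standard Spring AMQP behavior where a non-void listener method return value is sent as the reply, and it assumes that a listener container factory for AMQP 1.0 is registered as described in the consumer section.

```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Hypothetical RPC server: consumes requests from q1 and replies to the caller.
@Component
class RpcServer {

    @RabbitListener(queues = "q1")
    String handle(String request) {
        // The returned value becomes the reply message body.
        return "reply to " + request;
    }
}
```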
The RabbitMQ AMQP 1.0 Consumer
As with many other messaging implementations for the consumer side, the spring-rabbitmq-client module comes with the RabbitAmqpListenerContainer, which is, essentially, an implementation of the well-known MessageListenerContainer.
It does exactly the same as the DirectMessageListenerContainer, but for RabbitMQ AMQP 1.0 support.
It requires an AmqpConnectionFactory and at least one queue to consume from.
Also, a MessageListener (or the AMQP 1.0 specific RabbitAmqpMessageListener) must be provided.
The container can be configured with autoSettle = false, which has the meaning of AcknowledgeMode.MANUAL.
In that case, the Message provided to the MessageListener carries an AmqpAcknowledgment callback in its MessageProperties, which the target logic can use to settle the message.
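The following sketch shows a programmatic container with manual settlement. The setQueueNames() and setAutoSettle() setters and the MessageProperties.getAmqpAcknowledgment() accessor are assumptions derived from the option names in this section; setupMessageListener() comes from the MessageListenerContainer interface. The class and method names are hypothetical.

```java
import org.springframework.amqp.core.AmqpAcknowledgment;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class ManualSettlementConfiguration {

    @Bean
    RabbitAmqpListenerContainer manualContainer(AmqpConnectionFactory connectionFactory) {
        RabbitAmqpListenerContainer container = new RabbitAmqpListenerContainer(connectionFactory);
        container.setQueueNames("q1");   // assumed setter
        container.setAutoSettle(false);  // assumed setter: manual settlement
        container.setupMessageListener(manualListener());
        return container;
    }

    private MessageListener manualListener() {
        return message -> {
            // Assumed accessor for the settlement callback.
            AmqpAcknowledgment ack = message.getMessageProperties().getAmqpAcknowledgment();
            try {
                process(message.getBody());
                if (ack != null) {
                    ack.acknowledge();
                }
            }
            catch (RuntimeException ex) {
                if (ack != null) {
                    // Put the message back on the queue for another attempt.
                    ack.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
                }
            }
        };
    }

    // Hypothetical business logic placeholder.
    private void process(byte[] body) {
    }
}
```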
The RabbitAmqpMessageListener
has a contract for com.rabbitmq.client:amqp-client
abstractions:
/**
 * Process an AMQP message.
 * @param message the message to process.
 * @param context the consumer context to settle message.
 *                 Null if container is configured for {@code autoSettle}.
 */
void onAmqpMessage(Message message, Consumer.Context context);
Here the first argument is the native received com.rabbitmq.client.amqp.Message, and context is a native callback for message settlement, similar to the AmqpAcknowledgment abstraction mentioned above.
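A minimal sketch of a listener working with the native abstractions is shown below. The NativeListener class is hypothetical, the package of RabbitAmqpMessageListener is an assumption, and the sketch also assumes that the interface provides a default onMessage() so that only onAmqpMessage() needs to be implemented. The accept() and discard() calls are part of the RabbitMQ AMQP client Consumer.Context API.

```java
import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.Message;
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener;

// Hypothetical listener: accepts non-empty text messages and discards empty ones.
class NativeListener implements RabbitAmqpMessageListener {

    @Override
    public void onAmqpMessage(Message message, Consumer.Context context) {
        String text = new String(message.body(), StandardCharsets.UTF_8);

        if (context == null) {
            // The container settles the message itself in autoSettle mode.
            return;
        }

        if (text.isBlank()) {
            context.discard();
        }
        else {
            context.accept();
        }
    }
}
```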
The RabbitAmqpMessageListener can handle and settle messages in batches when the batchSize option is provided.
For this purpose, the MessageListener.onMessageBatch() contract must be implemented.
The batchReceiveDuration option is used to schedule a forced release of incomplete batches, to avoid exhausting memory and consumer credits.
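The batch contract can be implemented along these lines. This is only a sketch: the BatchLoggingListener class is hypothetical, and it relies on the onMessageBatch(List<Message>) default method of the MessageListener interface, overridden here to process the whole batch at once.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

// Hypothetical listener that handles messages delivered as a batch.
class BatchLoggingListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // Treat a single message as a batch of one.
        onMessageBatch(List.of(message));
    }

    @Override
    public void onMessageBatch(List<Message> messages) {
        for (Message message : messages) {
            String text = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("Received in batch: " + text);
        }
    }
}
```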
Usually, the RabbitAmqpMessageListener class is not used directly in the target project, and POJO method annotation configuration via @RabbitListener is chosen for declarative consumer configuration.
The RabbitAmqpListenerContainerFactory must be registered under the RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME bean name, and the @RabbitListener annotation processing then registers a RabbitAmqpMessageListener instance into the RabbitListenerEndpointRegistry.
The target POJO method invocation is handled by a specific RabbitAmqpMessageListenerAdapter implementation, which extends MessagingMessageListenerAdapter and reuses a lot of its functionality, including request-reply scenarios (async or not).
So, all the concepts described in Annotation-driven Listener Endpoints apply to this RabbitAmqpMessageListener as well.
In addition to the traditional messaging payload and headers, the @RabbitListener POJO method contract can include these parameters:
- com.rabbitmq.client.amqp.Message - the native AMQP 1.0 message without any conversions;
- org.springframework.amqp.core.Message - the Spring AMQP message abstraction as the conversion result from the native AMQP 1.0 message;
- org.springframework.messaging.Message - the Spring Messaging abstraction as the conversion result from the Spring AMQP message;
- Consumer.Context - the RabbitMQ AMQP client consumer settlement API;
- org.springframework.amqp.core.AmqpAcknowledgment - the Spring AMQP acknowledgment abstraction: delegates to the Consumer.Context.
The following example demonstrates a simple @RabbitListener
for RabbitMQ AMQP 1.0 interaction with the manual settlement:
@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpListenerContainerFactory(connectionFactory);
}

final List<String> received = Collections.synchronizedList(new ArrayList<>());

CountDownLatch consumeIsDone = new CountDownLatch(11);

@RabbitListener(queues = {"q1", "q2"},
        ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
        concurrency = "2",
        id = "testAmqpListener")
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
    try {
        if ("discard".equals(data)) {
            if (!this.received.contains(data)) {
                context.discard();
            }
            else {
                throw new MessageConversionException("Test message is rejected");
            }
        }
        else if ("requeue".equals(data) && !this.received.contains(data)) {
            acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
        }
        else {
            acknowledgment.acknowledge();
        }
        this.received.add(data);
    }
    finally {
        this.consumeIsDone.countDown();
    }
}