对于最新的稳定版本,请使用 Spring Framework 6.2.4! |
接收消息
这描述了如何在 Spring 中使用 JMS 接收消息。
同步接收
虽然 JMS 通常与异步处理相关联,但您可以
同步消费消息。超载的receive(..)
方法提供此
功能性。在同步接收期间,调用线程会阻塞,直到出现一条消息
变为可用。这可能是一个危险的作,因为调用线程可以
可能会无限期阻止。这receiveTimeout
property 指定多长时间
接收方应该在放弃等待消息之前等待。
异步接收:消息驱动的 POJO
Spring 还通过使用@JmsListener 注解,并提供一个开放的基础设施来以编程方式注册端点。
到目前为止,这是设置 asynchronous receiver 最方便的方法。
有关更多详细信息,请参阅启用侦听器终端节点注释。 |
与EJB世界中的消息驱动Bean (MDB) 类似,消息驱动的
POJO (MDP) 充当 JMS 消息的接收方。一个限制(但请参阅用MessageListenerAdapter
) 的 MDP 上执行
这jakarta.jms.MessageListener
接口。请注意,如果您的 POJO 收到消息
在多个线程上,确保您的实现是线程安全的非常重要。
以下示例显示了 MDP 的简单实现:
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.TextMessage;
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
Once you have implemented your MessageListener
, it is time to create a message listener
container.
The following example shows how to define and configure one of the message listener
containers that ships with Spring (in this case, DefaultMessageListenerContainer
):
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
See the Spring javadoc of the various message listener containers (all of which implement
MessageListenerContainer)
for a full description of the features supported by each implementation.
Using the SessionAwareMessageListener
Interface
The SessionAwareMessageListener
interface is a Spring-specific interface that provides
a similar contract to the JMS MessageListener
interface but also gives the message-handling
method access to the JMS Session
from which the Message
was received.
The following listing shows the definition of the SessionAwareMessageListener
interface:
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
You can choose to have your MDPs implement this interface (in preference to the standard
JMS MessageListener
interface) if you want your MDPs to be able to respond to any
received messages (by using the Session
supplied in the onMessage(Message, Session)
method). All of the message listener container implementations that ship with Spring
have support for MDPs that implement either the MessageListener
or
SessionAwareMessageListener
interface. Classes that implement the
SessionAwareMessageListener
come with the caveat that they are then tied to Spring
through the interface. The choice of whether or not to use it is left entirely up to you
as an application developer or architect.
Note that the onMessage(..)
method of the SessionAwareMessageListener
interface throws JMSException
. In contrast to the standard JMS MessageListener
interface, when using the SessionAwareMessageListener
interface, it is the
responsibility of the client code to handle any thrown exceptions.
Using MessageListenerAdapter
The MessageListenerAdapter
class is the final component in Spring’s asynchronous
messaging support. In a nutshell, it lets you expose almost any class as an MDP
(though there are some constraints).
Consider the following interface definition:
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
Notice that, although the interface extends neither the MessageListener
nor the
SessionAwareMessageListener
interface, you can still use it as an MDP by using the
MessageListenerAdapter
class. Notice also how the various message handling methods are
strongly typed according to the contents of the various Message
types that they can
receive and handle.
Now consider the following implementation of the MessageDelegate
interface:
public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}
In particular, note how the preceding implementation of the MessageDelegate
interface (the
DefaultMessageDelegate
class) has no JMS dependencies at all. It truly is a
POJO that we can make into an MDP through the following configuration:
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
The next example shows another MDP that can handle only receiving JMS
TextMessage
messages. Notice how the message handling method is actually called
receive
(the name of the message handling method in a MessageListenerAdapter
defaults to handleMessage
), but it is configurable (as you can see later in this section). Notice
also how the receive(..)
method is strongly typed to receive and respond only to JMS
TextMessage
messages.
The following listing shows the definition of the TextMessageDelegate
interface:
public interface TextMessageDelegate {
void receive(TextMessage message);
}
The following listing shows a class that implements the TextMessageDelegate
interface:
public class DefaultTextMessageDelegate implements TextMessageDelegate {
// implementation elided for clarity...
}
The configuration of the attendant MessageListenerAdapter
would then be as follows:
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
Note that, if the messageListener
receives a JMS Message
of a type
other than TextMessage
, an IllegalStateException
is thrown (and subsequently
swallowed). Another of the capabilities of the MessageListenerAdapter
class is the
ability to automatically send back a response Message
if a handler method returns a
non-void value. Consider the following interface and class:
public interface ResponsiveTextMessageDelegate {
// notice the return type...
String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
// implementation elided for clarity...
}
If you use the DefaultResponsiveTextMessageDelegate
in conjunction with a
MessageListenerAdapter
, any non-null value that is returned from the execution of
the 'receive(..)'
method is (in the default configuration) converted into a
TextMessage
. The resulting TextMessage
is then sent to the Destination
(if
one exists) defined in the JMS Reply-To
property of the original Message
or the
default Destination
set on the MessageListenerAdapter
(if one has been configured).
If no Destination
is found, an InvalidDestinationException
is thrown
(note that this exception is not swallowed and propagates up the
call stack).
Processing Messages Within Transactions
Invoking a message listener within a transaction requires only reconfiguration of the
listener container.
You can activate local resource transactions through the sessionTransacted
flag
on the listener container definition. Each message listener invocation then operates
within an active JMS transaction, with message reception rolled back in case of listener
execution failure. Sending a response message (through SessionAwareMessageListener
) is
part of the same local transaction, but any other resource operations (such as
database access) operate independently. This usually requires duplicate message
detection in the listener implementation, to cover the case where database processing
has committed but message processing failed to commit.
Consider the following bean definition:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
To participate in an externally managed transaction, you need to configure a
transaction manager and use a listener container that supports externally managed
transactions (typically, DefaultMessageListenerContainer
).
To configure a message listener container for XA transaction participation, you want
to configure a JtaTransactionManager
(which, by default, delegates to the Jakarta EE
server’s transaction subsystem). Note that the underlying JMS ConnectionFactory
needs to
be XA-capable and properly registered with your JTA transaction coordinator. (Check your
Jakarta EE server’s configuration of JNDI resources.) This lets message reception as well
as (for example) database access be part of the same transaction (with unified commit
semantics, at the expense of XA transaction log overhead).
The following bean definition creates a transaction manager:
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
Then we need to add it to our earlier container configuration. The container
takes care of the rest. The following example shows how to do so:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/> (1)
</bean>
1
Our transaction manager.