此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Framework 6.2.0! |
接收消息
这描述了如何在 Spring 中使用 JMS 接收消息。
同步接收
虽然 JMS 通常与异步处理相关联,但您可以
同步消费消息。超载的receive(..)
方法提供此
功能性。在同步接收期间,调用线程会阻塞,直到出现一条消息
变为可用。这可能是一个危险的作,因为调用线程可以
可能会无限期阻止。这receiveTimeout
property 指定多长时间
接收方应该在放弃等待消息之前等待。
异步接收:消息驱动的 POJO
Spring 还通过使用@JmsListener 注解,并提供一个开放的基础设施来以编程方式注册端点。
到目前为止,这是设置 asynchronous receiver 最方便的方法。
有关更多详细信息,请参阅启用侦听器终端节点注释。 |
与EJB世界中的消息驱动Bean (MDB) 类似,消息驱动的
POJO (MDP) 充当 JMS 消息的接收方。一个限制(但请参阅用MessageListenerAdapter
) 的 MDP 上执行
这jakarta.jms.MessageListener
接口。请注意,如果您的 POJO 收到消息
在多个线程上,确保您的实现是线程安全的非常重要。
以下示例显示了 MDP 的简单实现:
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.TextMessage;
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
实施MessageListener
,是时候创建消息侦听器
容器。
以下示例说明如何定义和配置其中一个消息侦听器
容器中(在本例中为DefaultMessageListenerContainer
):
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
请参阅各种消息侦听器容器(所有这些容器都实现了MessageListenerContainer)的 Spring javadoc ,了解每种实施所支持的功能的完整描述。
使用SessionAwareMessageListener
接口
这SessionAwareMessageListener
interface 是一个特定于 Spring 的接口,它提供
与 JMS 类似的合同MessageListener
接口,但也给出了消息处理
方法访问 JMSSession
从中,Message
收到了。
下面的清单显示了SessionAwareMessageListener
接口:
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
您可以选择让您的 MDP 实现此接口(优先于标准
JMS 公司MessageListener
接口),如果您希望 MDP 能够响应任何
收到的消息(通过使用Session
在onMessage(Message, Session)
方法)。Spring 附带的所有消息侦听器容器实现
支持实现MessageListener
或SessionAwareMessageListener
接口。实现SessionAwareMessageListener
来时需要注意的是,他们随后会与 Spring 绑定
通过界面。是否使用它的选择完全取决于您
作为应用程序开发人员或架构师。
请注意,onMessage(..)
方法SessionAwareMessageListener
接口引发JMSException
.与标准 JMS 相比MessageListener
接口中,使用SessionAwareMessageListener
接口,它是
客户端代码负责处理任何引发的异常。
用MessageListenerAdapter
这MessageListenerAdapter
class 是 Spring 的 asynchronous 中的最后一个组件
消息支持。简而言之,它允许您将几乎任何类公开为 MDP
(尽管有一些限制)。
请考虑以下接口定义:
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
请注意,尽管接口既没有扩展MessageListener
也不是SessionAwareMessageListener
接口中,您仍然可以通过使用MessageListenerAdapter
类。还要注意各种消息处理方法的
根据各种内容强类型Message
类型,他们可以
接收和处理。
现在考虑以下MessageDelegate
接口:
public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}
特别要注意的是,前面的MessageDelegate
接口(DefaultMessageDelegate
类)完全没有 JMS 依赖项。它确实是一个
POJO 中,我们可以通过以下配置将其制作成 MDP:
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一个示例显示了另一个只能处理接收 JMS 的 MDPTextMessage
消息。请注意消息处理方法的实际调用receive
(消息处理方法的名称,以MessageListenerAdapter
默认为handleMessage
),但它是可配置的(如本节后面所示)。通知
还有receive(..)
method 的强类型化,以便仅接收和响应 JMSTextMessage
消息。
下面的清单显示了TextMessageDelegate
接口:
public interface TextMessageDelegate {
void receive(TextMessage message);
}
下面的清单显示了一个实现TextMessageDelegate
接口:
public class DefaultTextMessageDelegate implements TextMessageDelegate {
// implementation elided for clarity...
}
Attendant 的配置MessageListenerAdapter
将如下所示:
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果messageListener
接收 JMSMessage
类型
以外TextMessage
一IllegalStateException
被抛出(随后
吞下)。另一个功能MessageListenerAdapter
class 是
能够自动发回响应Message
如果处理程序方法返回
non-void 值。请考虑以下接口和类:
public interface ResponsiveTextMessageDelegate {
// notice the return type...
String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
// implementation elided for clarity...
}
如果您使用DefaultResponsiveTextMessageDelegate
与MessageListenerAdapter
中,从执行
这'receive(..)'
method 被(在默认配置中)转换为TextMessage
.结果TextMessage
然后发送到Destination
(如果
一个存在)在 JMS 中定义Reply-To
原始属性Message
或
违约Destination
set 在MessageListenerAdapter
(如果已配置)。
如果没有Destination
,则InvalidDestinationException
被抛出
(请注意,此异常不会被吞噬,而是会向上传播
call 堆栈)。
处理事务中的消息
在事务中调用消息侦听器只需要重新配置 listener 容器。
您可以通过sessionTransacted
旗
在侦听器容器定义上。然后,每个消息侦听器调用都会运行
在活动的 JMS 事务中,如果侦听器,则回滚消息接收
执行失败。发送响应消息(通过SessionAwareMessageListener
) 是
部分,但任何其他资源作(例如
数据库访问)独立运行。这通常需要重复的消息
detection 来覆盖数据库处理
已提交,但消息处理无法提交。
考虑以下 bean 定义:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要参与外部管理的事务,您需要配置
事务管理器,并使用支持外部管理的侦听器容器
事务(通常为DefaultMessageListenerContainer
).
要为 XA 事务参与配置消息侦听器容器,您需要
要配置JtaTransactionManager
(默认情况下,委托给 Jakarta EE
服务器的事务子系统)。请注意,底层 JMSConnectionFactory
需要
支持 XA 并已正确注册到 JTA 事务协调器。(检查您的
Jakarta EE 服务器的 JNDI 资源的配置。这也允许消息接收
因为(例如)数据库访问是同一事务的一部分(使用统一提交
语义,但代价是 XA 事务日志开销)。
以下 bean 定义创建一个事务管理器:
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后我们需要将其添加到我们之前的容器配置中。容器 负责其余的事情。以下示例显示了如何执行此作:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/> (1)
</bean>
1 | 我们的交易管理器。 |