示例应用程序
Spring AMQP Samples 项目包括两个示例应用程序。 第一个是一个简单的 “Hello World” 示例,它演示了同步和异步消息接收。 它为了解基本组件提供了一个很好的起点。 第二个示例基于股票交易用例,用于演示实际应用程序中常见的交互类型。 在本章中,我们提供了每个示例的快速演练,以便您可以专注于最重要的组件。 这些示例都是基于 Maven 的,因此您应该能够将它们直接导入到任何 Maven 感知的 IDE(比如 SpringSource Tool Suite)中。
“Hello World” 示例
“Hello World” 示例演示了同步和异步消息接收。
您可以导入spring-rabbit-helloworld
样例导入 IDE,然后按照下面的讨论进行作。
同步示例
在src/main/java
目录中,导航到org.springframework.amqp.helloworld
包。
打开HelloWorldConfiguration
类,请注意它包含@Configuration
注解,并注意一些@Bean
方法级别的 annotations。
这是 Spring 基于 Java 的配置的一个例子。
您可以在此处阅读更多相关信息。
下面的清单显示了如何创建 connection factory:
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
该配置还包含一个RabbitAdmin
,默认情况下,它会查找 exchange、queue 或 binding 类型的任何 bean,然后在代理上声明它们。
事实上,helloWorldQueue
在HelloWorldConfiguration
是一个示例,因为它是Queue
.
下面的清单显示了helloWorldQueue
bean 定义:
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
回头看rabbitTemplate
bean 配置中,可以看到它的名称为helloWorldQueue
set 作为其queue
属性(用于接收消息)及其routingKey
属性(用于发送消息)。
现在我们已经探索了配置,我们可以查看实际使用这些组件的代码。
首先,打开Producer
类。
它包含一个main()
方法中,SpringApplicationContext
已创建。
下面的清单显示了main
方法:
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在前面的示例中,AmqpTemplate
bean 被检索并用于发送Message
.
由于客户端代码应尽可能依赖接口,因此类型为AmqpTemplate
而不是RabbitTemplate
.
即使HelloWorldConfiguration
是RabbitTemplate
,依赖接口意味着这段代码的可移植性更强(可以独立于代码来更改配置)。
由于convertAndSend()
方法,模板将委托给其MessageConverter
实例。
在本例中,它使用默认的SimpleMessageConverter
,但可以为rabbitTemplate
bean,如HelloWorldConfiguration
.
现在打开Consumer
类。
它实际上共享相同的配置基类,这意味着它共享rabbitTemplate
豆。
这就是为什么我们在routingKey
(用于发送)和queue
(用于接收)。
正如我们在AmqpTemplate
,则可以将 'routingKey' 参数传递给 send 方法,将 'queue' 参数传递给 receive 方法。
这Consumer
code 基本上是 Producer 的镜像,调用receiveAndConvert()
而不是convertAndSend()
.
下面的清单显示了Consumer
:
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
如果运行Producer
然后运行Consumer
,您应该会看到Received: Hello World
在控制台输出中。
异步示例
同步示例演练了同步 Hello World 示例。
本节介绍一个稍微高级一些但功能更强大的选项。
通过一些修改,Hello World 示例可以提供异步接收(也称为消息驱动的 POJO)的示例。
事实上,有一个子包恰好提供了:org.springframework.amqp.samples.helloworld.async
.
同样,我们从发送方开始。
打开ProducerConfiguration
类,请注意,它会创建一个connectionFactory
以及rabbitTemplate
豆。
这一次,由于配置专用于消息发送端,我们甚至不需要任何队列定义,并且RabbitTemplate
仅设置了 'routingKey' 属性。
回想一下,消息是发送到 Exchange,而不是直接发送到队列。
AMQP 默认交换是没有名称的直接交换。
所有队列都绑定到该 default 交换,并将其名称作为路由键。
这就是为什么我们只需要在此处提供路由密钥。
下面的清单显示了rabbitTemplate
定义:
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由于此示例演示了异步消息接收,因此生产方设计为连续发送消息(如果它是像同步版本一样的每次执行消息模型,那么它实际上是消息驱动的使用者就不会那么明显)。
负责持续发送消息的组件被定义为ProducerConfiguration
.
它配置为每 3 秒运行一次。
下面的清单显示了该组件:
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
您不需要了解所有细节,因为真正的重点应该是接收方(我们接下来将介绍)。
但是,如果您还不熟悉 Spring 任务调度支持,可以在此处了解更多信息。
简单来说,postProcessor
bean 中的ProducerConfiguration
向调度程序注册任务。
现在我们可以转向接收方。
为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。
该类称为HelloWorldHandler
,如下面的清单所示:
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
该类是 POJO。
它不扩展任何基类,不实现任何接口,甚至不包含任何导入。
它正在被“适应”到MessageListener
Spring AMQP 的接口MessageListenerAdapter
.
然后,您可以在SimpleMessageListenerContainer
.
在此示例中,容器是在ConsumerConfiguration
类。
你可以在那里看到 POJO 包装在适配器中。
下面的清单显示了listenerContainer
定义如下:
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
这SimpleMessageListenerContainer
是 Spring 生命周期组件,默认情况下,它会自动启动。
如果您查看Consumer
类中,你可以看到它的main()
方法只包含一个单行引导程序,用于创建ApplicationContext
.
制作人的main()
method 也是一个单行引导程序,因为其方法带有@Scheduled
也会自动启动。
您可以启动Producer
和Consumer
按任意顺序,您应该每三秒看到一次发送和接收的消息。
股票交易
Stock Trading 示例演示了比 Hello World 示例更高级的消息传送方案。
但是,配置非常相似,只是稍微复杂一些。
由于我们详细介绍了 Hello World 配置,因此,我们在这里重点介绍此示例的不同之处。
有一个服务器将市场数据(股票报价)推送到主题交易所。
然后,客户端可以通过将队列与路由模式(例如app.stock.quotes.nasdaq.*
).
此演示的另一个主要功能是由客户端发起并由服务器处理的请求-回复“股票交易”交互。
这涉及到一个私有的replyTo
queue 中,该队列由客户端在 order 请求消息本身中发送。
服务器的核心配置位于RabbitServerConfiguration
类中的org.springframework.amqp.rabbit.stocks.config.server
包。
它扩展了AbstractStockAppRabbitConfiguration
.
这是定义服务器和客户端通用资源的地方,包括市场数据主题交换(其名称为“app.stock.marketdata”)和服务器为股票交易公开的队列(其名称为“app.stock.request”)。
在该通用配置文件中,您还会看到Jackson2JsonMessageConverter
在RabbitTemplate
.
特定于服务器的配置由两部分组成。
首先,它在RabbitTemplate
这样它就不需要在每次调用时都提供该 Exchange 名称来发送Message
.
它在 base configuration 类中定义的抽象回调方法中执行此作。
下面的清单显示了该方法:
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次,声明股票请求队列。
在这种情况下,它不需要任何显式绑定,因为它绑定到默认的 no-name 交换,并将自己的名称作为路由键。
如前所述,AMQP 规范定义了该行为。
下面的清单显示了stockRequestQueue
豆:
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
现在,您已经了解了服务器的 AMQP 资源的配置,请导航到org.springframework.amqp.rabbit.stocks
软件包的src/test/java
目录。
在那里,您可以看到实际的Server
类提供main()
方法。
它会创建一个ApplicationContext
基于server-bootstrap.xml
config 文件。
在那里,您可以看到发布虚拟市场数据的计划任务。
该配置依赖于 Spring 的task
命名空间支持。
引导配置文件还导入了一些其他文件。
最有趣的是server-messaging.xml
,它位于src/main/resources
.
在那里,您可以看到messageListenerContainer
负责处理 Stock Trade 请求的 bean。
最后,看看serverHandler
在server-handlers.xml
(它也在 'src/main/resources' 中)。
该 bean 是ServerHandler
类,是消息驱动的 POJO 的一个很好的例子,它也可以发送回复消息。
请注意,它本身并不与框架或任何 AMQP 概念耦合。
它接受一个TradeRequest
并返回一个TradeResponse
.
下面的清单显示了handleMessage
方法:
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
现在我们已经看到了服务器最重要的配置和代码,我们可以转向客户端。
最好的起点可能是RabbitClientConfiguration
在org.springframework.amqp.rabbit.stocks.config.client
包。
请注意,它声明了两个队列,但没有提供显式名称。
下面的清单显示了两个队列的 bean 定义:
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
这些是私有队列,唯一名称是自动生成的。
客户端使用第一个生成的队列绑定到服务器已公开的市场数据交换。
回想一下,在 AMQP 中,使用者与队列交互,而生产者与交换交互。
队列与交易所的 “绑定” 是告诉 broker 将消息从给定交易所投递 (或路由) 到队列。
由于市场数据交换是主题交换,因此可以使用路由模式来表示绑定。
这RabbitClientConfiguration
执行此作,并使用Binding
object,该对象是使用BindingBuilder
Fluent API 的 API 创建。
下面的清单显示了Binding
:
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
请注意,实际值已在属性文件 (client.properties
下src/main/resources
),并且我们使用 Spring 的@Value
annotation 注入该值。
这通常是一个好主意。
否则,该值将在类中硬编码,并且无需重新编译即可不可修改。
在这种情况下,在更改用于绑定的路由模式的同时运行多个版本的客户端要容易得多。
我们现在可以试试。
从运行 开始org.springframework.amqp.rabbit.stocks.Server
然后org.springframework.amqp.rabbit.stocks.Client
.
您应该会看到NASDAQ
stocks,因为与 client.properties 中的 'stocks.quote.pattern' 键关联的当前值是 'app.stock.quotes.nasdaq.'.
现在,在保持现有的Server
和Client
运行,将该属性值更改为 'app.stock.quotes.nyse.' 并开始第二个Client
实例。
您应该看到,第一个客户仍然接收纳斯达克报价,而第二个客户接收纽约证券交易所报价。
相反,您可以更改模式以获取所有股票甚至单个股票代码。
我们探索的最后一个功能是从客户端的角度进行请求 - 回复交互。
回想一下,我们已经看到了ServerHandler
接受TradeRequest
对象和返回TradeResponse
对象。
上面的相应代码Client
side 为RabbitStockServiceGateway
在org.springframework.amqp.rabbit.stocks.gateway
包。
它将RabbitTemplate
以便发送消息。
下面的清单显示了send
方法:
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
请注意,在发送消息之前,它会设置replyTo
地址。
它提供由traderJoeQueue
bean 定义(如前所示)。
下面的清单显示了@Bean
定义StockServiceGateway
类本身:
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果您不再运行服务器和客户端,请立即启动它们。 尝试发送格式为 '100 TCKR' 的请求。 在模拟请求的 “处理” 的短暂人为延迟之后,您应该会看到客户端上出现一条确认消息。
从非 Spring 应用程序接收 JSON
Spring 应用程序在发送 JSON 时,将TypeId
header 转换为完全限定的类名,以帮助接收应用程序将 JSON 转换回 Java 对象。
这spring-rabbit-json
sample 探讨了从非 Spring 应用程序转换 JSON 的几种技术。