Spring AMQP 示例项目包括两个示例应用程序。 第一个是一个简单的“Hello World”示例,它演示了同步和异步消息接收。 它为了解基本组件提供了一个很好的起点。 第二个示例基于股票交易用例,用于演示实际应用程序中常见的交互类型。 在本章中,我们将快速浏览每个示例,以便您可以专注于最重要的组件。 这些示例都是基于 Maven 的,因此您应该能够将它们直接导入到任何可识别 Maven 的 IDE(例如 SpringSource 工具套件)中。
“Hello World”示例
“Hello World”示例演示了同步和异步消息接收。
您可以将示例导入到 IDE 中,然后按照下面的讨论进行操作。spring-rabbit-helloworld
同步示例
在目录中,导航到包。
打开类,注意它包含类级别的注释,并注意方法级别的一些注释。
这是 Spring 基于 Java 的配置的一个示例。
您可以在此处阅读有关此内容的更多信息。src/main/java
org.springframework.amqp.helloworld
HelloWorldConfiguration
@Configuration
@Bean
以下列表显示了如何创建连接工厂:
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
该配置还包含一个 的实例,默认情况下,该实例查找 exchange、queue 或 binding 类型的任何 bean,然后在代理上声明它们。
实际上,生成的 Bean 就是一个示例,因为它是 的实例。RabbitAdmin
helloWorldQueue
HelloWorldConfiguration
Queue
以下列表显示了 Bean 定义:helloWorldQueue
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
回顾 Bean 配置,您可以看到它的名称 set 作为其属性(用于接收消息)和其属性(用于发送消息)。rabbitTemplate
helloWorldQueue
queue
routingKey
现在我们已经探索了配置,我们可以查看实际使用这些组件的代码。
首先,从同一包中打开类。
它包含创建 Spring 的方法。Producer
main()
ApplicationContext
下面的列表显示了该方法:main
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在前面的示例中,将检索 Bean 并用于发送 .
由于客户端代码应尽可能依赖于接口,因此类型为 而不是 。
即使创建的 Bean 是 的实例,依赖接口也意味着此代码更具可移植性(您可以独立于代码更改配置)。
由于该方法被调用,因此模板将委托给其实例。
在本例中,它使用默认值 ,但可以向 Bean 提供不同的实现,如 中所定义。AmqpTemplate
Message
AmqpTemplate
RabbitTemplate
HelloWorldConfiguration
RabbitTemplate
convertAndSend()
MessageConverter
SimpleMessageConverter
rabbitTemplate
HelloWorldConfiguration
现在打开课程。
它实际上共享相同的配置基类,这意味着它共享 bean。
这就是为什么我们同时使用 a(用于发送)和 a(用于接收)配置该模板的原因。
正如我们在 AmqpTemplate
中所述,您可以将“routingKey”参数传递给 send 方法,将“queue”参数传递给接收方法。
代码基本上是 Producer 的镜像,调用而不是 .Consumer
rabbitTemplate
routingKey
queue
Consumer
receiveAndConvert()
convertAndSend()
以下列表显示了 :Consumer
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
如果运行 ,然后运行 ,您应该会在控制台输出中看到。Producer
Consumer
Received: Hello World
异步示例
同步示例演练了同步 Hello World 示例。
本节介绍一个稍微高级但功能更强大的选项。
通过一些修改,Hello World 示例可以提供异步接收(也称为消息驱动的 POJO)的示例。
事实上,有一个子包正好提供了这一点:.org.springframework.amqp.samples.helloworld.async
同样,我们从发送方开始。
打开该类,并注意它创建了一个 bean。
这一次,由于配置专用于消息发送端,我们甚至不需要任何队列定义,并且只有“routingKey”属性集。
回想一下,消息是发送到交换的,而不是直接发送到队列。
AMQP 默认交换是没有名称的直接交换。
所有队列都绑定到该默认交换,其名称为路由密钥。
这就是为什么我们只需要在这里提供路由密钥。ProducerConfiguration
connectionFactory
rabbitTemplate
RabbitTemplate
以下列表显示了定义:rabbitTemplate
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由于此示例演示了异步消息接收,因此生产端被设计为连续发送消息(如果它是像同步版本那样的每次执行消息的模型,那么它实际上并不是消息驱动的使用者)。
负责连续发送消息的组件被定义为 中的内部类。
它配置为每三秒运行一次。ProducerConfiguration
以下列表显示了该组件:
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
您不需要了解所有细节,因为真正的重点应该在接收端(我们将在下面介绍)。
但是,如果您还不熟悉 Spring 任务调度支持,可以在此处了解更多信息。
简而言之,Bean 中的 Bean 使用调度程序注册任务。postProcessor
ProducerConfiguration
现在我们可以转向接收方。
为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。
该类被调用,并显示在以下列表中:HelloWorldHandler
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
该类是 POJO。
它不扩展任何基类,不实现任何接口,甚至不包含任何导入。
它正在被Spring AMQP“适应”到界面。
然后,您可以在 .
对于此示例,容器是在类中创建的。
您可以在那里看到包裹在适配器中的 POJO。MessageListener
MessageListenerAdapter
SimpleMessageListenerContainer
ConsumerConfiguration
以下列表显示了如何定义:listenerContainer
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
这是一个 Spring 生命周期组件,默认情况下会自动启动。
如果你看一下这个类,你可以看到它的方法只包含一个单行引导程序来创建 .
Producer 的方法也是一个单行引导程序,因为其方法注释的组件也会自动启动。
您可以按任意顺序启动 and,并且应该看到每三秒钟发送和接收一次消息。SimpleMessageListenerContainer
Consumer
main()
ApplicationContext
main()
@Scheduled
Producer
Consumer
股票交易
股票交易示例演示了比 Hello World 示例更高级的消息传递方案。
但是,配置非常相似,如果涉及更多的话。
由于我们详细介绍了 Hello World 配置,因此在这里,我们将重点介绍此示例的不同之处。
有一个服务器将市场数据(股票报价)推送到主题交易所。
然后,客户端可以通过绑定具有路由模式的队列来订阅市场数据馈送(例如,)。
此演示的另一个主要功能是由客户端发起并由服务器处理的请求-回复“股票交易”交互。
这涉及客户端在订单请求消息本身中发送的专用队列。app.stock.quotes.nasdaq.*
replyTo
服务器的核心配置位于包中的类中。
它扩展了 .
这是定义服务器和客户端通用资源的地方,包括市场数据主题交换(其名称为“app.stock.marketdata”)和服务器为股票交易公开的队列(其名称为“app.stock.request”)。
在该通用配置文件中,您还会看到 a 配置在 .RabbitServerConfiguration
org.springframework.amqp.rabbit.stocks.config.server
AbstractStockAppRabbitConfiguration
Jackson2JsonMessageConverter
RabbitTemplate
特定于服务器的配置由两个部分组成。
首先,它配置了市场数据交换,这样它就不需要在每次调用时提供该交换名称来发送 .
它在基配置类中定义的抽象回调方法中执行此操作。
以下列表显示了该方法:RabbitTemplate
Message
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次,声明库存请求队列。
在这种情况下,它不需要任何显式绑定,因为它绑定到默认的无名交换,其名称作为路由键。
如前所述,AMQP 规范定义了该行为。
以下列表显示了 Bean 的定义:stockRequestQueue
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
现在,您已经看到了服务器的 AMQP 资源的配置,请导航到目录下的包。
在那里,您可以看到提供方法的实际类。
它根据配置文件创建一个 base。
在那里,您可以看到发布虚拟市场数据的计划任务。
该配置依赖于 Spring 的命名空间支持。
引导配置文件还会导入一些其他文件。
最有趣的是 ,它位于 的正下方。
在那里,您可以看到负责处理股票交易请求的 bean。
最后,看一下 bean 中定义的 bean(也在 'src/main/resources' 中)。
该 Bean 是该类的一个实例,并且是消息驱动的 POJO 的一个很好的示例,它也可以发送回复消息。
请注意,它本身并不与框架或任何 AMQP 概念耦合。
它接受 a 并返回 a 。
下面的列表显示了该方法的定义:org.springframework.amqp.rabbit.stocks
src/test/java
Server
main()
ApplicationContext
server-bootstrap.xml
task
server-messaging.xml
src/main/resources
messageListenerContainer
serverHandler
server-handlers.xml
ServerHandler
TradeRequest
TradeResponse
handleMessage
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
现在我们已经看到了服务器最重要的配置和代码,我们可以转向客户端。
最好的起点可能是 ,在包中。
请注意,它声明了两个队列,而没有提供显式名称。
以下列表显示了两个队列的 Bean 定义:RabbitClientConfiguration
org.springframework.amqp.rabbit.stocks.config.client
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
这些是专用队列,并且会自动生成唯一名称。
客户端使用第一个生成的队列绑定到服务器公开的市场数据交换。
回想一下,在 AMQP 中,消费者与队列交互,而生产者与交易所交互。
队列与交换的“绑定”是告诉代理将消息从给定交换传递(或路由)到队列。
由于市场数据交换是主题交换,因此绑定可以用路由模式表示。
使用对象执行此操作,并且该对象是使用 Fluent API 生成的。
以下列表显示了:RabbitClientConfiguration
Binding
BindingBuilder
Binding
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
请注意,实际值已在属性文件(下)中外部化,并且我们使用 Spring 的注解来注入该值。
这通常是个好主意。
否则,该值将在类中被硬编码,并且如果不重新编译就无法修改。
在这种情况下,在更改用于绑定的路由模式时,运行多个版本的客户端要容易得多。
我们现在可以尝试一下。client.properties
src/main/resources
@Value
从运行开始,然后 .
您应该会看到股票的虚拟报价,因为与 client.properties 中的“stocks.quote.pattern”键关联的当前值是“app.stock.quotes.nasdaq”。'。
现在,在保持现有服务器
和客户端
运行的同时,将该属性值更改为“app.stock.quotes.nyse”。并启动第二个实例。
您应该看到,第一个客户仍然收到纳斯达克报价,而第二个客户收到纽约证券交易所报价。
相反,您可以更改模式以获取所有股票甚至单个股票代码。org.springframework.amqp.rabbit.stocks.Server
org.springframework.amqp.rabbit.stocks.Client
NASDAQ
Client
我们探索的最后一个功能是从客户端的角度进行请求-回复交互。
回想一下,我们已经看到了接受对象并返回对象。
侧面的相应代码在包装中。
它委托给 以便发送消息。
下面的列表显示了该方法:ServerHandler
TradeRequest
TradeResponse
Client
RabbitStockServiceGateway
org.springframework.amqp.rabbit.stocks.gateway
RabbitTemplate
send
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
请注意,在发送消息之前,它会设置地址。
它提供由 Bean 定义生成的队列(如上所示)。
下面的列表显示了类本身的定义:replyTo
traderJoeQueue
@Bean
StockServiceGateway
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果您不再运行服务器和客户端,请立即启动它们。 尝试发送格式为“100 TCKR”的请求。 在模拟请求“处理”的短暂人为延迟之后,您应该会看到客户端上出现一条确认消息。
从非 Spring 应用程序接收 JSON
Spring 应用程序在发送 JSON 时,将标头设置为完全限定的类名,以帮助接收应用程序将 JSON 转换回 Java 对象。TypeId
此示例探讨了从非 Spring 应用程序转换 JSON 的几种技术。spring-rabbit-json