5. 消息传递
Spring Cloud AWS 提供 Amazon SQS 和 Amazon SNS 集成 这简化了通过 SQS 或 SNS 发布和使用消息的过程。虽然 SQS 完全依赖于消息收发 API 在 Spring 4.0 中引入,SNS 仅部分实现了它,因为接收部分必须以不同的方式处理 推送通知。
5.1. 配置消息传递
在使用和配置消息传递支持之前,应用程序必须包含相应的模块依赖项 添加到 Maven 配置中。Spring Cloud AWS Messaging 支持作为单独的模块提供,以允许模块化使用 的模块。
5.1.1. Maven 依赖配置
Spring Cloud AWS 消息传递模块作为独立模块提供,可以使用以下依赖项声明导入:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-messaging</artifactId>
<version>{spring-cloud-version}</version>
</dependency>
5.2. SQS 支持
Amazon SQS 是 Amazon Web Service 平台上的托管消息收发服务,可提供点对点通信 与队列。与 JMS 或其他消息服务相比,Amazon SQS 具有多项功能和限制,这些功能和限制应该 考虑到。
-
Amazon SQS 只允许负载,因此必须将任何负载转换为 String 表示形式。 Spring Cloud AWS 专门支持通过将 Java 对象转换为 JSON 来传输带有 Amazon SQS 消息的 Java 对象。
String
Object
-
Amazon SQS 不支持事务,因此可能会检索两次消息。申请必须填写 一种幂等方式,以便它们可以接收消息两次。
-
Amazon SQS 每条消息的最大消息大小为 256kb,因此无法发送更大的消息。
5.2.1. 发送消息
这包含许多发送消息的便捷方法。有一些 send 方法可以指定
destination 使用对象,而那些使用字符串指定 destination 的字符串将
根据 SQS API 进行解析。不带 destination 参数的 send 方法使用默认 destination。QueueMessagingTemplate
QueueMessageChannel
import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.messaging.support.MessageBuilder;
public class SqsQueueSender {
private final QueueMessagingTemplate queueMessagingTemplate;
@Autowired
public SqsQueueSender(AmazonSQSAsync amazonSQSAsync) {
this.queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync);
}
public void send(String message) {
this.queueMessagingTemplate.send("physicalQueueName", MessageBuilder.withPayload(message).build());
}
}
此示例使用该类创建具有字符串 payload 的消息。的
构造的。send 方法中的目标是一个字符串值,该值
必须与 AWS 上定义的队列名称匹配。此值将在运行时由 Amazon SQS 客户端解析。选择
可以将实现传递给构造函数以通过以下方式解析资源
在 CloudFormation 堆栈中运行时的逻辑名称(有关以下内容的更多信息,请参阅管理云环境
资源名称解析)。MessageBuilder
QueueMessagingTemplate
AmazonSQSAsync
ResourceIdResolver
QueueMessagingTemplate
使用消息传递命名空间,可以在 XML 配置文件中定义 a。QueueMessagingTemplate
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aws-context="http://www.springframework.org/schema/cloud/aws/context"
xmlns:aws-messaging="http://www.springframework.org/schema/cloud/aws/messaging"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/cloud/aws/context
http://www.springframework.org/schema/cloud/aws/context/spring-cloud-aws-context.xsd
http://www.springframework.org/schema/cloud/aws/messaging
http://www.springframework.org/schema/cloud/aws/messaging/spring-cloud-aws-messaging">
<aws-context:context-credentials>
<aws-context:instance-profile-credentials />
</aws-context:context-credentials>
<aws-messaging:queue-messaging-template id="queueMessagingTemplate" />
</beans>
在此示例中,消息命名空间处理程序构造一个新的 .客户端
根据提供的凭证自动创建并传递给模板的构造函数。如果
在配置的 CloudFormation 堆栈中运行的应用程序 a 将传递给构造函数(有关资源名称解析的更多信息,请参阅管理云环境)。QueueMessagingTemplate
AmazonSQSAsync
ResourceIdResolver
使用消息转换器
为了方便发送域模型对象,它有各种 send 方法,
将 Java 对象作为消息数据内容的参数。重载的方法 和 in 将转换过程委托给接口的实例。此接口定义了一个简单的协定,用于在 Java 对象和 SQS 消息之间进行转换。默认的
implementation 只需解包消息有效负载,只要它与 target 类型匹配即可。由
使用转换器,您和您的应用程序代码可以专注于通过
SQS 的,而不必关心如何将其表示为 SQS 消息的详细信息。QueueMessagingTemplate
convertAndSend()
receiveAndConvert()
QueueMessagingTemplate
MessageConverter
SimpleMessageConverter
由于 SQS 只能发送有效负载,因此只应使用默认转换器
发送有效负载。对于更复杂的对象,应使用自定义转换器,就像
消息命名空间处理程序。 |
建议使用 XML 消息传递命名空间来创建,因为它会将
sophisticated 的,当 Jackson 位于 Classpath 上时,将对象转换为 JSON。QueueMessagingTemplate
MessageConverter
<aws-messaging:queue-messaging-template id="queueMessagingTemplate" />
this.queueMessagingTemplate.convertAndSend("queueName", new Person("John, "Doe"));
在此示例中,a 是使用消息命名空间创建的。方法
使用配置的 (Configured) 转换有效负载并发送消息。QueueMessagingTemplate
convertAndSend
Person
MessageConverter
5.2.2. 接收消息
有两种方法可以接收 SQS 消息,使用 或
注释驱动的侦听器端点。后者是迄今为止更方便的消息接收方式。receive
QueueMessagingTemplate
Person person = this.queueMessagingTemplate.receiveAndConvert("queueName", Person.class);
在此示例中,将从 SQS 队列获取一条消息,并将其转换为目标类
作为参数传递。QueueMessagingTemplate
5.2.3. 注解驱动的侦听器端点
注释驱动的侦听器终端节点是侦听 SQS 消息的最简单方法。只需使用 和 注释方法,即可将消息路由到带注释的方法。MessageMapping
QueueMessageHandler
<aws-messaging:annotation-driven-queue-listener />
@SqsListener("queueName")
public void queueListener(Person person) {
// ...
}
在此示例中,将启动一个队列侦听器容器,该容器轮询传递给注释的 SQS。传入消息将转换为目标类型,然后调用带注释的方法。queueName
MessageMapping
queueListener
除了有效负载之外,还可以使用 or 注释将标头注入侦听器方法中。 用于注入特定的 Headers 值,而注入 Containing All Headers。@Header
@Headers
@Header
@Headers
Map<String, String>
只有标准 支持随 SQS 消息一起发送的消息属性。目前不支持自定义属性。
除了提供的参数解析器之外,还可以使用 attribute 在元素上注册自定义解析器(请参阅下面的示例)。aws-messaging:annotation-driven-queue-listener
aws-messaging:argument-resolvers
<aws-messaging:annotation-driven-queue-listener>
<aws-messaging:argument-resolvers>
<bean class="org.custom.CustomArgumentResolver" />
</aws-messaging:argument-resolvers>
</aws-messaging:annotation-driven-queue-listener>
默认情况下,会为核心创建一个具有计算值
最大池大小。核心池大小设置为队列数量的两倍,最大池大小通过乘以
按字段值划分的队列数。如果这些默认值不满足
应用程序,可以使用 该属性设置 Custom Task Executor(请参阅下面的示例)。SimpleMessageListenerContainer
ThreadPoolTaskExecutor
maxNumberOfMessages
task-executor
<aws-messaging:annotation-driven-queue-listener task-executor="simpleTaskExecutor" />
消息回复
消息侦听器方法可以进行 Comments ,以将其返回值发送到另一个通道。它使用在元素上设置的已定义消息传递模板来发送返回值。消息传递模板必须实现
界面。@SendTo
SendToHandlerMethodReturnValueHandler
aws-messaging:annotation-driven-queue-listener
DestinationResolvingMessageSendingOperations
<aws-messaging:annotation-driven-queue-listener send-to-message-template="queueMessagingTemplate"/>
@SqsListener("treeQueue")
@SendTo("leafsQueue")
public List<Leaf> extractLeafs(Tree tree) {
// ...
}
在此示例中,该方法将接收来自 的消息,然后返回 s 的 a,该消息将发送到 。请注意,在 XML 元素上有一个属性,该属性指定要用于发送消息的返回值的消息模板
listener 方法。extractLeafs
treeQueue
List
Leaf
leafsQueue
aws-messaging:annotation-driven-queue-listener
send-to-message-template
QueueMessagingTemplate
处理异常
在带批注的方法中引发的异常可以由带 .@SqsListener
@MessageExceptionHandler
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.stereotype.Component;
@Component
public class MyMessageHandler {
@SqsListener("queueName")
void handle(String message) {
...
throw new MyException("something went wrong");
}
@MessageExceptionHandler(MyException.class)
void handleException(MyException e) {
...
}
}
5.2.4. SimpleMessageListenerContainerFactory
也可以通过创建类型为 的 bean 来使用 Java 进行配置 。SimpleMessageListenerContainer
SimpleMessageListenerContainerFactory
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSqs);
factory.setAutoStartup(false);
factory.setMaxNumberOfMessages(5);
// ...
return factory;
}
5.2.5. 通过 Amazon SQS 使用 AWS Event 消息
还可以使用 SQS 消息侦听器接收 AWS 生成的事件消息。因为
AWS 消息不包含 mime 类型的标头,必须配置 Jackson 消息转换器
替换为属性 false,也可以解析没有适当 MIME 类型的消息。strictContentTypeMatch
下一个代码显示了使用 和 重新配置QueueMessageHandlerFactory
MappingJackson2MessageConverter
@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
//set strict content type match to false
messageConverter.setStrictContentTypeMatch(false);
factory.setArgumentResolvers(Collections.<HandlerMethodArgumentResolver>singletonList(new PayloadArgumentResolver(messageConverter)));
return factory;
}
通过上述配置,可以接收 S3 存储桶(以及其他
事件通知(如 Elastic Transcoder 消息))在如下所示的带注释的方法中。@SqsListener
@SqsListener("testQueue")
public void receive(S3EventNotification s3EventNotificationRecord) {
S3EventNotification.S3Entity s3Entity = s3EventNotificationRecord.getRecords().get(0).getS3();
}
5.3. SNS 支持
Amazon SNS 是一种发布-订阅消息收发系统,允许客户端向特定主题发布通知。其他 感兴趣的客户端可以使用不同的协议(如 HTTP/HTTPS、电子邮件或 Amazon SQS 队列)进行订阅,以接收消息。
下图显示了 Amazon SNS 架构的典型示例。
Spring Cloud AWS 支持使用 和
使用基于 Spring Web MVC 的编程模型通过 HTTP/HTTPS 端点接收通知。亚马逊河
基于 SQS 的订阅可以与 Spring Cloud AWS 消息传递模块提供的注释驱动消息支持一起使用。NotificationMessagingTemplate
@Controller
5.3.1. 发送消息
包含两种发送通知的便捷方法。第一个选项指定
destination 使用将针对 SNS API 进行解析。第二个 API 没有目的地
参数并使用默认目标。上可用的所有常用 send 方法都已实现,但发送通知不太方便,因为主题必须作为 header 传递。NotificationMessagingTemplate
String
MessageSendingOperations
目前,只能使用 发送有效负载,因为这是预期的
type 的 SNS API 创建。 |
import com.amazonaws.services.sns.AmazonSNS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;
public class SnsNotificationSender {
private final NotificationMessagingTemplate notificationMessagingTemplate;
@Autowired
public SnsNotificationSender(AmazonSNS amazonSns) {
this.notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns);
}
public void send(String subject, String message) {
this.notificationMessagingTemplate.sendNotification("physicalTopicName", message, subject);
}
}
此示例通过将 client 作为参数传递来构造一个 new。在该方法中,便捷方法用于将 with 发送到 SNS 主题。这
destination 是一个字符串值,该值必须与 AWS 上定义的主题名称匹配。此值
由 Amazon SNS 客户端在运行时解析。(可选)可以将实施传递给构造函数,以便在 CloudFormation 堆栈内运行时按逻辑名称解析资源。
(有关资源名称解析的更多信息,请参阅管理云环境。NotificationMessagingTemplate
AmazonSNS
send
sendNotification
message
subject
sendNotification
ResourceIdResolver
NotificationMessagingTemplate
建议使用 XML 消息命名空间来创建,因为它会自动
配置 SNS 客户端以设置默认转换器。NotificationMessagingTemplate
<aws-messaging:notification-messaging-template id="notificationMessagingTemplate" />
5.3.2. 注解驱动的 HTTP 通知端点
SNS 支持多种终端节点类型(SQS、电子邮件、HTTP、HTTPS),Spring Cloud AWS 提供对 HTTP(S) 终端节点的支持。 SNS 向 HTTP 主题侦听器终端节点发送三种类型的请求,每种请求都提供了注释:
-
订阅请求 →
@NotificationSubscriptionMapping
-
通知请求 →
@NotificationMessageMapping
-
退订请求 →
@NotificationUnsubscribeMapping
HTTP 端点基于 Spring MVC 控制器。Spring Cloud AWS 添加了一些自定义参数解析器来提取 通知请求中的消息和主题。
@Controller
@RequestMapping("/topicName")
public class NotificationTestController {
@NotificationSubscriptionMapping
public void handleSubscriptionMessage(NotificationStatus status) throws IOException {
//We subscribe to start receive the message
status.confirmSubscription();
}
@NotificationMessageMapping
public void handleNotificationMessage(@NotificationSubject String subject, @NotificationMessage String message) {
// ...
}
@NotificationUnsubscribeConfirmationMapping
public void handleUnsubscribeMessage(NotificationStatus status) {
//e.g. the client has been unsubscribed and we want to "re-subscribe"
status.confirmSubscription();
}
}
目前,无法在方法级别定义映射 URL,因此必须
在类型级别完成,并且必须包含 Endpoint 的完整路径。 |
此示例创建了一个新的 Spring MVC 控制器,其中包含三种方法来处理上面列出的三个请求。挨次
要解析方法的参数,必须注册自定义参数解析器。这
XML 配置如下。handleNotificationMessage
<mvc:annotation-driven>
<mvc:argument-resolvers>
<ref bean="notificationResolver" />
</mvc:argument-resolvers>
</mvc:annotation-driven>
<aws-messaging:notification-argument-resolver id="notificationResolver" />
该元素注册了三个参数解析器:、 、
和。aws-messaging:notification-argument-resolver
NotificationStatusHandlerMethodArgumentResolver
NotificationMessageHandlerMethodArgumentResolver
NotificationSubjectHandlerMethodArgumentResolver
5.4. 使用 CloudFormation
Amazon SQS 队列和 SNS 主题可以在堆栈中配置,然后由应用程序使用。Spring Cloud AWS 还支持按逻辑名称查找堆栈配置的队列和主题,并解析为物理 名字。以下示例显示了 CloudFormation 模板中的 SNS 主题和 SQS 队列配置。
"LogicalQueueName": {
"Type": "AWS::SQS::Queue",
"Properties": {
}
},
"LogicalTopicName": {
"Type": "AWS::SNS::Topic",
"Properties": {
}
}
然后,可以在配置和应用程序中使用逻辑名称
如下所示:LogicalQueueName
LogicalTopicName
<aws-messaging:queue-messaging-template default-destination="LogicalQueueName" />
<aws-messaging:notification-messaging-template default-destination="LogicalTopicName" />
@SqsListener("LogicalQueueName")
public void receiveQueueMessages(Person person) {
// Logical names can also be used with messaging templates
this.notificationMessagingTemplate.sendNotification("anotherLogicalTopicName", "Message", "Subject");
}
当使用如上例所示的逻辑名称时,可以在不同的环境中创建堆栈,而无需任何 应用程序内部的配置或代码更改。