5. 消息传递

Spring Cloud AWS 提供 Amazon SQSAmazon SNS 集成 这简化了通过 SQS 或 SNS 发布和使用消息的过程。虽然 SQS 完全依赖于消息收发 API 在 Spring 4.0 中引入,SNS 仅部分实现了它,因为接收部分必须以不同的方式处理 推送通知。spring-doc.cadn.net.cn

5.1. 配置消息传递

在使用和配置消息传递支持之前,应用程序必须包含相应的模块依赖项 添加到 Maven 配置中。Spring Cloud AWS Messaging 支持作为单独的模块提供,以允许模块化使用 的模块。spring-doc.cadn.net.cn

5.1.1. Maven 依赖配置

Spring Cloud AWS 消息传递模块作为独立模块提供,可以使用以下依赖项声明导入:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-aws-messaging</artifactId>
    <version>{spring-cloud-version}</version>
</dependency>

5.2. SQS 支持

Amazon SQS 是 Amazon Web Service 平台上的托管消息收发服务,可提供点对点通信 与队列。与 JMS 或其他消息服务相比,Amazon SQS 具有多项功能和限制,这些功能和限制应该 考虑到。spring-doc.cadn.net.cn

  • Amazon SQS 仅允许Stringpayload 中,因此任何Object必须转换为 String 表示形式。 Spring Cloud AWS 专门支持通过将 Java 对象转换为 JSON 来传输带有 Amazon SQS 消息的 Java 对象。spring-doc.cadn.net.cn

  • Amazon SQS 不支持事务,因此可能会检索两次消息。申请必须填写 一种幂等方式,以便它们可以接收消息两次。spring-doc.cadn.net.cn

  • Amazon SQS 每条消息的最大消息大小为 256kb,因此无法发送更大的消息。spring-doc.cadn.net.cn

5.2.1. 发送消息

QueueMessagingTemplate包含许多发送消息的便捷方法。有一些 send 方法可以指定 destination 使用QueueMessageChannelobject 和那些使用字符串指定目的地的 根据 SQS API 进行解析。不带 destination 参数的 send 方法使用默认 destination。spring-doc.cadn.net.cn

import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.messaging.support.MessageBuilder;

public class SqsQueueSender {

    private final QueueMessagingTemplate queueMessagingTemplate;

    @Autowired
    public SqsQueueSender(AmazonSQSAsync amazonSQSAsync) {
        this.queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync);
    }

    public void send(String message) {
        this.queueMessagingTemplate.send("physicalQueueName", MessageBuilder.withPayload(message).build());
    }
}

此示例使用MessageBuilder类来创建具有字符串 payload 的消息。这QueueMessagingTemplate是 通过传递对AmazonSQSAsync客户。send 方法中的目标是一个字符串值,该值 必须与 AWS 上定义的队列名称匹配。此值将在运行时由 Amazon SQS 客户端解析。选择 一个ResourceIdResolverimplementation 可以传递给QueueMessagingTemplate构造函数解析资源 在 CloudFormation 堆栈中运行时的逻辑名称(有关以下内容的更多信息,请参阅管理云环境 资源名称解析)。spring-doc.cadn.net.cn

使用消息传递命名空间QueueMessagingTemplate可以在 XML 配置文件中定义。spring-doc.cadn.net.cn

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aws-context="http://www.springframework.org/schema/cloud/aws/context"
    xmlns:aws-messaging="http://www.springframework.org/schema/cloud/aws/messaging"
    xmlns="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/cloud/aws/context
        http://www.springframework.org/schema/cloud/aws/context/spring-cloud-aws-context.xsd
        http://www.springframework.org/schema/cloud/aws/messaging
        http://www.springframework.org/schema/cloud/aws/messaging/spring-cloud-aws-messaging">

    <aws-context:context-credentials>
        <aws-context:instance-profile-credentials />
    </aws-context:context-credentials>

    <aws-messaging:queue-messaging-template id="queueMessagingTemplate" />

</beans>

在此示例中,消息传递命名空间处理程序构造一个新的QueueMessagingTemplate.这AmazonSQSAsync客户 根据提供的凭证自动创建并传递给模板的构造函数。如果 应用程序在配置的 CloudFormation 堆栈中运行ResourceIdResolver将传递给构造函数(有关资源名称解析的更多信息,请参阅管理云环境)。spring-doc.cadn.net.cn

使用消息转换器

为了方便发送域模型对象,QueueMessagingTemplate具有各种 send 方法,这些方法 将 Java 对象作为消息数据内容的参数。重载方法convertAndSend()receiveAndConvert()QueueMessagingTemplate将转换过程委托给MessageConverter接口。此接口定义了一个简单的协定,用于在 Java 对象和 SQS 消息之间进行转换。默认的 实现SimpleMessageConverter只需解包消息有效负载,只要它与 Target 类型匹配即可。由 使用转换器,您和您的应用程序代码可以专注于通过 SQS 的,而不必关心如何将其表示为 SQS 消息的详细信息。spring-doc.cadn.net.cn

由于 SQS 只能发送Stringpayloads 默认转换器SimpleMessageConverter只应使用 发送String负载。对于更复杂的对象,应使用自定义转换器,就像 消息命名空间处理程序。spring-doc.cadn.net.cn

建议使用 XML 消息命名空间创建QueueMessagingTemplate因为它会设置一个 more 复杂MessageConverter当 Jackson 位于 Classpath 上时,它将对象转换为 JSON。spring-doc.cadn.net.cn

<aws-messaging:queue-messaging-template id="queueMessagingTemplate" />
this.queueMessagingTemplate.convertAndSend("queueName", new Person("John, "Doe"));

在此示例中,一个QueueMessagingTemplate是使用消息传送命名空间创建的。这convertAndSend方法 转换有效负载Person使用配置的MessageConverter并发送消息。spring-doc.cadn.net.cn

5.2.2. 接收消息

有两种方法可以接收 SQS 消息,使用receive的方法QueueMessagingTemplate或 注释驱动的侦听器端点。后者是迄今为止更方便的消息接收方式。spring-doc.cadn.net.cn

Person person = this.queueMessagingTemplate.receiveAndConvert("queueName", Person.class);

在此示例中,QueueMessagingTemplate将从 SQS 队列获取一条消息并将其转换为目标类 作为参数传递。spring-doc.cadn.net.cn

5.2.3. 注解驱动的侦听器端点

注释驱动的侦听器终端节点是侦听 SQS 消息的最简单方法。只需使用MessageMappingQueueMessageHandler将消息路由到带 Comments 的方法。spring-doc.cadn.net.cn

<aws-messaging:annotation-driven-queue-listener />
@SqsListener("queueName")
public void queueListener(Person person) {
    // ...
}

在此示例中,将启动一个轮询 SQS 的队列侦听器容器queueName传递给MessageMapping注解。传入消息将转换为目标类型,然后转换为带注释的方法queueListener被调用。spring-doc.cadn.net.cn

除了有效负载之外,还可以在侦听器方法中注入标头,其中包含@Header@Headers附注。@Header用于注入特定的标头值,而@Headers注入Map<String, String>包含所有标头。spring-doc.cadn.net.cn

只有标准 支持随 SQS 消息一起发送的消息属性。目前不支持自定义属性。spring-doc.cadn.net.cn

除了提供的参数解析器外,自定义的可以在aws-messaging:annotation-driven-queue-listener元素使用aws-messaging:argument-resolvers属性(请参阅下面的示例)。spring-doc.cadn.net.cn

<aws-messaging:annotation-driven-queue-listener>
    <aws-messaging:argument-resolvers>
        <bean class="org.custom.CustomArgumentResolver" />
    </aws-messaging:argument-resolvers>
</aws-messaging:annotation-driven-queue-listener>

默认情况下,SimpleMessageListenerContainer创建一个ThreadPoolTaskExecutor,其中核心和 最大池大小。核心池大小设置为队列数量的两倍,最大池大小通过乘以 队列数按maxNumberOfMessages田。如果这些默认值不满足 应用程序中,可以使用task-executor属性(请参阅下面的示例)。spring-doc.cadn.net.cn

<aws-messaging:annotation-driven-queue-listener task-executor="simpleTaskExecutor" />
消息回复

消息侦听器方法可以用@SendTo将其返回值发送到另一个通道。这SendToHandlerMethodReturnValueHandler使用在aws-messaging:annotation-driven-queue-listener元素发送返回值。消息传递模板必须实现 这DestinationResolvingMessageSendingOperations接口。spring-doc.cadn.net.cn

<aws-messaging:annotation-driven-queue-listener send-to-message-template="queueMessagingTemplate"/>
@SqsListener("treeQueue")
@SendTo("leafsQueue")
public List<Leaf> extractLeafs(Tree tree) {
    // ...
}

在此示例中,extractLeafs方法将接收来自treeQueue然后返回一个ListLeafs 的 S 文件,该 URL 将被发送到leafsQueue.请注意,在aws-messaging:annotation-driven-queue-listenerXML 元素中有一个属性send-to-message-template指定QueueMessagingTemplate作为消息模板,用于发送消息的返回值 listener 方法。spring-doc.cadn.net.cn

处理异常

内部引发的异常@SqsListener带 Comments 的方法可以由 Ancrypt 中的@MessageExceptionHandler.spring-doc.cadn.net.cn

import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.stereotype.Component;

@Component
public class MyMessageHandler {

    @SqsListener("queueName")
    void handle(String message) {
        ...
        throw new MyException("something went wrong");
    }

    @MessageExceptionHandler(MyException.class)
    void handleException(MyException e) {
        ...
    }
}

5.2.4. SimpleMessageListenerContainerFactory

SimpleMessageListenerContainer也可以通过创建SimpleMessageListenerContainerFactory.spring-doc.cadn.net.cn

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSqs);
    factory.setAutoStartup(false);
    factory.setMaxNumberOfMessages(5);
    // ...

    return factory;
}

5.2.5. 通过 Amazon SQS 使用 AWS Event 消息

还可以使用 SQS 消息侦听器接收 AWS 生成的事件消息。因为 AWS 消息不包含 mime 类型的标头,必须配置 Jackson 消息转换器 使用strictContentTypeMatch属性 false 来解析没有适当 MIME 类型的消息。spring-doc.cadn.net.cn

下一个代码显示了使用QueueMessageHandlerFactory并重新配置MappingJackson2MessageConverterspring-doc.cadn.net.cn

@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);
    factory.setArgumentResolvers(Collections.<HandlerMethodArgumentResolver>singletonList(new PayloadArgumentResolver(messageConverter)));
    return factory;
}

通过上述配置,可以接收 S3 存储桶(以及其他 事件通知(如 Elastic Transcoder 消息)在@SqsListener带注释的方法 s。spring-doc.cadn.net.cn

@SqsListener("testQueue")
public void receive(S3EventNotification s3EventNotificationRecord) {
    S3EventNotification.S3Entity s3Entity = s3EventNotificationRecord.getRecords().get(0).getS3();
}

5.3. SNS 支持

Amazon SNS 是一种发布-订阅消息收发系统,允许客户端向特定主题发布通知。其他 感兴趣的客户端可以使用不同的协议(如 HTTP/HTTPS、电子邮件或 Amazon SQS 队列)进行订阅,以接收消息。spring-doc.cadn.net.cn

下图显示了 Amazon SNS 架构的典型示例。spring-doc.cadn.net.cn

SNS 概述

Spring Cloud AWS 支持使用NotificationMessagingTemplate和 使用 Spring Web MVC 通过 HTTP/HTTPS 端点接收通知@Controller基于编程模型。亚马逊河 基于 SQS 的订阅可以与 Spring Cloud AWS 消息传递模块提供的注释驱动消息支持一起使用。spring-doc.cadn.net.cn

5.3.1. 发送消息

NotificationMessagingTemplate包含两种发送通知的便捷方法。第一个选项指定 destination 使用String这将针对 SNS API 进行解析。第二个 API 没有目的地 参数并使用默认目标。所有常用的 send 方法都可以在MessageSendingOperations,但发送通知不太方便,因为主题必须作为 header 传递。spring-doc.cadn.net.cn

目前仅String有效负载可以使用NotificationMessagingTemplate因为这是预期的 type 的 SNS API 创建。spring-doc.cadn.net.cn

import com.amazonaws.services.sns.AmazonSNS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;

public class SnsNotificationSender {

    private final NotificationMessagingTemplate notificationMessagingTemplate;

    @Autowired
    public SnsNotificationSender(AmazonSNS amazonSns) {
        this.notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns);
    }

    public void send(String subject, String message) {
        this.notificationMessagingTemplate.sendNotification("physicalTopicName", message, subject);
    }
}

此示例构造一个新的NotificationMessagingTemplate通过传递AmazonSNSclient 作为参数。在send方法的便利性sendNotificationmethod 用于发送messagesubject添加到 SNS 主题中。这 destination 中的sendNotificationmethod 是一个字符串值,必须与 AWS 上定义的主题名称匹配。此值 由 Amazon SNS 客户端在运行时解析。(可选)ResourceIdResolverimplementation 可以传递给NotificationMessagingTemplateconstructor 在 CloudFormation 堆栈内运行时按逻辑名称解析资源。 (有关资源名称解析的更多信息,请参阅管理云环境spring-doc.cadn.net.cn

建议使用 XML 消息命名空间创建NotificationMessagingTemplate因为它会自动 配置 SNS 客户端以设置默认转换器。spring-doc.cadn.net.cn

<aws-messaging:notification-messaging-template id="notificationMessagingTemplate" />

5.3.2. 注解驱动的 HTTP 通知端点

SNS 支持多种终端节点类型(SQS、电子邮件、HTTP、HTTPS),Spring Cloud AWS 提供对 HTTP(S) 终端节点的支持。 SNS 向 HTTP 主题侦听器终端节点发送三种类型的请求,每种请求都提供了注释:spring-doc.cadn.net.cn

HTTP 端点基于 Spring MVC 控制器。Spring Cloud AWS 添加了一些自定义参数解析器来提取 通知请求中的消息和主题。spring-doc.cadn.net.cn

@Controller
@RequestMapping("/topicName")
public class NotificationTestController {

    @NotificationSubscriptionMapping
    public void handleSubscriptionMessage(NotificationStatus status) throws IOException {
        //We subscribe to start receive the message
        status.confirmSubscription();
    }

    @NotificationMessageMapping
    public void handleNotificationMessage(@NotificationSubject String subject, @NotificationMessage String message) {
        // ...
    }

    @NotificationUnsubscribeConfirmationMapping
    public void handleUnsubscribeMessage(NotificationStatus status) {
        //e.g. the client has been unsubscribed and we want to "re-subscribe"
        status.confirmSubscription();
    }
}

目前,无法在方法级别定义映射 URL,因此RequestMapping必须 在类型级别完成,并且必须包含 Endpoint 的完整路径。spring-doc.cadn.net.cn

此示例创建了一个新的 Spring MVC 控制器,其中包含三种方法来处理上面列出的三个请求。挨次 要解析handleNotificationMessage方法必须注册自定义参数解析器。这 XML 配置如下。spring-doc.cadn.net.cn

<mvc:annotation-driven>
    <mvc:argument-resolvers>
        <ref bean="notificationResolver" />
    </mvc:argument-resolvers>
</mvc:annotation-driven>

<aws-messaging:notification-argument-resolver id="notificationResolver" />

aws-messaging:notification-argument-resolverelement 注册三个参数解析器:NotificationStatusHandlerMethodArgumentResolver,NotificationMessageHandlerMethodArgumentResolver, 和NotificationSubjectHandlerMethodArgumentResolver.spring-doc.cadn.net.cn

5.4. 使用 CloudFormation

Amazon SQS 队列和 SNS 主题可以在堆栈中配置,然后由应用程序使用。Spring Cloud AWS 还支持按逻辑名称查找堆栈配置的队列和主题,并解析为物理 名字。以下示例显示了 CloudFormation 模板中的 SNS 主题和 SQS 队列配置。spring-doc.cadn.net.cn

"LogicalQueueName": {
    "Type": "AWS::SQS::Queue",
    "Properties": {
    }
},
"LogicalTopicName": {
    "Type": "AWS::SNS::Topic",
    "Properties": {
    }
}

逻辑名称LogicalQueueNameLogicalTopicName然后,可以在配置和应用程序中使用 如下所示:spring-doc.cadn.net.cn

<aws-messaging:queue-messaging-template default-destination="LogicalQueueName" />

<aws-messaging:notification-messaging-template default-destination="LogicalTopicName" />
@SqsListener("LogicalQueueName")
public void receiveQueueMessages(Person person) {
    // Logical names can also be used with messaging templates
    this.notificationMessagingTemplate.sendNotification("anotherLogicalTopicName", "Message", "Subject");
}

当使用如上例所示的逻辑名称时,可以在不同的环境中创建堆栈,而无需任何 应用程序内部的配置或代码更改。spring-doc.cadn.net.cn