5. 消息传递

Spring Cloud AWS 提供 Amazon SQSAmazon SNS 集成 这简化了通过 SQS 或 SNS 发布和使用消息的过程。虽然 SQS 完全依赖于消息收发 API 在 Spring 4.0 中引入,SNS 仅部分实现了它,因为接收部分必须以不同的方式处理 推送通知。spring-doc.cn

5.1. 配置消息传递

在使用和配置消息传递支持之前,应用程序必须包含相应的模块依赖项 添加到 Maven 配置中。Spring Cloud AWS Messaging 支持作为单独的模块提供,以允许模块化使用 的模块。spring-doc.cn

5.1.1. Maven 依赖配置

Spring Cloud AWS 消息传递模块作为独立模块提供,可以使用以下依赖项声明导入:spring-doc.cn

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-aws-messaging</artifactId>
    <version>{spring-cloud-version}</version>
</dependency>

5.2. SQS 支持

Amazon SQS 是 Amazon Web Service 平台上的托管消息收发服务,可提供点对点通信 与队列。与 JMS 或其他消息服务相比,Amazon SQS 具有多项功能和限制,这些功能和限制应该 考虑到。spring-doc.cn

  • Amazon SQS 只允许负载,因此必须将任何负载转换为 String 表示形式。 Spring Cloud AWS 专门支持通过将 Java 对象转换为 JSON 来传输带有 Amazon SQS 消息的 Java 对象。StringObjectspring-doc.cn

  • Amazon SQS 不支持事务,因此可能会检索两次消息。申请必须填写 一种幂等方式,以便它们可以接收消息两次。spring-doc.cn

  • Amazon SQS 每条消息的最大消息大小为 256kb,因此无法发送更大的消息。spring-doc.cn

5.2.1. 发送消息

这包含许多发送消息的便捷方法。有一些 send 方法可以指定 destination 使用对象,而那些使用字符串指定 destination 的字符串将 根据 SQS API 进行解析。不带 destination 参数的 send 方法使用默认 destination。QueueMessagingTemplateQueueMessageChannelspring-doc.cn

import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.messaging.support.MessageBuilder;

public class SqsQueueSender {

    private final QueueMessagingTemplate queueMessagingTemplate;

    @Autowired
    public SqsQueueSender(AmazonSQSAsync amazonSQSAsync) {
        this.queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync);
    }

    public void send(String message) {
        this.queueMessagingTemplate.send("physicalQueueName", MessageBuilder.withPayload(message).build());
    }
}

此示例使用该类创建具有字符串 payload 的消息。的 构造的。send 方法中的目标是一个字符串值,该值 必须与 AWS 上定义的队列名称匹配。此值将在运行时由 Amazon SQS 客户端解析。选择 可以将实现传递给构造函数以通过以下方式解析资源 在 CloudFormation 堆栈中运行时的逻辑名称(有关以下内容的更多信息,请参阅管理云环境 资源名称解析)。MessageBuilderQueueMessagingTemplateAmazonSQSAsyncResourceIdResolverQueueMessagingTemplatespring-doc.cn

使用消息传递命名空间,可以在 XML 配置文件中定义 a。QueueMessagingTemplatespring-doc.cn

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aws-context="http://www.springframework.org/schema/cloud/aws/context"
    xmlns:aws-messaging="http://www.springframework.org/schema/cloud/aws/messaging"
    xmlns="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/cloud/aws/context
        http://www.springframework.org/schema/cloud/aws/context/spring-cloud-aws-context.xsd
        http://www.springframework.org/schema/cloud/aws/messaging
        http://www.springframework.org/schema/cloud/aws/messaging/spring-cloud-aws-messaging">

    <aws-context:context-credentials>
        <aws-context:instance-profile-credentials />
    </aws-context:context-credentials>

    <aws-messaging:queue-messaging-template id="queueMessagingTemplate" />

</beans>

在此示例中,消息命名空间处理程序构造一个新的 .客户端 根据提供的凭证自动创建并传递给模板的构造函数。如果 在配置的 CloudFormation 堆栈中运行的应用程序 a 将传递给构造函数(有关资源名称解析的更多信息,请参阅管理云环境)。QueueMessagingTemplateAmazonSQSAsyncResourceIdResolverspring-doc.cn

使用消息转换器

为了方便发送域模型对象,它有各种 send 方法, 将 Java 对象作为消息数据内容的参数。重载的方法 和 in 将转换过程委托给接口的实例。此接口定义了一个简单的协定,用于在 Java 对象和 SQS 消息之间进行转换。默认的 implementation 只需解包消息有效负载,只要它与 target 类型匹配即可。由 使用转换器,您和您的应用程序代码可以专注于通过 SQS 的,而不必关心如何将其表示为 SQS 消息的详细信息。QueueMessagingTemplateconvertAndSend()receiveAndConvert()QueueMessagingTemplateMessageConverterSimpleMessageConverterspring-doc.cn

由于 SQS 只能发送有效负载,因此只应使用默认转换器 发送有效负载。对于更复杂的对象,应使用自定义转换器,就像 消息命名空间处理程序。StringSimpleMessageConverterStringspring-doc.cn

建议使用 XML 消息传递命名空间来创建,因为它会将 sophisticated 的,当 Jackson 位于 Classpath 上时,将对象转换为 JSON。QueueMessagingTemplateMessageConverterspring-doc.cn

<aws-messaging:queue-messaging-template id="queueMessagingTemplate" />
this.queueMessagingTemplate.convertAndSend("queueName", new Person("John, "Doe"));

在此示例中,a 是使用消息命名空间创建的。方法 使用配置的 (Configured) 转换有效负载并发送消息。QueueMessagingTemplateconvertAndSendPersonMessageConverterspring-doc.cn

5.2.2. 接收消息

有两种方法可以接收 SQS 消息,使用 或 注释驱动的侦听器端点。后者是迄今为止更方便的消息接收方式。receiveQueueMessagingTemplatespring-doc.cn

Person person = this.queueMessagingTemplate.receiveAndConvert("queueName", Person.class);

在此示例中,将从 SQS 队列获取一条消息,并将其转换为目标类 作为参数传递。QueueMessagingTemplatespring-doc.cn

5.2.3. 注解驱动的侦听器端点

注释驱动的侦听器终端节点是侦听 SQS 消息的最简单方法。只需使用 和 注释方法,即可将消息路由到带注释的方法。MessageMappingQueueMessageHandlerspring-doc.cn

<aws-messaging:annotation-driven-queue-listener />
@SqsListener("queueName")
public void queueListener(Person person) {
    // ...
}

在此示例中,将启动一个队列侦听器容器,该容器轮询传递给注释的 SQS。传入消息将转换为目标类型,然后调用带注释的方法。queueNameMessageMappingqueueListenerspring-doc.cn

除了有效负载之外,还可以使用 or 注释将标头注入侦听器方法中。 用于注入特定的 Headers 值,而注入 Containing All Headers。@Header@Headers@Header@HeadersMap<String, String>spring-doc.cn

只有标准 支持随 SQS 消息一起发送的消息属性。目前不支持自定义属性。spring-doc.cn

除了提供的参数解析器之外,还可以使用 attribute 在元素上注册自定义解析器(请参阅下面的示例)。aws-messaging:annotation-driven-queue-listeneraws-messaging:argument-resolversspring-doc.cn

<aws-messaging:annotation-driven-queue-listener>
    <aws-messaging:argument-resolvers>
        <bean class="org.custom.CustomArgumentResolver" />
    </aws-messaging:argument-resolvers>
</aws-messaging:annotation-driven-queue-listener>

默认情况下,会为核心创建一个具有计算值 最大池大小。核心池大小设置为队列数量的两倍,最大池大小通过乘以 按字段值划分的队列数。如果这些默认值不满足 应用程序,可以使用 该属性设置 Custom Task Executor(请参阅下面的示例)。SimpleMessageListenerContainerThreadPoolTaskExecutormaxNumberOfMessagestask-executorspring-doc.cn

<aws-messaging:annotation-driven-queue-listener task-executor="simpleTaskExecutor" />
消息回复

消息侦听器方法可以进行 Comments ,以将其返回值发送到另一个通道。它使用在元素上设置的已定义消息传递模板来发送返回值。消息传递模板必须实现 界面。@SendToSendToHandlerMethodReturnValueHandleraws-messaging:annotation-driven-queue-listenerDestinationResolvingMessageSendingOperationsspring-doc.cn

<aws-messaging:annotation-driven-queue-listener send-to-message-template="queueMessagingTemplate"/>
@SqsListener("treeQueue")
@SendTo("leafsQueue")
public List<Leaf> extractLeafs(Tree tree) {
    // ...
}

在此示例中,该方法将接收来自 的消息,然后返回 s 的 a,该消息将发送到 。请注意,在 XML 元素上有一个属性,该属性指定要用于发送消息的返回值的消息模板 listener 方法。extractLeafstreeQueueListLeafleafsQueueaws-messaging:annotation-driven-queue-listenersend-to-message-templateQueueMessagingTemplatespring-doc.cn

处理异常

在带批注的方法中引发的异常可以由带 .@SqsListener@MessageExceptionHandlerspring-doc.cn

import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.stereotype.Component;

@Component
public class MyMessageHandler {

    @SqsListener("queueName")
    void handle(String message) {
        ...
        throw new MyException("something went wrong");
    }

    @MessageExceptionHandler(MyException.class)
    void handleException(MyException e) {
        ...
    }
}

5.2.4. SimpleMessageListenerContainerFactory

也可以通过创建类型为 的 bean 来使用 Java 进行配置 。SimpleMessageListenerContainerSimpleMessageListenerContainerFactoryspring-doc.cn

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSqs);
    factory.setAutoStartup(false);
    factory.setMaxNumberOfMessages(5);
    // ...

    return factory;
}

5.2.5. 通过 Amazon SQS 使用 AWS Event 消息

还可以使用 SQS 消息侦听器接收 AWS 生成的事件消息。因为 AWS 消息不包含 mime 类型的标头,必须配置 Jackson 消息转换器 替换为属性 false,也可以解析没有适当 MIME 类型的消息。strictContentTypeMatchspring-doc.cn

下一个代码显示了使用 和 重新配置QueueMessageHandlerFactoryMappingJackson2MessageConverterspring-doc.cn

@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);
    factory.setArgumentResolvers(Collections.<HandlerMethodArgumentResolver>singletonList(new PayloadArgumentResolver(messageConverter)));
    return factory;
}

通过上述配置,可以接收 S3 存储桶(以及其他 事件通知(如 Elastic Transcoder 消息))在如下所示的带注释的方法中。@SqsListenerspring-doc.cn

@SqsListener("testQueue")
public void receive(S3EventNotification s3EventNotificationRecord) {
    S3EventNotification.S3Entity s3Entity = s3EventNotificationRecord.getRecords().get(0).getS3();
}

5.3. SNS 支持

Amazon SNS 是一种发布-订阅消息收发系统,允许客户端向特定主题发布通知。其他 感兴趣的客户端可以使用不同的协议(如 HTTP/HTTPS、电子邮件或 Amazon SQS 队列)进行订阅,以接收消息。spring-doc.cn

下图显示了 Amazon SNS 架构的典型示例。spring-doc.cn

SNS 概述

Spring Cloud AWS 支持使用 和 使用基于 Spring Web MVC 的编程模型通过 HTTP/HTTPS 端点接收通知。亚马逊河 基于 SQS 的订阅可以与 Spring Cloud AWS 消息传递模块提供的注释驱动消息支持一起使用。NotificationMessagingTemplate@Controllerspring-doc.cn

5.3.1. 发送消息

包含两种发送通知的便捷方法。第一个选项指定 destination 使用将针对 SNS API 进行解析。第二个 API 没有目的地 参数并使用默认目标。上可用的所有常用 send 方法都已实现,但发送通知不太方便,因为主题必须作为 header 传递。NotificationMessagingTemplateStringMessageSendingOperationsspring-doc.cn

目前,只能使用 发送有效负载,因为这是预期的 type 的 SNS API 创建。StringNotificationMessagingTemplatespring-doc.cn

import com.amazonaws.services.sns.AmazonSNS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;

public class SnsNotificationSender {

    private final NotificationMessagingTemplate notificationMessagingTemplate;

    @Autowired
    public SnsNotificationSender(AmazonSNS amazonSns) {
        this.notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns);
    }

    public void send(String subject, String message) {
        this.notificationMessagingTemplate.sendNotification("physicalTopicName", message, subject);
    }
}

此示例通过将 client 作为参数传递来构造一个 new。在该方法中,便捷方法用于将 with 发送到 SNS 主题。这 destination 是一个字符串值,该值必须与 AWS 上定义的主题名称匹配。此值 由 Amazon SNS 客户端在运行时解析。(可选)可以将实施传递给构造函数,以便在 CloudFormation 堆栈内运行时按逻辑名称解析资源。 (有关资源名称解析的更多信息,请参阅管理云环境NotificationMessagingTemplateAmazonSNSsendsendNotificationmessagesubjectsendNotificationResourceIdResolverNotificationMessagingTemplatespring-doc.cn

建议使用 XML 消息命名空间来创建,因为它会自动 配置 SNS 客户端以设置默认转换器。NotificationMessagingTemplatespring-doc.cn

<aws-messaging:notification-messaging-template id="notificationMessagingTemplate" />

5.3.2. 注解驱动的 HTTP 通知端点

SNS 支持多种终端节点类型(SQS、电子邮件、HTTP、HTTPS),Spring Cloud AWS 提供对 HTTP(S) 终端节点的支持。 SNS 向 HTTP 主题侦听器终端节点发送三种类型的请求,每种请求都提供了注释:spring-doc.cn

HTTP 端点基于 Spring MVC 控制器。Spring Cloud AWS 添加了一些自定义参数解析器来提取 通知请求中的消息和主题。spring-doc.cn

@Controller
@RequestMapping("/topicName")
public class NotificationTestController {

    @NotificationSubscriptionMapping
    public void handleSubscriptionMessage(NotificationStatus status) throws IOException {
        //We subscribe to start receive the message
        status.confirmSubscription();
    }

    @NotificationMessageMapping
    public void handleNotificationMessage(@NotificationSubject String subject, @NotificationMessage String message) {
        // ...
    }

    @NotificationUnsubscribeConfirmationMapping
    public void handleUnsubscribeMessage(NotificationStatus status) {
        //e.g. the client has been unsubscribed and we want to "re-subscribe"
        status.confirmSubscription();
    }
}

目前,无法在方法级别定义映射 URL,因此必须 在类型级别完成,并且必须包含 Endpoint 的完整路径。RequestMappingspring-doc.cn

此示例创建了一个新的 Spring MVC 控制器,其中包含三种方法来处理上面列出的三个请求。挨次 要解析方法的参数,必须注册自定义参数解析器。这 XML 配置如下。handleNotificationMessagespring-doc.cn

<mvc:annotation-driven>
    <mvc:argument-resolvers>
        <ref bean="notificationResolver" />
    </mvc:argument-resolvers>
</mvc:annotation-driven>

<aws-messaging:notification-argument-resolver id="notificationResolver" />

该元素注册了三个参数解析器:、 、 和。aws-messaging:notification-argument-resolverNotificationStatusHandlerMethodArgumentResolverNotificationMessageHandlerMethodArgumentResolverNotificationSubjectHandlerMethodArgumentResolverspring-doc.cn

5.4. 使用 CloudFormation

Amazon SQS 队列和 SNS 主题可以在堆栈中配置,然后由应用程序使用。Spring Cloud AWS 还支持按逻辑名称查找堆栈配置的队列和主题,并解析为物理 名字。以下示例显示了 CloudFormation 模板中的 SNS 主题和 SQS 队列配置。spring-doc.cn

"LogicalQueueName": {
    "Type": "AWS::SQS::Queue",
    "Properties": {
    }
},
"LogicalTopicName": {
    "Type": "AWS::SNS::Topic",
    "Properties": {
    }
}

然后,可以在配置和应用程序中使用逻辑名称 如下所示:LogicalQueueNameLogicalTopicNamespring-doc.cn

<aws-messaging:queue-messaging-template default-destination="LogicalQueueName" />

<aws-messaging:notification-messaging-template default-destination="LogicalTopicName" />
@SqsListener("LogicalQueueName")
public void receiveQueueMessages(Person person) {
    // Logical names can also be used with messaging templates
    this.notificationMessagingTemplate.sendNotification("anotherLogicalTopicName", "Message", "Subject");
}

当使用如上例所示的逻辑名称时,可以在不同的环境中创建堆栈,而无需任何 应用程序内部的配置或代码更改。spring-doc.cn