ZeroMQ 支持

ZeroMQ 支持

Spring 集成提供了支持应用程序中 ZeroMQ 通信的组件。 该实现基于 JeroMQ 库的 Java API。 所有组件都封装了 ZeroMQ 套接字生命周期,并在内部为它们管理线程,使与这些组件的交互无锁且线程安全。spring-doc.cadn.net.cn

您需要将此依赖项包含在您的项目中:spring-doc.cadn.net.cn

Maven 系列
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zeromq</artifactId>
    <version>6.0.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-zeromq:6.0.9"

ZeroMQ 代理

ZeroMqProxy是内置ZMQ.proxy() 功能。 它封装了套接字生命周期和线程管理。 此代理的客户端仍然可以使用标准的 ZeroMQ 套接字连接和交互 API。 除了标准ZContext它需要一种众所周知的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。 这样,一对适当的 ZeroMQ 套接字类型被用于代理的前端和后端。 看ZeroMqProxy.Type了解详情。spring-doc.cadn.net.cn

ZeroMqProxy实现SmartLifecycle创建、绑定和配置 sockets 并启动ZMQ.proxy()Executor(如果有)。 前端和后端套接字的绑定是通过tcp://协议连接到所有可用的网络接口上。 否则,它们将绑定到随机端口,这些端口稍后可以通过相应的getFrontendPort()getBackendPort()API 方法。spring-doc.cadn.net.cn

控制套接字公开为SocketType.PAIR"inproc://" + beanName + ".control"地址;它可以通过以下方式获得getControlAddress(). 它应该与来自另一个应用程序的相同应用程序一起使用SocketType.PAIR套接字发送ZMQ.PROXY_TERMINATE,ZMQ.PROXY_PAUSE和/或ZMQ.PROXY_RESUME命令。 这ZeroMqProxy执行ZMQ.PROXY_TERMINATE命令stop()调用其生命周期以终止ZMQ.proxy()loop 并正常关闭所有绑定的套接字。spring-doc.cadn.net.cn

setExposeCaptureSocket(boolean)选项使此组件将额外的线程间套接字与SocketType.PUB捕获并发布前端和后端套接字之间的所有通信,如ZMQ.proxy()实现。 此套接字绑定到"inproc://" + beanName + ".capture"地址,并且不需要任何特定的订阅进行筛选。spring-doc.cadn.net.cn

前端和后端套接字可以使用其他属性进行自定义,例如读/写超时或安全性。 此自定义可通过setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)setBackendSocketConfigurer(Consumer<ZMQ.Socket>)callback 中。spring-doc.cadn.net.cn

ZeroMqProxy可以作为简单的 bean 提供,如下所示:spring-doc.cadn.net.cn

@Bean
ZeroMqProxy zeroMqProxy() {
    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
    proxy.setExposeCaptureSocket(true);
    proxy.setFrontendPort(6001);
    proxy.setBackendPort(6002);
    return proxy;
}

所有客户端节点都应通过以下方式连接到此代理的主机tcp://并使用他们感兴趣的相应端口。spring-doc.cadn.net.cn

ZeroMQ 消息频道

ZeroMqChannel是一个SubscribableChannel它使用一对 ZeroMQ 套接字来连接发布者和订阅者以进行消息传递交互。 它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);它也可以用作本地线程间通道(使用PAIRsockets) - 该connectUrl在这种情况下不提供。 在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在那里它可以与连接到同一代理的其他类似通道交换消息。 connection url 选项是一个标准的 ZeroMQ 连接字符串,其中包含协议和主机,以及用于 ZeroMQ 代理的前端和后端套接字的一对冒号端口。 为方便起见,该通道可以使用ZeroMqProxyinstance 而不是 connection string,如果它与代理配置在同一应用程序中。spring-doc.cadn.net.cn

发送和接收套接字都在各自的专用线程中进行管理,使该通道对并发友好。 这样,我们就可以从ZeroMqChannel来自不同线程的 Sspring-doc.cadn.net.cn

默认情况下,ZeroMqChannel使用EmbeddedJsonHeadersMessageMapper以(反)序列化Message(包括标头)从/到byte[]使用 Jackson JSON 处理器。 此逻辑可通过setMessageMapper(BytesMessageMapper).spring-doc.cadn.net.cn

发送和接收套接字可以通过各自的setSendSocketConfigurer(Consumer<ZMQ.Socket>)setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)回调。spring-doc.cadn.net.cn

ZeroMqChannel基于 Project Reactor 的反应流FluxMono运营商。 这提供了更轻松的线程控制,并允许无锁的并发发布和对通道的使用。 本地 PUB/SUB 逻辑作为Flux.publish()运算符,以允许此通道的所有本地订阅者接收相同的已发布消息,就像PUB插座。spring-doc.cadn.net.cn

下面是一个ZeroMqChannel配置:spring-doc.cadn.net.cn

@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
    ZeroMqChannel channel = new ZeroMqChannel(context, true);
    channel.setConnectUrl("tcp://localhost:6001:6002");
    channel.setConsumeDelay(Duration.ofMillis(100));
    return channel;
}

ZeroMQ 入站通道适配器

ZeroMqMessageProducer是一个MessageProducerSupport使用反应式语义实现。 它以非阻塞的方式不断从 ZeroMQ 套接字读取数据,并将消息发布到无限的Flux它由FluxMessageChannel或显式地在start()method,如果 output channel 不是 reactive的。 当套接字上未收到任何数据时,一个consumeDelay(默认为 1 秒)在下一次读取尝试之前应用。spring-doc.cadn.net.cn

SocketType.PAIR,SocketType.PULLSocketType.SUBZeroMqMessageProducer. 此组件可以连接到远程套接字或使用提供的或随机端口绑定到 TCP 协议。 实际端口可以通过getBoundPort()启动该组件并绑定 ZeroMQ 套接字后。 套接字选项(例如安全或写入超时)可以通过setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)回调。spring-doc.cadn.net.cn

如果receiveRawoption 设置为true一个ZMsg,从套接字消耗,按原样在生成的Message:由下游流来解析和转换ZMsg. 否则,将InboundMessageMapper用于将消耗的数据转换为Message. 如果收到的ZMsg是多帧的,则第一帧被视为ZeroMqHeaders.TOPIC标头。spring-doc.cadn.net.cn

SocketType.SUBZeroMqMessageProducer使用提供的topics订阅选项;默认为 subscribe to all。 订阅可以在运行时使用subscribeToTopics()unsubscribeFromTopics() @ManagedOperations.spring-doc.cadn.net.cn

以下是ZeroMqMessageProducer配置:spring-doc.cadn.net.cn

@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
    ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
    messageProducer.setOutputChannel(outputChannel);
    messageProducer.setTopics("some");
    messageProducer.setReceiveRaw(true);
    messageProducer.setBindPort(7070);
    messageProducer.setConsumeDelay(Duration.ofMillis(100));
    return messageProducer;
}

ZeroMQ 出站通道适配器

ZeroMqMessageHandler是一个ReactiveMessageHandler实现将发布消息生成到 ZeroMQ 套接字中。 只SocketType.PAIR,SocketType.PUSHSocketType.PUB受支持。 这ZeroMqMessageHandler仅支持连接 ZeroMQ Socket;不支持 binding。 当SocketType.PUB时,使用topicExpression根据请求消息进行评估,以将主题帧注入 ZeroMQ 消息(如果它不为 null)。 订阅者端 (SocketType.SUB) 必须先收到 Topic 帧,然后才能解析实际数据。 当请求消息的有效负载为ZMsg,则不会执行转换或主题提取:ZMsg按原样发送到套接字中,并且不会销毁以供进一步重用。 否则,将OutboundMessageMapper<byte[]>用于将请求消息(或仅其有效负载)转换为 ZeroMQ 帧进行发布。 默认情况下,ConvertingBytesMessageMapper使用时随ConfigurableCompositeMessageConverter. 套接字选项(例如安全或写入超时)可以通过setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)回调。spring-doc.cadn.net.cn

以下是ZeroMqMessageHandler配置:spring-doc.cadn.net.cn

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}

ZeroMQ Java DSL 支持

spring-integration-zeromq通过以下方式提供方便的 Java DSL 连贯 APIZeroMqfactory 和IntegrationComponentSpec上述组件的实现。spring-doc.cadn.net.cn

这是一个 Java DSL 示例ZeroMqChannel:spring-doc.cadn.net.cn

.channel(ZeroMq.zeroMqChannel(this.context)
            .connectUrl("tcp://localhost:6001:6002")
            .consumeDelay(Duration.ofMillis(100)))
}

ZeroMQ Java DSL 的入站通道适配器是:spring-doc.cadn.net.cn

IntegrationFlow.from(
            ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
                        .connectUrl("tcp://localhost:9000")
                        .topics("someTopic")
                        .receiveRaw(true)
                        .consumeDelay(Duration.ofMillis(100)))
}

ZeroMQ Java DSL 的出站通道适配器是:spring-doc.cadn.net.cn

.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
                  .topicFunction(message -> message.getHeaders().get("myTopic")))
}