Spring Integration 提供了支持应用程序中 ZeroMQ 通信的组件。 该实现基于 JeroMQ 库中受支持的 Java API。 所有组件都封装了 ZeroMQ 套接字生命周期,并在内部管理它们的线程,使与这些组件的交互无锁且线程安全。
您需要将此依赖项包含在项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>6.3.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:6.3.1"
ZeroMQ代理
是内置函数的 Spring 友好包装器。
它封装了套接字生命周期和线程管理。
此代理的客户端仍然可以使用标准的 ZeroMQ 套接字连接和交互 API。
除了标准之外,它还需要一种众所周知的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。
这样,一对合适的 ZeroMQ 套接字类型将用于代理的前端和后端。
有关详细信息,请参阅。ZeroMqProxy
ZMQ.proxy()
ZContext
ZeroMqProxy.Type
用于创建、绑定和配置套接字以及从(如果有)在专用线程中启动的实现。
前端和后端套接字的绑定是通过协议在具有所提供端口的所有可用网络接口上完成的。
否则,它们被绑定到随机端口,稍后可以通过相应的 API 方法获取。ZeroMqProxy
SmartLifecycle
ZMQ.proxy()
Executor
tcp://
getFrontendPort()
getBackendPort()
控制套接字公开为地址上具有线程间传输的套接字;它可以通过 获得。
它应该与来自另一个套接字的同一应用程序一起使用,以发送和/或命令。
在其生命周期中被调用时执行命令以终止循环并优雅地关闭所有绑定的套接字。SocketType.PAIR
"inproc://" + beanName + ".control"
getControlAddress()
SocketType.PAIR
ZMQ.PROXY_TERMINATE
ZMQ.PROXY_PAUSE
ZMQ.PROXY_RESUME
ZeroMqProxy
ZMQ.PROXY_TERMINATE
stop()
ZMQ.proxy()
该选项使此组件绑定一个额外的线程间套接字,以捕获和发布前端和后端套接字之间的所有通信,如实现所述。
此套接字绑定到地址,不需要任何特定的订阅进行筛选。setExposeCaptureSocket(boolean)
SocketType.PUB
ZMQ.proxy()
"inproc://" + beanName + ".capture"
可以使用其他属性(例如读/写超时或安全性)自定义前端和后端套接字。
此自定义分别通过回调和回调提供。setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)
setBackendSocketConfigurer(Consumer<ZMQ.Socket>)
可以像这样以简单的 bean 形式提供:ZeroMqProxy
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
所有客户端节点都应通过以下方式连接到此代理的主机,并使用其感兴趣的相应端口。tcp://
ZeroMQ 消息通道
它使用一对 ZeroMQ 套接字来连接发布者和订阅者进行消息传递交互。
它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);它也可以用作本地线程间通道(使用套接字) - 在这种情况下不提供。
在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在那里它可以与连接到同一代理的其他类似通道交换消息。
连接 url 选项是一个标准的 ZeroMQ 连接字符串,其中包含协议和主机,以及一对通过冒号的端口,用于 ZeroMQ 代理的前端和后端套接字。
为方便起见,如果通道与代理配置在与代理相同的应用程序中,则可以与实例一起提供通道,而不是连接字符串。ZeroMqChannel
SubscribableChannel
PAIR
connectUrl
ZeroMqProxy
发送套接字和接收套接字都在自己的专用线程中进行管理,因此此通道对并发友好。
这样,我们就可以在不同步的情况下从不同的线程发布和使用。ZeroMqChannel
默认情况下,使用 Jackson JSON 处理器对 (包括标头) from/to 进行(反)序列化。
此逻辑可以通过 进行配置。ZeroMqChannel
EmbeddedJsonHeadersMessageMapper
Message
byte[]
setMessageMapper(BytesMessageMapper)
发送和接收套接字可以通过相应的回调为任何选项(读/写超时、安全性等)进行自定义。setSendSocketConfigurer(Consumer<ZMQ.Socket>)
setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)
其内部逻辑基于通过 Project Reactor 和运算符的反应流。
这提供了更轻松的线程控制,并允许无锁并发发布和与通道之间的消耗。
本地 PUB/SUB 逻辑作为运算符实现,以允许此通道的所有本地订阅者接收与套接字的分布式订阅者相同的已发布消息。ZeroMqChannel
Flux
Mono
Flux.publish()
PUB
以下是配置的简单示例:ZeroMqChannel
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
ZeroMQ 入站通道适配器
这是一个具有反应式语义的实现。
它不断以非阻塞的方式从 ZeroMQ 套接字读取数据,并将消息发布到无限,如果输出通道不是响应式的,则该无限由方法中的 a 或显式订阅。
当套接字上未收到任何数据时,将在下次读取尝试之前应用 a(默认为 1 秒)。ZeroMqMessageProducer
MessageProducerSupport
Flux
FluxMessageChannel
start()
consumeDelay
只有 ,并且由 .
此组件可以使用提供的或随机的端口连接到远程套接字或绑定到 TCP 协议。
实际端口可以通过启动该组件并绑定 ZeroMQ socket 后通过获取。
套接字选项(例如安全性或写入超时)可以通过回调进行配置。SocketType.PAIR
SocketType.PULL
SocketType.SUB
ZeroMqMessageProducer
getBoundPort()
setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
如果该选项设置为 ,则从套接字中消耗的 ,则按原样在生成的 : 的有效负载中发送 : 由下游流解析和转换 .
否则,an 用于将消耗的数据转换为 .
如果接收到的是多帧,则第一帧将被视为此 ZeroMQ 消息发布到的标头。receiveRaw
true
ZMsg
Message
ZMsg
InboundMessageMapper
Message
ZMsg
ZeroMqHeaders.TOPIC
如果该选项设置为 ,则认为传入消息由两个帧组成:主题和 ZeroMQ 消息。
否则,默认情况下,将被视为由三个帧组成:第一个包含主题的帧,最后一个包含消息的帧,中间有一个空帧。unwrapTopic
false
ZMsg
对于 ,使用提供的订阅选项;默认为全部订阅。
可以在运行时使用 和 s 调整订阅。SocketType.SUB
ZeroMqMessageProducer
topics
subscribeToTopics()
unsubscribeFromTopics()
@ManagedOperation
下面是配置示例:ZeroMqMessageProducer
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
ZeroMQ 出站通道适配器
这是一个将发布消息生成到 ZeroMQ 套接字中的实现。
仅 ,并且受支持。
唯一支持连接 ZeroMQ 套接字;不支持绑定。
当使用 时,将根据请求消息进行评估,以将主题帧注入到 ZeroMQ 消息中(如果它不是 null)。
订阅者端 () 必须先接收主题帧,然后才能解析实际数据。ZeroMqMessageHandler
ReactiveMessageHandler
SocketType.PAIR
SocketType.PUSH
SocketType.PUB
ZeroMqMessageHandler
SocketType.PUB
topicExpression
SocketType.SUB
如果该选项设置为 ,那么 ZeroMQ 消息帧将在注入的主题(如果存在)之后发送。
默认情况下,在主题和消息之间发送一个额外的空帧。wrapTopic
false
当请求消息的有效负载为 时,不会执行转换或主题提取:按原样发送到套接字中,并且不会被销毁以供进一步重用。
否则,an 用于将请求消息(或仅其有效负载)转换为 ZeroMQ 帧进行发布。
默认情况下,a 与 .
套接字选项(例如安全性或写入超时)可以通过回调进行配置。ZMsg
ZMsg
OutboundMessageMapper<byte[]>
ConvertingBytesMessageMapper
ConfigurableCompositeMessageConverter
setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
下面是配置示例:ZeroMqMessageHandler
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
ZeroMQ Java DSL 支持
通过工厂和实现为上述组件提供方便的 Java DSL 流畅 API。spring-integration-zeromq
ZeroMq
IntegrationComponentSpec
这是 Java DSL 的示例:ZeroMqChannel
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://localhost:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的入站通道适配器是:
IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://localhost:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的出站通道适配器是:
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}