流支持

流支持

在许多情况下,应用程序数据是从流中获取的。 建议不要将对流的引用作为消息负载发送给使用者。 相反,消息是从输入流中读取的数据创建的,并且消息负载将逐个写入输出流。spring-doc.cadn.net.cn

您需要将此依赖项包含在您的项目中:spring-doc.cadn.net.cn

Maven 系列
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>6.0.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-stream:6.0.9"

从 Streams 中读取

Spring 集成为流提供了两个适配器。 双ByteStreamReadingMessageSourceCharacterStreamReadingMessageSource实现MessageSource. 通过在 channel-adapter 元素中配置其中一个,可以配置轮询周期,并且消息总线可以自动检测和调度它们。 字节流版本需要一个InputStream,并且字符流版本需要Reader作为单个构造函数参数。 这ByteStreamReadingMessageSource还接受 'bytesPerMessage' 属性来确定它尝试读取每个Message. 默认值为1024. 以下示例创建一个输入流,该流创建每个消息包含 2048 字节的消息:spring-doc.cadn.net.cn

<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
  <constructor-arg ref="someInputStream"/>
  <property name="bytesPerMessage" value="2048"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
  <constructor-arg ref="someReader"/>
</bean>

CharacterStreamReadingMessageSource将读取器包装在BufferedReader(如果它还不是)。 您可以在第二个 constructor 参数中设置缓冲读取器使用的缓冲区大小。 从版本 5.0 开始,第三个 constructor 参数 (blockToDetectEOF) 控制CharacterStreamReadingMessageSource. 什么时候false(默认值)、receive()方法检查读取器是否为ready()否则返回 null。 在这种情况下,不会检测到 EOF(文件结束)。 什么时候truereceive()方法阻塞,直到数据可用或在底层流上检测到 EOF。 检测到 EOF 时,会发出StreamClosedEvent(应用程序事件) 发布。 您可以将此事件与实现ApplicationListener<StreamClosedEvent>.spring-doc.cadn.net.cn

为了便于 EOF 检测,轮询程序线程在receive()方法,直到数据到达或检测到 EOF。
检测到 EOF 后,轮询程序将继续在每个轮询上发布事件。 应用程序侦听器可以停止适配器以防止这种情况。 该事件在 poller 线程上发布。 停止适配器会导致线程中断。 如果您打算在停止适配器后执行某些可中断的任务,则必须执行stop()或对这些下游活动使用不同的线程。 请注意,发送到QueueChannel是可中断的,因此,如果您希望从侦听器发送消息,请在停止适配器之前执行此作。

这有助于将数据“管道化”或重定向到stdin,如下两个示例所示:spring-doc.cadn.net.cn

cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt

此方法允许应用程序在管道关闭时停止。spring-doc.cadn.net.cn

有四种方便的工厂方法可用:spring-doc.cadn.net.cn

public static final CharacterStreamReadingMessageSource stdin() { ... }

public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }

public static final CharacterStreamReadingMessageSource stdinPipe() { ... }

public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }

写入流

对于目标流,您可以使用以下两种实现之一:ByteStreamWritingMessageHandlerCharacterStreamWritingMessageHandler. 每个 API 都需要一个构造函数参数 (OutputStream对于 Byte 流或Writer),并且每个都提供第二个构造函数,该构造函数添加可选的 'bufferSize'。 由于这两种方法最终都会实现MessageHandler接口中,您可以从channel-adapter配置,如 Channel Adapter 中所述。spring-doc.cadn.net.cn

<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler">
  <constructor-arg ref="someOutputStream"/>
  <constructor-arg value="1024"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler">
  <constructor-arg ref="someWriter"/>
</bean>

Stream 命名空间支持

Spring 集成定义了一个名称空间,以减少与流相关的通道适配器所需的配置。 要使用它,需要以下架构位置:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/integration/stream
      https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

以下代码片段显示了配置入站通道适配器时支持的不同配置选项:spring-doc.cadn.net.cn

<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>

<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>

从版本 5.0 开始,您可以设置detect-eof属性,该属性将blockToDetectEOF财产。 有关更多信息,请参阅从 Streams 中读取spring-doc.cadn.net.cn

要配置出站通道适配器,您也可以使用名称空间支持。 以下示例显示了出站通道适配器的不同配置:spring-doc.cadn.net.cn

<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
    channel="testChannel"/>

<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
    channel="testChannel"/>

<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>

<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
    channel="testChannel"/>