文件支持

文件支持

Spring 集成的文件支持通过专用词汇表扩展了 Spring 集成核心,以处理读取、写入和转换文件。spring-doc.cadn.net.cn

您需要将此依赖项包含在您的项目中:spring-doc.cadn.net.cn

Maven 系列
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>6.0.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-file:6.0.9"

它提供了一个名称空间,该名称空间允许元素定义专用于文件的通道适配器,并支持可以将文件内容读取到字符串或字节数组中的转换器。spring-doc.cadn.net.cn

本节介绍了FileReadingMessageSourceFileWritingMessageHandler以及如何将它们配置为 bean。 它还讨论了通过特定于文件的实现来处理文件的支持Transformer. 最后,它解释了特定于文件的命名空间。spring-doc.cadn.net.cn

读取文件

一个FileReadingMessageSource可用于使用文件系统中的文件。 这是MessageSource,从文件系统目录创建消息。 以下示例显示如何配置FileReadingMessageSource:spring-doc.cadn.net.cn

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:directory="${input.directory}"/>

要防止为某些文件创建消息,您可以提供FileListFilter. 默认情况下,我们使用以下筛选器:spring-doc.cadn.net.cn

IgnoreHiddenFileListFilter确保不处理隐藏文件。 请注意,hidden 的确切定义取决于系统。 例如,在基于 UNIX 的系统上,以句点字符开头的文件被视为隐藏文件。 另一方面,Microsoft Windows 有一个专用的 file 属性来指示隐藏文件。spring-doc.cadn.net.cn

版本 4.2 引入了IgnoreHiddenFileListFilter. 在以前的版本中,包含隐藏文件。 使用默认配置时,IgnoreHiddenFileListFilter首先触发,然后是AcceptOnceFileListFilter.spring-doc.cadn.net.cn

AcceptOnceFileListFilter确保仅从目录中选取文件一次。spring-doc.cadn.net.cn

AcceptOnceFileListFilter将其状态存储在内存中。 如果您希望该状态在系统重启后仍然存在,可以使用FileSystemPersistentAcceptOnceFileListFilter. 此筛选条件将接受的文件名存储在MetadataStoreimplementation (请参阅 元数据存储)。 此筛选条件与文件名和修改时间匹配。spring-doc.cadn.net.cn

从 4.0 版本开始,此过滤器需要一个ConcurrentMetadataStore. 当与共享数据存储(例如Redis使用RedisMetadataStore),它允许在多个应用程序实例之间或多个服务器使用的网络文件共享之间共享筛选键。spring-doc.cadn.net.cn

从 4.1.5 版本开始,此过滤器具有一个新属性 (flushOnUpdate),这会导致它在每次更新时刷新元数据存储(如果存储实现Flushable).spring-doc.cadn.net.cn

持久文件列表过滤器现在具有布尔属性forRecursion. 将此属性设置为true,还会设置alwaysAcceptDirectories,这意味着出站网关 (lsmget) 现在每次都始终遍历完整的目录树。 这是为了解决未检测到目录树深处更改的问题。 另外forRecursion=true使文件的完整路径用作元数据存储键;这解决了以下问题:如果具有相同名称的文件在不同目录中多次出现,则过滤器无法正常工作。 重要说明:这意味着对于顶级目录下的文件,将无法找到持久性元数据存储中的现有键。 因此,该属性为false默认情况下;这可能会在未来版本中更改。spring-doc.cadn.net.cn

以下示例将FileReadingMessageSource使用滤镜:spring-doc.cadn.net.cn

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="customFilterBean"/>

读取文件的常见问题是,文件可能在准备就绪之前被检测到(即,其他进程可能仍在写入该文件)。 默认的AcceptOnceFileListFilter不会阻止这种情况。 在大多数情况下,如果文件写入过程在准备好读取每个文件后立即重命名每个文件,则可以防止这种情况。 一个filename-patternfilename-regex过滤器,它只接受准备好的文件(可能基于已知的后缀),由默认的AcceptOnceFileListFilter允许这种情况。 这CompositeFileListFilter启用合成,如下例所示:spring-doc.cadn.net.cn

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="compositeFilter"/>

<bean id="compositeFilter"
    class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
            <bean class="o.s.i.file.filters.RegexPatternFileListFilter">
                <constructor-arg value="^test.*$"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

如果无法使用临时名称创建文件并重命名为最终名称,则 Spring 集成提供了另一种选择。 版本 4.2 添加了LastModifiedFileListFilter. 此过滤器可以使用age属性,以便过滤器仅传递早于此值的文件。 该年龄默认为 60 秒,但您应该选择一个足够大的年龄,以避免过早获取文件(例如,由于网络故障)。 以下示例显示如何配置LastModifiedFileListFilter:spring-doc.cadn.net.cn

<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
    <property name="age" value="120" />
</bean>

从版本 4.3.7 开始,ChainFileListFilter(的扩展CompositeFileListFilter) 以允许后续过滤器只能看到前一个过滤器的结果。 (使用CompositeFileListFilter,则所有筛选器都会看到所有文件,但它只传递已通过所有筛选器的文件。 需要新行为的一个示例是LastModifiedFileListFilterAcceptOnceFileListFilter,当我们在一段时间后才希望接受文件时。 使用CompositeFileListFilter,由于AcceptOnceFileListFilter在第一个传递时看到所有文件,当另一个过滤器稍后传递时,它不会传递它。 这CompositeFileListFilter当模式过滤器与自定义过滤器结合使用时,方法非常有用,该过滤器查找辅助文件以指示文件传输已完成。 模式过滤器可能只传递主文件(例如something.txt),但 “done” 过滤器需要查看 if(例如)something.done存在。spring-doc.cadn.net.cn

假设我们有文件a.txt,a.doneb.txt.spring-doc.cadn.net.cn

模式筛选器仅通过a.txtb.txt,而 “done” 过滤器只能看到所有三个文件并传递a.txt. 复合过滤器的最终结果是,只有a.txt发布。spring-doc.cadn.net.cn

使用ChainFileListFilter,如果链中的任何过滤器返回空列表,则不会调用其余过滤器。

版本 5.0 引入了ExpressionFileListFilter对文件执行 SPEL 表达式作为上下文评估根对象。 为此,所有用于文件处理的 XML 组件(本地和远程)以及现有的filter属性,已随filter-expression选项,如下例所示:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter
        directory="${inputdir}"
        filter-expression="name matches '.text'"
        auto-startup="false"/>

版本 5.0.5 引入了DiscardAwareFileListFilter对 rejected files 感兴趣的实现。 为此,这样的过滤器实现应该通过addDiscardCallback(Consumer<File>). 在框架中,此功能从FileReadingMessageSource.WatchServiceDirectoryScanner,与LastModifiedFileListFilter. 与常规DirectoryScannerWatchService根据目标文件系统上的事件提供要处理的文件。 在轮询包含这些文件的内部队列时,LastModifiedFileListFilter可能会丢弃它们,因为它们相对于其配置的age. 因此,我们将丢失该文件以备将来可能的考虑。 discard 回调钩子允许我们将文件保留在内部队列中,以便可以根据age在随后的民意调查中。 这CompositeFileListFilter还实现了一个DiscardAwareFileListFilter并将 discard 回调填充到其所有DiscardAwareFileListFilter代表。spring-doc.cadn.net.cn

因为CompositeFileListFilter将文件与所有委托匹配,则discardCallback可能会对同一个文件调用多次。

从版本 5.1 开始,FileReadingMessageSource不检查目录是否存在,并且在其start()调用(通常通过包装SourcePollingChannelAdapter). 以前,在引用目录时(例如,从测试中引用)或稍后应用权限时,没有简单的方法来防止作系统权限错误。spring-doc.cadn.net.cn

消息报头

从版本 5.0 开始,FileReadingMessageSource(除了payload作为轮询File) 将以下标头填充到出站Message:spring-doc.cadn.net.cn

  • FileHeaders.FILENAME:这File.getName()的 file 中。 可用于后续的重命名或复制逻辑。spring-doc.cadn.net.cn

  • FileHeaders.ORIGINAL_FILE:这File对象本身。 通常,当我们丢失原始 File对象。 但是,为了与任何其他自定义用例保持一致和方便,此标头可用于访问原始文件。spring-doc.cadn.net.cn

  • FileHeaders.RELATIVE_PATH:引入了一个新的标头,用于表示文件路径相对于扫描的根目录的部分。 当要求在其他地方恢复源目录层次结构时,此标头可能很有用。 为此,DefaultFileNameGenerator(请参阅“'生成文件名)可以配置为使用此标头。spring-doc.cadn.net.cn

目录扫描和轮询

FileReadingMessageSource不会立即为目录中的文件生成消息。 它使用内部队列来存储scanner. 这scanEachPoll选项用于确保在每次轮询时使用最新的输入目录内容刷新内部队列。 默认情况下 (scanEachPoll = false)、FileReadingMessageSource清空其队列,然后再次扫描目录。 此默认行为对于减少对目录中大量文件的扫描特别有用。 但是,在需要自定义排序的情况下,请务必考虑将此标志设置为true. 文件的处理顺序可能与预期不符。 默认情况下,队列中的文件以其自然 (path) 订单。 通过扫描添加的新文件,即使队列中已有文件,也会入到适当的位置,以保持该自然顺序。 要自定义顺序,FileReadingMessageSource可以接受Comparator<File>作为构造函数参数。 它由内部 (PriorityBlockingQueue) 以根据业务要求对其内容进行重新排序。 因此,要按特定顺序处理文件,您应该为FileReadingMessageSource而不是对自定义生成的列表进行排序DirectoryScanner.spring-doc.cadn.net.cn

引入 5.0 版RecursiveDirectoryScanner执行文件树访问。 该实现基于Files.walk(Path start, int maxDepth, FileVisitOption…​ options)功能性。 根目录 (DirectoryScanner.listFiles(File)) 参数。 所有其他子目录包含和排除项都基于目标FileListFilter实现。 例如,SimplePatternFileListFilter默认情况下,会过滤掉目录。 看AbstractDirectoryAwareFileListFilter及其实现了解更多信息。spring-doc.cadn.net.cn

从版本 5.5 开始,FileInboundChannelAdapterSpec的 Java DSL 具有recursive(boolean)选项以使用RecursiveDirectoryScanner在目标FileReadingMessageSource而不是默认的。

命名空间支持

通过使用特定于文件的命名空间,可以简化文件读取的配置。 为此,请使用以下模板:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-file="http://www.springframework.org/schema/integration/file"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>

在此命名空间中,您可以减少FileReadingMessageSource并将其包装在入站 Channel Adapter 中,如下所示:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="filesIn1"
    directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>

<int-file:inbound-channel-adapter id="filesIn2"
    directory="file:${input.directory}"
    filter="customFilterBean" />

<int-file:inbound-channel-adapter id="filesIn3"
    directory="file:${input.directory}"
    filename-pattern="test*" />

<int-file:inbound-channel-adapter id="filesIn4"
    directory="file:${input.directory}"
    filename-regex="test[0-9]+\.txt" />

第一个通道适配器示例依赖于默认的FileListFilter实现:spring-doc.cadn.net.cn

因此,您也可以省略prevent-duplicatesignore-hidden属性,因为它们是true默认情况下。spring-doc.cadn.net.cn

Spring Integration 4.2 引入了ignore-hidden属性。 在以前的版本中,包含隐藏文件。spring-doc.cadn.net.cn

第二个通道适配器示例使用自定义过滤器,第三个示例使用filename-pattern属性以添加AntPathMatcherbased 过滤器,第四个使用filename-regex属性,将基于正则表达式模式的筛选条件添加到FileReadingMessageSource. 这filename-patternfilename-regex属性都与常规的filterreference 属性。 但是,您可以使用filter属性来引用CompositeFileListFilter它组合了任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。spring-doc.cadn.net.cn

当多个进程从同一目录读取数据时,您可能希望锁定文件以防止它们被并发选取。 为此,您可以使用FileLocker. 有一个java.nio,但也可以实现自己的锁定方案。 这nio可以按如下方式注入 Locker:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:nio-locker/>
</int-file:inbound-channel-adapter>

您可以按如下方式配置自定义保险箱:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了locker时,它负责在允许接收文件之前获取锁。 它不承担解锁文件的责任。 如果已处理文件并保持锁挂起,则存在内存泄漏。 如果这是一个问题,您应该调用FileLocker.unlock(File file)你自己在适当的时候。

当筛选和锁定文件还不够时,您可能需要完全控制文件的列出方式。 要实现此类要求,您可以使用DirectoryScanner. 此扫描程序可让您准确确定每个轮询中列出的文件。 这也是 Spring Integration 在内部用来连接FileListFilterinstances 和FileLockerFileReadingMessageSource. 您可以注入自定义DirectoryScanner<int-file:inbound-channel-adapter/>scanner属性,如下例所示:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
     scanner="customDirectoryScanner"/>

这样做可以让您完全自由地选择排序、列表和锁定策略。spring-doc.cadn.net.cn

了解过滤器(包括patterns,regex,prevent-duplicates等)和locker实例实际上由scanner. 在适配器上设置的这些属性中的任何一个随后都会注入到内部的scanner. 对于外部scanner,则FileReadingMessageSource. 必须在该自定义中指定它们(如果需要)DirectoryScanner. 换句话说,如果您注入scannerFileReadingMessageSource,您应该提供filterlocker在那个scanner,而不是在FileReadingMessageSource.spring-doc.cadn.net.cn

默认情况下,DefaultDirectoryScanner使用IgnoreHiddenFileListFilter以及一个AcceptOnceFileListFilter. 为了防止使用它们,您可以配置自己的过滤器(例如AcceptAllFileListFilter),甚至将其设置为null.

WatchServiceDirectoryScanner

FileReadingMessageSource.WatchServiceDirectoryScanner在将新文件添加到目录时依赖于文件系统事件。 在初始化期间,将注册目录以生成事件。 初始文件列表也是在初始化期间构建的。 在遍历目录树时,遇到的任何子目录也会被注册以生成事件。 在第一次轮询时,将返回遍历目录的初始文件列表。 在后续轮询中,将返回来自新创建事件的文件。 如果添加了新的子目录,则其 creation 事件用于遍历新子树以查找现有文件并注册找到的任何新子目录。spring-doc.cadn.net.cn

存在问题WatchKey当其内部事件queue不会在目录修改事件发生时被程序耗尽。 如果超出队列大小,则StandardWatchEventKinds.OVERFLOW,表示部分文件系统事件可能会丢失。 在这种情况下,将完全重新扫描根目录。 为避免重复,请考虑使用适当的FileListFilter(例如AcceptOnceFileListFilter) 或在处理完成时删除文件。

WatchServiceDirectoryScanner可以通过FileReadingMessageSource.use-watch-service选项,它与scanner选择。 内部FileReadingMessageSource.WatchServiceDirectoryScanner实例为提供的directory.spring-doc.cadn.net.cn

此外,现在WatchService轮询逻辑可以跟踪StandardWatchEventKinds.ENTRY_MODIFYStandardWatchEventKinds.ENTRY_DELETE.spring-doc.cadn.net.cn

如果您需要跟踪现有文件和新文件的修改,则应实现ENTRY_MODIFYevents 逻辑FileListFilter. 否则,将以相同的方式处理这些事件中的文件。spring-doc.cadn.net.cn

ResettableFileListFilterimplementation 拾取ENTRY_DELETE事件。 因此,他们的文件是为remove()操作。 启用此事件后,过滤器(如AcceptOnceFileListFilter删除文件。 因此,如果出现同名文件,它将通过过滤器并作为消息发送。spring-doc.cadn.net.cn

为此,watch-events属性 (FileReadingMessageSource.setWatchEvents(WatchEventType…​ watchEvents)) 已引入。 (WatchEventTypeFileReadingMessageSource.) 使用此选项,我们可以对新文件使用一个下游流逻辑,对修改后的文件使用其他一些 logic 。 以下示例显示了如何为同一目录中的 create 和 modify 事件配置不同的逻辑:spring-doc.cadn.net.cn

值得一提的是,ENTRY_DELETE事件涉及被监视目录的子目录的重命名作。 更具体地说,ENTRY_DELETEevent 的 event(与上一个目录名称相关)位于ENTRY_CREATE事件,该事件通知新的(重命名的)目录。 在某些作系统(如 Windows)上,ENTRY_DELETEevent 必须注册才能处理这种情况。 否则,在文件资源管理器中重命名监视的子目录可能会导致在该子目录中无法检测到新文件。spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="newFiles"
     directory="${input.directory}"
     use-watch-service="true"/>

<int-file:inbound-channel-adapter id="modifiedFiles"
     directory="${input.directory}"
     use-watch-service="true"
     filter="acceptAllFilter"
     watch-events="MODIFY"/> <!-- The default is CREATE. -->

限制内存消耗

您可以使用HeadDirectoryScanner以限制内存中保留的文件数。 这在扫描大型目录时非常有用。 使用 XML 配置时,可以通过设置queue-size入站通道适配器上的属性。spring-doc.cadn.net.cn

在版本 4.2 之前,此设置与任何其他过滤器的使用不兼容。 任何其他筛选条件(包括prevent-duplicates="true") 覆盖用于限制大小的筛选条件。spring-doc.cadn.net.cn

使用HeadDirectoryScannerAcceptOnceFileListFilter. 由于在轮询决策期间会查询所有筛选条件,因此AcceptOnceFileListFilter不知道其他过滤器可能正在临时过滤文件。 即使之前由HeadDirectoryScanner.HeadFilter现已推出,则AcceptOnceFileListFilter筛选它们。spring-doc.cadn.net.cn

通常,不使用AcceptOnceFileListFilter在这种情况下,您应该删除已处理的文件,以便以前过滤的文件在将来的轮询中可用。spring-doc.cadn.net.cn

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
         FileReadingMessageSource source = new FileReadingMessageSource();
         source.setDirectory(new File(INBOUND_PATH));
         source.setFilter(new SimplePatternFileListFilter("*.txt"));
         return source;
    }

    @Bean
    @Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
    public FileToStringTransformer fileToStringTransformer() {
        return new FileToStringTransformer();
    }

}

使用 Java DSL 进行配置

Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileReadingFlow() {
         return IntegrationFlow
                  .from(Files.inboundAdapter(new File(INBOUND_PATH))
                              .patternFilter("*.txt"),
                          e -> e.poller(Pollers.fixedDelay(1000)))
                  .transform(Files.toStringTransformer())
                  .channel("processFileChannel")
                  .get();
    }

}

'tail'ing 文件

另一个常见的用例是从文件的末尾(或尾部)获取 'lines',并在添加新行时捕获新行。 提供了两种实现。 第一个OSDelegatingFileTailingMessageProducer使用本机tail命令(在具有 COMMAND 的作系统上)。 这通常是这些平台上最有效的实现。 对于没有tailcommand 的 app 中,第二个实现ApacheCommonsFileTailingMessageProducer使用 Apachecommons-io Tailer类。spring-doc.cadn.net.cn

在这两种情况下,文件系统事件(例如文件不可用和其他事件)都发布为ApplicationEvent实例。 此类事件的示例包括:spring-doc.cadn.net.cn

[message=tail: cannot open '/tmp/somefile' for reading:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become inaccessible:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has appeared;
               following end of new file, file=/tmp/somefile]

例如,在旋转文件时,可能会发生前面示例中显示的事件序列。spring-doc.cadn.net.cn

从版本 5.0 开始,FileTailingIdleEventidleEventInterval. 以下示例显示了此类事件的外观:spring-doc.cadn.net.cn

[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持tail命令提供这些状态消息。

从这些终端节点发出的消息具有以下标头:spring-doc.cadn.net.cn

在 5.0 版之前的版本中,FileHeaders.FILENAMEheader 包含文件绝对路径的字符串表示形式。 现在,您可以通过调用getAbsolutePath()在原始文件头上。

以下示例使用默认选项 ('-F -n 0',表示从当前端开始遵循文件名) 创建本机适配器。spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	task-executor="exec"
	file="/tmp/foo"/>

以下示例使用 '-F -n +0' 选项(表示遵循文件名,发出所有现有行)创建本机适配器。spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	native-options="-F -n +0"
	task-executor="exec"
	file-delay=10000
	file="/tmp/foo"/>

如果tail命令失败(在某些平台上,缺少文件会导致tail失败,即使使用-F指定),则每 10 秒重试一次该命令。spring-doc.cadn.net.cn

默认情况下,本机适配器从标准输出捕获并将内容作为消息发送。 他们还从标准错误中捕获以引发事件。 从版本 4.3.6 开始,您可以通过设置enable-status-readerfalse,如下例所示:spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	enable-status-reader="false"
	task-executor="exec"
	file="/tmp/foo"/>

在以下示例中,IdleEventInterval设置为5000,这意味着,如果 5 秒内没有写入任何行,则FileTailingIdleEvent每 5 秒触发一次:spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	idle-event-interval="5000"
	task-executor="exec"
	file="/tmp/somefile"/>

当您需要停止适配器时,这可能很有用。spring-doc.cadn.net.cn

以下示例创建一个 Apachecommons-io Tailer适配器,它每两秒检查一次文件是否有新行,并每十秒检查一次是否存在缺失的文件:spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="apache"
	channel="input"
	task-executor="exec"
	file="/tmp/bar"
	delay="2000"
	end="false"             (1)
	reopen="true"           (2)
	file-delay="10000"/>
1 文件从开头 (end="false") 而不是 end(这是默认值)。
2 将为每个块重新打开文件(默认为保持文件打开)。
指定delay,endreopenattributes 强制使用 Apachecommons-io适配器,并使native-options属性不可用。

处理不完整的数据

文件传输方案中的一个常见问题是如何确定传输已完成,以便不会开始读取不完整的文件。 解决此问题的一种常见方法是使用临时名称编写文件,然后以原子方式将其重命名为最终名称。 此技术与遮盖临时文件不被使用者选取的过滤器一起,提供了一个强大的解决方案。 这种技术被写入文件(本地或远程)的 Spring 集成组件使用。 默认情况下,它们会附加.writing添加到文件名中,并在传输完成后将其删除。spring-doc.cadn.net.cn

另一种常见的技术是编写第二个 “marker” 文件以指示文件传输已完成。 在这种情况下,您不应考虑somefile.txt(例如)可使用somefile.txt.complete也存在。 Spring 集成版本 5.0 引入了新的过滤器来支持这种机制。 为文件系统 (FileSystemMarkerFilePresentFileListFilter)、FTPSFTP 的 API 请求。 它们是可配置的,因此标记文件可以具有任何名称,尽管它通常与正在传输的文件相关。 有关更多信息,请参阅 Javadocspring-doc.cadn.net.cn

写入文件

要将消息写入文件系统,您可以使用FileWritingMessageHandler. 此类可以处理以下有效负载类型:spring-doc.cadn.net.cn

对于 String 负载,您可以配置编码和 charset。spring-doc.cadn.net.cn

为了简化作,您可以配置FileWritingMessageHandler作为出站通道适配器或出站网关的一部分,通过使用 XML 名称空间。spring-doc.cadn.net.cn

从版本 4.3 开始,您可以指定写入文件时要使用的缓冲区大小。spring-doc.cadn.net.cn

从版本 5.1 开始,您可以提供BiConsumer<File, Message<?>> newFileCallback如果您使用FileExistsMode.APPENDFileExistsMode.APPEND_NO_FLUSH并且必须创建一个新文件。 此回调接收新创建的文件以及触发该文件的消息。 例如,此回调可用于编写消息标头中定义的 CSV 标头。spring-doc.cadn.net.cn

生成文件名

在最简单的形式中,FileWritingMessageHandler只需要一个目标目录来写入文件。 要写入的文件的名称由处理程序的FileNameGenerator. 默认实现会查找其 key 与定义为FileHeaders.FILENAME.spring-doc.cadn.net.cn

或者,您可以指定要根据消息计算的表达式以生成文件名 — 例如headers['myCustomHeader'] + '.something'. 表达式的计算结果必须为String. 为方便起见,DefaultFileNameGenerator还提供setHeaderName方法,允许您显式指定其值将用作文件名的消息标头。spring-doc.cadn.net.cn

设置完成后,DefaultFileNameGenerator采用以下解析步骤来确定给定消息有效负载的文件名:spring-doc.cadn.net.cn

  1. 根据消息评估表达式,如果结果为非空String,将其用作文件名。spring-doc.cadn.net.cn

  2. 否则,如果有效负载是java.io.File,请使用File对象的文件名。spring-doc.cadn.net.cn

  3. 否则,请使用附加了 .msg作为文件名。spring-doc.cadn.net.cn

当您使用 XML 名称空间支持时,文件出站通道适配器和文件出站网关都支持以下互斥的配置属性:spring-doc.cadn.net.cn

在写入文件时,将使用临时文件后缀(其默认值为.writing). 在写入文件时,它会附加到文件名中。 要自定义后缀,您可以设置temporary-file-suffixFile outbound Channel Adapter 和 File Outbound Gateway 上的属性。spring-doc.cadn.net.cn

使用APPEND文件modetemporary-file-suffix属性,因为数据是直接附加到文件中的。

从版本 4.2.5 开始,生成的文件名(作为filename-generatorfilename-generator-expressionevaluation) 可以表示子路径和目标文件名。 它用作File(File parent, String child)如故。 但是,在过去,我们没有创建 (mkdirs()) 目录中,仅假定文件名。 当我们需要恢复文件系统树以匹配源目录时,这种方法非常有用,例如,解压缩存档并按原始顺序保存目标目录中的所有文件时。spring-doc.cadn.net.cn

指定输出目录

文件出站通道适配器和文件出站网关都提供了两个互斥的配置属性,用于指定输出目录:spring-doc.cadn.net.cn

Spring Integration 2.2 引入了directory-expression属性。
使用directory属性

当您使用directory属性,则输出目录将设置为固定值,该值在FileWritingMessageHandler已初始化。 如果不指定此属性,则必须使用directory-expression属性。spring-doc.cadn.net.cn

使用directory-expression属性

如果您想获得完整的 SPEL 支持,可以使用directory-expression属性。 此属性接受一个 SPEL 表达式,该表达式针对正在处理的每条消息进行评估。 因此,当您动态指定输出文件目录时,您对消息的有效负载及其标头具有完全访问权限。spring-doc.cadn.net.cn

SPEL 表达式必须解析为String,java.io.Fileorg.springframework.core.io.Resource. (后者被评估为File总之。 此外,由此产生的StringFile必须指向一个目录。 如果未指定directory-expression属性,则必须设置directory属性。spring-doc.cadn.net.cn

使用auto-create-directory属性

默认情况下,如果目标目录不存在,则会自动创建相应的目标目录和任何不存在的父目录。 要防止这种行为,您可以将auto-create-directory属性设置为false. 此属性适用于directorydirectory-expression属性。spring-doc.cadn.net.cn

使用directoryattribute 和auto-create-directoryfalse,从 Spring Integration 2.2 开始进行了以下更改:spring-doc.cadn.net.cn

现在,对正在处理的每条消息执行此检查,而不是在初始化适配器时检查目标目录是否存在。spring-doc.cadn.net.cn

此外,如果auto-create-directorytrue并且该目录在处理消息之间被删除,则会为正在处理的每条消息重新创建该目录。spring-doc.cadn.net.cn

处理现有目标文件

当您写入文件并且目标文件已存在时,默认行为是覆盖该目标文件。 您可以通过设置mode属性。 存在以下选项:spring-doc.cadn.net.cn

Spring Integration 2.2 引入了mode属性和APPEND,FAILIGNORE选项。
REPLACE

如果目标文件已存在,则将其覆盖。 如果mode属性,这是写入文件时的默认行为。spring-doc.cadn.net.cn

REPLACE_IF_MODIFIED

如果目标文件已存在,则仅当上次修改的时间戳与源文件的时间戳不同时,才会覆盖该目标文件。 为Filepayloads,则 payloadlastModified将 time 与现有文件进行比较。 对于其他负载,FileHeaders.SET_MODIFIED (file_setModified) 标头与现有文件进行比较。 如果标头缺失或具有不是Number,则始终会替换该文件。spring-doc.cadn.net.cn

APPEND

此模式允许您将消息内容附加到现有文件,而不是每次都创建新文件。 请注意,此属性与temporary-file-suffix属性,因为当它将内容附加到现有文件时,适配器不再使用临时文件。 文件在每条消息后关闭。spring-doc.cadn.net.cn

APPEND_NO_FLUSH

此选项与APPEND,但不会刷新数据,也不会在每条消息后关闭文件。 这可以提供显著的性能,但在发生故障时有丢失数据的风险。 看使用 时刷新文件APPEND_NO_FLUSH了解更多信息。spring-doc.cadn.net.cn

FAIL

如果目标文件存在,则MessageHandlingException被抛出。spring-doc.cadn.net.cn

IGNORE

如果目标文件存在,则消息负载将被静默忽略。spring-doc.cadn.net.cn

使用临时文件后缀(默认值为.writing)、IGNORE选项(如果最终文件名或临时文件名存在)适用。

使用 时刷新文件APPEND_NO_FLUSH

APPEND_NO_FLUSHmode 是在版本 4.3 中添加的。 使用它可以提高性能,因为文件不会在每条消息后关闭。 但是,如果发生故障,这可能会导致数据丢失。spring-doc.cadn.net.cn

Spring 集成提供了几种刷新策略来减轻这种数据丢失:spring-doc.cadn.net.cn

  • flushInterval. 如果文件在这段时间内未写入,则会自动刷新该文件。 这是近似值,可能取决于1.33x这一次(平均1.167x).spring-doc.cadn.net.cn

  • 将包含正则表达式的消息发送到消息处理程序的trigger方法。 具有与模式匹配的绝对路径名的文件将被刷新。spring-doc.cadn.net.cn

  • 为处理程序提供自定义MessageFlushPredicate实现来修改将消息发送到trigger方法。spring-doc.cadn.net.cn

  • 调用处理程序的flushIfNeeded方法,方法是传入自定义FileWritingMessageHandler.FlushPredicateFileWritingMessageHandler.MessageFlushPredicate实现。spring-doc.cadn.net.cn

将为每个打开的文件调用谓词。 有关更多信息,请参阅这些接口的 Javadoc。 请注意,从版本 5.0 开始,谓词方法提供了另一个参数:当前文件的首次写入时间(如果是新的或以前的关闭)。spring-doc.cadn.net.cn

使用flushInterval,则间隔从上次写入开始。 仅当文件在时间间隔内处于空闲状态时,才会刷新该文件。 从版本 4.3.7 开始,一个额外的属性 (flushWhenIdle) 可以设置为false,这意味着该间隔从第一次写入以前刷新的(或新)文件开始。spring-doc.cadn.net.cn

文件时间戳

默认情况下,目标文件的lastModifiedtimestamp 是创建文件的时间(就地重命名保留当前时间戳除外)。 从版本 4.3 开始,您现在可以配置preserve-timestamp(或setPreserveTimestamp(true)使用 Java 配置时)。 为Filepayloads 的 Payloads,这会将时间戳从入站文件传输到出站文件(无论是否需要副本)。 对于其他负载,如果FileHeaders.SET_MODIFIED标头 (file_setModified) 的 API 中,它用于设置目标文件的lastModifiedtimestamp,只要标头是Number.spring-doc.cadn.net.cn

文件权限

从版本 5.0 开始,在将文件写入支持 Posix 权限的文件系统时,您可以在出站通道适配器或网关上指定这些权限。 该属性是一个整数,通常以熟悉的八进制格式提供 — 例如0640,这意味着所有者具有读/写权限,组具有只读权限,其他人没有访问权限。spring-doc.cadn.net.cn

文件出站通道适配器

下面的示例配置文件出站通道适配器:spring-doc.cadn.net.cn

<int-file:outbound-channel-adapter id="filesOut" directory="${input.directory.property}"/>

基于命名空间的配置还支持delete-source-files属性。 如果设置为true,它会在写入目标后触发对原始源文件的删除。 该标志的默认值为false. 以下示例演示如何将其设置为true:spring-doc.cadn.net.cn

<int-file:outbound-channel-adapter id="filesOut"
    directory="${output.directory}"
    delete-source-files="true"/>
delete-source-files属性仅在入站消息具有Filepayload 的 intent 或FileHeaders.ORIGINAL_FILEheader 值包含 sourceFile实例或String表示原始文件路径。

从版本 4.2 开始,FileWritingMessageHandler支持append-new-line选择。 如果设置为true,则在写入消息后,将向文件附加一个新行。 默认属性值为false. 以下示例演示如何使用append-new-line选择:spring-doc.cadn.net.cn

<int-file:outbound-channel-adapter id="newlineAdapter"
	append-new-line="true"
    directory="${output.directory}"/>

出站网关

如果要继续根据写入的文件处理消息,可以使用outbound-gateway相反。 它的作用类似于outbound-channel-adapter. 但是,在写入文件后,它还会将其作为消息的有效负载发送到回复通道。spring-doc.cadn.net.cn

以下示例配置出站网关:spring-doc.cadn.net.cn

<int-file:outbound-gateway id="mover" request-channel="moveInput"
    reply-channel="output"
    directory="${output.directory}"
    mode="REPLACE" delete-source-files="true"/>

如前所述,您还可以指定mode属性,它定义如何处理目标文件已存在的情况的行为。 有关更多详细信息,请参阅 Dealing with Existing Destination Files。 通常,在使用文件出站网关时,结果文件将作为回复通道上的消息负载返回。spring-doc.cadn.net.cn

这在指定IGNORE模式。 在这种情况下,将返回预先存在的目标文件。 如果请求消息的负载是一个文件,您仍然可以通过消息标头访问该原始文件。 请参阅 FileHeaders.ORIGINAL_FILEspring-doc.cadn.net.cn

“outbound-gateway” 在您希望首先移动文件,然后通过处理管道发送文件的情况下效果很好。 在这种情况下,您可以将文件命名空间的inbound-channel-adapter元素添加到outbound-gateway然后连接该网关的reply-channel拖动到管道的开头。

如果您有更详细的要求或需要支持其他有效负载类型作为要转换为文件内容的输入,则可以扩展FileWritingMessageHandler,但更好的选择是依赖Transformer.spring-doc.cadn.net.cn

使用 Java 配置进行配置

Spring 下面的 Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
@IntegrationComponentScan
public class FileWritingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                      new SpringApplicationBuilder(FileWritingJavaApplication.class)
                              .web(false)
                              .run(args);
             MyGateway gateway = context.getBean(MyGateway.class);
             gateway.writeToFile("foo.txt", new File(tmpDir.getRoot(), "fileWritingFlow"), "foo");
    }

    @Bean
    @ServiceActivator(inputChannel = "writeToFileChannel")
    public MessageHandler fileWritingMessageHandler() {
         Expression directoryExpression = new SpelExpressionParser().parseExpression("headers.directory");
         FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
         handler.setFileExistsMode(FileExistsMode.APPEND);
         return handler;
    }

    @MessagingGateway(defaultRequestChannel = "writeToFileChannel")
    public interface MyGateway {

        void writeToFile(@Header(FileHeaders.FILENAME) String fileName,
                       @Header(FileHeaders.FILENAME) File directory, String data);

    }
}

使用 Java DSL 进行配置

Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class FileWritingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                 new SpringApplicationBuilder(FileWritingJavaApplication.class)
                         .web(false)
                         .run(args);
        MessageChannel fileWritingInput = context.getBean("fileWritingInput", MessageChannel.class);
        fileWritingInput.send(new GenericMessage<>("foo"));
    }

    @Bean
   	public IntegrationFlow fileWritingFlow() {
   	    return IntegrationFlow.from("fileWritingInput")
   		        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "foo.txt")
   		                  .header("directory", new File(tmpDir.getRoot(), "fileWritingFlow")))
   	            .handle(Files.outboundGateway(m -> m.getHeaders().get("directory")))
   	            .channel(MessageChannels.queue("fileWritingResultChannel"))
   	            .get();
    }

}

文件转换器

要将从文件系统读取的数据转换为对象,反之亦然,您需要做一些工作。 与FileReadingMessageSource在较小程度上FileWritingMessageHandler,则可能需要自己的机制来完成工作。 为此,您可以实现Transformer接口。 或者,您可以扩展AbstractFilePayloadTransformer用于入站消息。 Spring 集成提供了一些明显的实现。spring-doc.cadn.net.cn

请参阅Javadoc 的Transformer接口以查看哪些 Spring Integration 类实现了它。 同样,您可以检查Javadoc 的AbstractFilePayloadTransformer以查看哪些 Spring Integration 类扩展了它。spring-doc.cadn.net.cn

FileToByteArrayTransformer延伸AbstractFilePayloadTransformer并转换Fileobject 转换为byte[]通过使用 Spring 的FileCopyUtils. 通常,使用一系列 transformer 比将所有转换放在一个类中要好。 在这种情况下,Filebyte[]转换可能是合乎逻辑的第一步。spring-doc.cadn.net.cn

FileToStringTransformer延伸AbstractFilePayloadTransformerFileobject 设置为String. 如果不出意外,这对调试很有用(考虑将其与 wire tap 一起使用)。spring-doc.cadn.net.cn

要配置特定于文件的转换器,可以使用 file 命名空间中的相应元素,如下例所示:spring-doc.cadn.net.cn

<int-file:file-to-bytes-transformer  input-channel="input" output-channel="output"
    delete-files="true"/>

<int-file:file-to-string-transformer input-channel="input" output-channel="output"
    delete-files="true" charset="UTF-8"/>

delete-filesoption 向转换器发出信号,表示它应该在转换完成后删除入站文件。 这绝不能替代使用AcceptOnceFileListFilterFileReadingMessageSource正在多线程环境中使用(例如,当您通常使用 Spring 集成时)。spring-doc.cadn.net.cn

文件拆分器

FileSplitter在版本 4.1.2 中添加,其命名空间支持在版本 4.2 中添加。 这FileSplitter将文本文件拆分为单独的行,基于BufferedReader.readLine(). 默认情况下,拆分器使用Iterator在从文件中读取行时一次发出一行。 设置iteratorproperty 设置为false使它在将所有行作为消息发出之前将其读入内存。 一个用例可能是,您希望在发送任何包含行的消息之前检测文件上的 I/O 错误。 但是,它仅适用于相对较短的文件。spring-doc.cadn.net.cn

入站有效负载可以是File,String(一个Filepath)、InputStreamReader. 其他负载类型保持不变。spring-doc.cadn.net.cn

以下清单显示了配置FileSplitter:spring-doc.cadn.net.cn

Java DSL
@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
Kotlin DSL
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
Java
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
XML 格式
<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)
1 splitter 的 bean 名称。
2 设置为true(默认值)来使用迭代器,或者false在发送行之前将文件加载到内存中。
3 设置为true在文件数据之前和之后发出文件开始和文件结束标记消息。 标记是带有FileSplitter.FileMarkerpayloads(使用STARTEND值中的mark属性)。 在筛选某些行的下游流中按顺序处理文件时,可以使用标记。 它们使下游处理能够知道文件何时被完全处理。 此外,一个file_marker标头,其中包含STARTEND已添加到这些消息中。 这ENDmarker 包括行数。 如果文件为空,则仅STARTEND标记使用0作为lineCount. 默认值为false. 什么时候true,apply-sequencefalse默认情况下。 另请参阅markers-json(next 属性)。
4 什么时候markers为 true,请将此项设置为true以将FileMarker对象转换为 JSON 字符串。 (使用SimpleJsonSerializer下面)。
5 设置为false要禁用sequenceSizesequenceNumber消息中的标头。 默认值为true除非markerstrue. 什么时候truemarkerstrue,则标记将包含在排序中。 什么时候trueiteratortruesequenceSizeheader 设置为0,因为大小未知。
6 设置为true以引起RequiresReplyException如果文件中没有行,则引发。 默认值为false.
7 设置将文本数据读入时使用的字符集名称String负载。 默认值为 platform charset。
8 在为其余行发出的消息中作为标题携带的第一行的标题名称。 从 5.0 版本开始。
9 设置用于将消息发送到拆分器的输入通道。
10 设置将消息发送到的输出通道。
11 设置发送超时。 仅当output-channelcan 阻止 — 例如完整的QueueChannel.
12 设置为false以禁用在刷新上下文时自动启动 Splitter。 默认值为true.
13 如果input-channel是一个<publish-subscribe-channel/>.
14 设置拆分器的启动阶段(在auto-startuptrue).

FileSplitter还会拆分任何基于文本的InputStream转换为行。 从版本 4.3 开始,当与 FTP 或 SFTP 流入站通道适配器或者使用stream选项来检索文件,则当文件被完全消耗时,拆分器会自动关闭支持流的会话 有关这些工具的更多信息,请参阅 FTP Streaming Inbound Channel AdapterSFTP Streaming Inbound Channel Adapter 以及 FTP Outbound GatewaySFTP Outbound Gatewayspring-doc.cadn.net.cn

使用 Java 配置时,可以使用其他构造函数,如下例所示:spring-doc.cadn.net.cn

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

什么时候markersJson为 true,则标记表示为 JSON 字符串(使用SimpleJsonSerializer).spring-doc.cadn.net.cn

版本 5.0 引入了firstLineAsHeader选项以指定内容的第一行是标题(如 CSV 文件中的列名)。 传递给此属性的参数是标头名称,在该名称下,第一行作为其余行发出的消息中的标头。 此行不包含在序列标头中(如果applySequence为 true),也不在lineCount关联FileMarker.END. 注意:从版本 5.5 开始,lineCount' 也作为FileHeaders.LINE_COUNTFileMarker.END消息,由于FileMarker可以序列化为 JSON。 如果文件仅包含标题行,则该文件将被视为空,因此仅FileMarker在拆分期间发出实例(如果启用了标记 — 否则,不会发出任何消息)。 默认情况下(如果未设置标头名称),第一行被视为 data 并成为第一个发出的消息的有效负载。spring-doc.cadn.net.cn

如果您需要有关从文件内容中提取 Headers 的更复杂的逻辑(不是第一行,不是行的整个内容,也不是一个特定的 Header,等等),请考虑在FileSplitter. 请注意,已移动到标题的行可能会从正常内容流程的下游进行筛选。spring-doc.cadn.net.cn

幂等下游处理拆分文件

什么时候apply-sequence为 true,则拆分器会将SEQUENCE_NUMBER标头(当markers为 true,则标记计为行)。 该行号可与幂等接收器一起使用,以避免在重新启动后重新处理行。spring-doc.cadn.net.cn

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}

文件聚合器

从版本 5.5 开始,一个FileAggregator的引入是为了覆盖FileSplitteruse-case 启用 START/END 标记时。 为方便起见,FileAggregator实现所有三种序列详细信息策略:spring-doc.cadn.net.cn

  • HeaderAttributeCorrelationStrategy使用FileHeaders.FILENAMEattribute 用于关联键的计算。 在FileSplitter,它不会填充序列详细信息标头,因为 START/END 标记消息也包含在序列大小中。 这FileHeaders.FILENAME仍会为发出的每一行填充,包括 START/END 标记消息。spring-doc.cadn.net.cn

  • FileMarkerReleaseStrategy- 检查FileSplitter.FileMarker.Mark.END消息,然后比较FileHeaders.LINE_COUNTheader 值与组大小减2 - FileSplitter.FileMarker实例。 它还实现了一个方便的GroupConditionProvider联系方式conditionSupplier函数中要使用的AbstractCorrelatingMessageHandler. 有关更多信息,请参阅 Message Group Conditionspring-doc.cadn.net.cn

  • FileAggregatingMessageGroupProcessor只是删除FileSplitter.FileMarker消息,并将其余消息收集到要生成的 List 负载中。spring-doc.cadn.net.cn

以下清单显示了配置FileAggregator:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
    return f -> f
            .split(Files.splitter()
                    .markers()
                    .firstLineAsHeader("firstLine"))
            .channel(c -> c.executor(taskExecutor))
            .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
                    e -> e.discardChannel("aggregatorChannel"))
            .<String, String>transform(String::toUpperCase)
            .channel("aggregatorChannel")
            .aggregate(new FileAggregator())
            .channel(c -> c.queue("resultChannel"));
}
Kotlin DSL
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
    integrationFlow {
        split(Files.splitter().markers().firstLineAsHeader("firstLine"))
        channel { executor(taskExecutor) }
        filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
        transform(String::toUpperCase)
        channel("aggregatorChannel")
        aggregate(FileAggregator())
        channel { queue("resultChannel") }
    }
Java
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
    aggregator.setProcessorBean(new FileAggregator());
    aggregator.setOutputChannel(outputChannel);
    return aggregator;
}
XML 格式
<int:chain input-channel="input" output-channel="output">
    <int-file:splitter markers="true"/>
    <int:aggregator>
        <bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
    </int:aggregator>
</int:chain>

如果FileAggregator不满足目标逻辑,建议为聚合器终端节点配置单个策略。 看FileAggregatorJavaDocs 了解更多信息。spring-doc.cadn.net.cn

远程持久文件列表过滤器

入站和流式处理入站远程文件通道适配器 (FTP,SFTP和其他技术)配置了相应的AbstractPersistentFileListFilter默认情况下,使用 In-Memory 配置MetadataStore. 要在集群中运行,可以使用共享的MetadataStore(有关更多信息,请参阅 元数据存储 )。 这些过滤器用于防止多次获取同一文件(除非修改时间更改)。 从版本 5.2 开始,在获取文件之前立即将文件添加到过滤器中(如果获取失败,则撤消)。spring-doc.cadn.net.cn

如果发生灾难性故障(例如断电),当前正在获取的文件可能会保留在过滤器中,并且在重新启动应用程序时不会重新获取。 在这种情况下,您需要从MetadataStore.

在以前的版本中,在提取任何文件之前都会过滤文件,这意味着在发生灾难性故障后,多个文件可能会处于此状态。spring-doc.cadn.net.cn

为了促进这种新行为,在FileListFilter.spring-doc.cadn.net.cn

boolean accept(F file);

boolean supportsSingleFileFiltering();

如果筛选条件返回truesupportsSingleFileFiltering,它必须实现accept().spring-doc.cadn.net.cn

如果远程过滤器不支持单个文件过滤(例如AbstractMarkerFilePresentFileListFilter),则适配器将恢复为之前的行为。spring-doc.cadn.net.cn

如果使用了多个过滤器(使用CompositeFileListFilterChainFileListFilter),则所有委托过滤器都必须支持单个文件过滤,复合过滤器才能支持它。spring-doc.cadn.net.cn

持久文件列表过滤器现在具有布尔属性forRecursion. 将此属性设置为true,还会设置alwaysAcceptDirectories,这意味着出站网关 (lsmget) 现在每次都始终遍历完整的目录树。 这是为了解决未检测到目录树深处更改的问题。 另外forRecursion=true使文件的完整路径用作元数据存储键;这解决了以下问题:如果具有相同名称的文件在不同目录中多次出现,则过滤器无法正常工作。 重要说明:这意味着对于顶级目录下的文件,将无法找到持久性元数据存储中的现有键。 因此,该属性为false默认情况下;这可能会在未来版本中更改。spring-doc.cadn.net.cn