FTP/FTPS适配器

FTP/FTPS 适配器

Spring 集成支持使用 FTP 和 FTPS 进行文件传输作。spring-doc.cadn.net.cn

文件传输协议 (FTP) 是一种简单的网络协议,可让您在 Internet 上的两台计算机之间传输文件。 FTPS 代表“FTP over SSL”。spring-doc.cadn.net.cn

您需要将此依赖项包含在您的项目中:spring-doc.cadn.net.cn

Maven 系列
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-ftp</artifactId>
    <version>6.0.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-ftp:6.0.9"

在 FTP 通信方面,有两个参与者:客户端和服务器。 要使用 FTP 或 FTPS 传输文件,请使用启动与运行 FTP 服务器的远程计算机的连接的客户端。 建立连接后,客户端可以选择发送或接收文件副本。spring-doc.cadn.net.cn

Spring 集成通过提供三个客户端端点来支持通过 FTP 或 FTPS 发送和接收文件:入站通道适配器、出站通道适配器和出站网关。 它还为定义这些客户端组件提供了方便的基于命名空间的配置选项。spring-doc.cadn.net.cn

要使用 FTP 命名空间,请将以下内容添加到 XML 文件的标头中:spring-doc.cadn.net.cn

xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/ftp
    https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd"

FTP 会话工厂

Spring 集成提供了可用于创建 FTP(或 FTPS)会话的工厂。spring-doc.cadn.net.cn

默认工厂

从版本 3.0 开始,默认情况下不再缓存会话。 请参阅 FTP 会话缓存

在配置 FTP 适配器之前,必须配置 FTP 会话工厂。 您可以使用常规 bean 定义配置 FTP 会话工厂,其中实现类为o.s.i.ftp.session.DefaultFtpSessionFactory. 以下示例显示了基本配置:spring-doc.cadn.net.cn

<bean id="ftpClientFactory"
    class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
    <property name="host" value="localhost"/>
    <property name="port" value="22"/>
    <property name="username" value="kermit"/>
    <property name="password" value="frog"/>
    <property name="clientMode" value="0"/>
    <property name="fileType" value="2"/>
    <property name="bufferSize" value="100000"/>
</bean>

对于 FTPS 连接,您可以使用o.s.i.ftp.session.DefaultFtpsSessionFactory相反。spring-doc.cadn.net.cn

以下示例显示了完整的配置:spring-doc.cadn.net.cn

<bean id="ftpClientFactory"
    class="org.springframework.integration.ftp.session.DefaultFtpsSessionFactory">
    <property name="host" value="localhost"/>
    <property name="port" value="22"/>
    <property name="username" value="oleg"/>
    <property name="password" value="password"/>
    <property name="clientMode" value="1"/>
    <property name="fileType" value="2"/>
    <property name="useClientMode" value="true"/>
    <property name="cipherSuites" value="a,b.c"/>
    <property name="keyManager" ref="keyManager"/>
    <property name="protocol" value="SSL"/>
    <property name="trustManager" ref="trustManager"/>
    <property name="prot" value="P"/>
    <property name="needClientAuth" value="true"/>
    <property name="authValue" value="oleg"/>
    <property name="sessionCreation" value="true"/>
    <property name="protocols" value="SSL, TLS"/>
    <property name="implicit" value="true"/>
</bean>
如果您遇到连接问题,并且想要跟踪会话创建并查看轮询了哪些会话,则可以通过将记录器设置为TRACE级别(例如,log4j.category.org.springframework.integration.file=TRACE).

现在你只需要将这些 session factories 注入到你的适配器中。 适配器使用的协议 (FTP 或 FTPS) 取决于已注入适配器的会话工厂的类型。spring-doc.cadn.net.cn

为 FTP 或 FTPS 会话工厂提供值的更实用的方法是使用 Spring 的属性占位符支持(参见 https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#beans-factory-placeholderconfigurer)。

高级配置

DefaultFtpSessionFactory提供了对底层客户端 API 的抽象,该 API(自 Spring Integration 2.0 起)是 Apache Commons Net。 这使您无需了解org.apache.commons.net.ftp.FTPClient. 会话工厂中公开了几个常见属性(从版本 4.0 开始,现在包括connectTimeout,defaultTimeoutdataTimeout). 但是,您有时需要访问较低级别FTPClientconfiguration 实现更高级的配置(例如设置活动模式的端口范围)。 为此,AbstractFtpSessionFactory(所有 FTP 会话工厂的基类)以以下清单中所示的两种后处理方法的形式公开钩子:spring-doc.cadn.net.cn

/**
 * Will handle additional initialization after client.connect() method was invoked,
 * but before any action on the client has been taken
 */
protected void postProcessClientAfterConnect(T t) throws IOException {
    // NOOP
}
/**
 * Will handle additional initialization before client.connect() method was invoked.
 */
protected void postProcessClientBeforeConnect(T client) throws IOException {
    // NOOP
}

如您所见,这两种方法没有默认实现。 但是,通过扩展DefaultFtpSessionFactory中,您可以覆盖这些方法,以提供FTPClient,如下例所示:spring-doc.cadn.net.cn

public class AdvancedFtpSessionFactory extends DefaultFtpSessionFactory {

    protected void postProcessClientBeforeConnect(FTPClient ftpClient) throws IOException {
       ftpClient.setActivePortRange(4000, 5000);
    }
}

FTPS 和共享 SSLSession

当使用 FTP over SSL 或 TLS 时,某些服务器需要相同的SSLSession用于 Control 和 Data 连接。 这是为了防止 “窃取” 数据连接。 有关更多信息,请参阅 https://scarybeastsecurity.blogspot.cz/2009/02/vsftpd-210-released.htmlspring-doc.cadn.net.cn

目前,Apache FTPSClient 不支持此功能。 请参阅 NET-408spring-doc.cadn.net.cn

以下解决方案由 Stack Overflow 提供,它使用了sun.security.ssl.SSLSessionContextImpl,因此它可能不适用于其他 JVM。 堆栈溢出答案于 2015 年提交,Spring 集成团队已在 JDK 1.8.0_112 上测试了该解决方案。spring-doc.cadn.net.cn

以下示例说明如何创建 FTPS 会话:spring-doc.cadn.net.cn

@Bean
public DefaultFtpsSessionFactory sf() {
    DefaultFtpsSessionFactory sf = new DefaultFtpsSessionFactory() {

        @Override
        protected FTPSClient createClientInstance() {
            return new SharedSSLFTPSClient();
        }

    };
    sf.setHost("...");
    sf.setPort(21);
    sf.setUsername("...");
    sf.setPassword("...");
    sf.setNeedClientAuth(true);
    return sf;
}

private static final class SharedSSLFTPSClient extends FTPSClient {

    @Override
    protected void _prepareDataSocket_(final Socket socket) throws IOException {
        if (socket instanceof SSLSocket) {
            // Control socket is SSL
            final SSLSession session = ((SSLSocket) _socket_).getSession();
            final SSLSessionContext context = session.getSessionContext();
            context.setSessionCacheSize(0); // you might want to limit the cache
            try {
                final Field sessionHostPortCache = context.getClass()
                        .getDeclaredField("sessionHostPortCache");
                sessionHostPortCache.setAccessible(true);
                final Object cache = sessionHostPortCache.get(context);
                final Method method = cache.getClass().getDeclaredMethod("put", Object.class,
                        Object.class);
                method.setAccessible(true);
                String key = String.format("%s:%s", socket.getInetAddress().getHostName(),
                        String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
                method.invoke(cache, key, session);
                key = String.format("%s:%s", socket.getInetAddress().getHostAddress(),
                        String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
                method.invoke(cache, key, session);
            }
            catch (NoSuchFieldException e) {
                // Not running in expected JRE
                logger.warn("No field sessionHostPortCache in SSLSessionContext", e);
            }
            catch (Exception e) {
                // Not running in expected JRE
                logger.warn(e.getMessage());
            }
        }

    }

}

委派 Session Factory

版本 4.2 引入了DelegatingSessionFactory,这允许在运行时选择实际的 session factory。 在调用 FTP 端点之前,调用setThreadKey()将 key 与当前线程相关联。 然后,该键用于查找要使用的实际 session factory。 您可以通过调用clearThreadKey()使用后。spring-doc.cadn.net.cn

我们添加了便捷的方法,以便您可以轻松地从消息流中使用委托会话工厂。spring-doc.cadn.net.cn

下面的示例展示了如何声明一个委托的 session factory:spring-doc.cadn.net.cn

<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
    <constructor-arg>
        <bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
            <!-- delegate factories here -->
        </bean>
    </constructor-arg>
</bean>

<int:service-activator input-channel="in" output-channel="c1"
        expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />

<int-ftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />

<int:service-activator input-channel="c2" output-channel="out"
        expression="@dsf.clearThreadKey(#root)" />
当您使用会话缓存时(请参阅 FTP 会话缓存),应缓存每个委托。 您不能缓存DelegatingSessionFactory本身。

从版本 5.0.7 开始,DelegatingSessionFactory可与RotatingServerAdvice轮询多个服务器;请参见入站通道适配器:轮询多个服务器和目录spring-doc.cadn.net.cn

FTP 入站通道适配器

FTP 入站通道适配器是一个特殊的侦听器,它连接到 FTP 服务器并侦听远程目录事件(例如,创建了新文件),此时它将启动文件传输。 以下示例说明如何配置inbound-channel-adapter:spring-doc.cadn.net.cn

<int-ftp:inbound-channel-adapter id="ftpInbound"
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    auto-create-local-directory="true"
    delete-remote-files="true"
    filename-pattern="*.txt"
    remote-directory="some/remote/path"
    remote-file-separator="/"
    preserve-timestamp="true"
    local-filename-generator-expression="#this.toUpperCase() + '.a'"
    scanner="myDirScanner"
    local-filter="myFilter"
    temporary-file-suffix=".writing"
    max-fetch-size="-1"
    local-directory=".">
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

如前面的配置所示,您可以使用inbound-channel-adapter元素,同时还为各种属性(例如local-directory,filename-pattern(基于简单的模式匹配,而不是正则表达式),以及对session-factory.spring-doc.cadn.net.cn

默认情况下,传输的文件与原始文件同名。 如果要覆盖此行为,可以设置local-filename-generator-expression属性,它允许您提供 SPEL 表达式来生成本地文件的名称。 与出站网关和适配器不同,其中 SPEL 评估上下文的根对象是Message,则此入站适配器在评估时还没有消息,因为这是它最终以传输的文件作为其有效负载生成的消息。 因此,SPEL 评估上下文的根对象是远程文件的原始名称(一个String).spring-doc.cadn.net.cn

入站通道适配器首先检索Fileobject 作为本地目录,然后根据 Poller 配置发出每个文件。 从版本 5.0 开始,您现在可以限制在需要检索新文件时从 FTP 服务器获取的文件数。 当目标文件非常大时,或者在具有持久文件列表过滤器的集群系统中运行时,这可能很有用,稍后将讨论。 用max-fetch-size为此目的。 负值(默认值)表示没有限制,并且将检索所有匹配的文件。 有关更多信息,请参见入站通道适配器:控制远程文件获取。 从 5.0 版本开始,您还可以提供自定义的DirectoryScannerimplementation 添加到inbound-channel-adapter通过设置scanner属性。spring-doc.cadn.net.cn

从 Spring Integration 3.0 开始,您可以指定preserve-timestamp属性(其默认值为false). 什么时候true,则本地文件的修改时间戳将设置为从服务器检索的值。 否则,它将设置为当前时间。spring-doc.cadn.net.cn

从版本 4.2 开始,您可以指定remote-directory-expression而不是remote-directory,让您动态确定每个轮询的目录 — 例如remote-directory-expression="@myBean.determineRemoteDir()".spring-doc.cadn.net.cn

从版本 4.3 开始,您可以省略remote-directoryremote-directory-expression属性。 它们默认为null. 在这种情况下,根据 FTP 协议,客户端工作目录将用作默认远程目录。spring-doc.cadn.net.cn

有时,基于使用filename-pattern属性可能还不够。 如果是这种情况,您可以使用filename-regex属性指定正则表达式(例如filename-regex=".*\.test$"). 此外,如果您需要完全控制,可以使用filter属性,并提供对o.s.i.file.filters.FileListFilter,一个用于筛选文件列表的策略接口。 此筛选器确定要检索的远程文件。 您还可以将基于模式的过滤器与其他过滤器(如AcceptOnceFileListFilter以避免同步之前已获取的文件)通过使用CompositeFileListFilter.spring-doc.cadn.net.cn

AcceptOnceFileListFilter将其状态存储在内存中。 如果您希望该状态在系统重启后仍然存在,请考虑使用FtpPersistentAcceptOnceFileListFilter相反。 此筛选条件将接受的文件名存储在MetadataStore策略(请参阅元数据存储)。 此过滤器匹配文件名和远程修改时间。spring-doc.cadn.net.cn

从 4.0 版本开始,此过滤器需要一个ConcurrentMetadataStore. 当与共享数据存储(例如Redis使用RedisMetadataStore),它允许在多个应用程序或服务器实例之间共享筛选键。spring-doc.cadn.net.cn

从版本 5.0 开始,FtpPersistentAcceptOnceFileListFilter使用 In-MemorySimpleMetadataStore默认应用于FtpInboundFileSynchronizer. 此筛选条件还与regexpattern选项以及FtpInboundChannelAdapterSpec在 Java DSL 中。 任何其他用例都可以使用CompositeFileListFilter(或ChainFileListFilter).spring-doc.cadn.net.cn

前面的讨论是指在检索文件之前筛选文件。 检索文件后,将对文件系统上的文件应用额外的过滤器。 默认情况下,这是一个AcceptOnceFileListFilter如前所述,它将 state 保留在内存中,并且不考虑文件的修改时间。 除非您的应用程序在处理后删除文件,否则默认情况下,适配器将在应用程序重新启动后重新处理磁盘上的文件。spring-doc.cadn.net.cn

此外,如果您配置filter要使用FtpPersistentAcceptOnceFileListFilter并且远程文件时间戳发生变化(导致它被重新获取),则默认的本地过滤器不允许处理这个新文件。spring-doc.cadn.net.cn

有关此筛选器及其使用方法的更多信息,请参阅远程持久性文件列表筛选器spring-doc.cadn.net.cn

您可以使用local-filter属性来配置本地文件系统过滤器的行为。 从版本 4.3.8 开始,FileSystemPersistentAcceptOnceFileListFilter默认配置。 此筛选条件将接受的文件名和修改后的时间戳存储在MetadataStore策略(请参阅元数据存储)并检测对本地文件修改时间的更改。 默认的MetadataStore是一个SimpleMetadataStore,它将状态存储在内存中。spring-doc.cadn.net.cn

从 4.1.5 版本开始,这些过滤器有一个新属性 (flushOnUpdate),这会导致它们刷新 元数据存储(如果存储实现Flushable).spring-doc.cadn.net.cn

此外,如果您使用分布式MetadataStore(例如 Redis),您可以拥有同一适配器或应用程序的多个实例,并确保每个文件只处理一次。

实际的本地过滤器是一个CompositeFileListFilter,其中包含提供的过滤器和一个模式过滤器,该过滤器阻止处理正在下载的文件(基于temporary-file-suffix). 下载带有此后缀的文件(默认值为.writing),并在传输完成后将文件重命名为其最终名称,使其对过滤器“可见”。spring-doc.cadn.net.cn

remote-file-separator属性允许您配置文件分隔符,以便在默认 '/' 不适用于您的特定环境时使用。spring-doc.cadn.net.cn

请参阅 架构 以了解有关这些属性的更多详细信息。spring-doc.cadn.net.cn

您还应该了解 FTP 入站通道适配器是轮询使用者。 因此,你必须配置一个 Poller (通过使用 global default 或 local sub-element)。 传输文件后,将显示一条带有java.io.File,因为其有效负载生成并发送到由channel属性。spring-doc.cadn.net.cn

详细了解文件筛选和不完整文件

有时,刚刚出现在受监视(远程)目录中的文件并不完整。 通常,此类文件是使用临时扩展名(例如somefile.txt.writing),然后在写入过程完成后重命名。 在大多数情况下,您只对完整的文件感兴趣,并且只想筛选完整的文件。 要处理这些情况,您可以使用filename-pattern,filename-regexfilter属性。 以下示例使用自定义筛选器实现:spring-doc.cadn.net.cn

<int-ftp:inbound-channel-adapter
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    filter="customFilter"
    local-directory="file:/my_transfers">
    remote-directory="some/remote/path"
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

<bean id="customFilter" class="org.example.CustomFilter"/>

入站 FTP 适配器的轮询器配置说明

入站 FTP 适配器的作业包括两个任务:spring-doc.cadn.net.cn

  1. 与远程服务器通信,以便将文件从远程目录传输到本地目录。spring-doc.cadn.net.cn

  2. 对于每个传输的文件,生成一条消息,将该文件作为有效负载,并将其发送到由 'channel' 属性标识的通道。 这就是为什么它们被称为 “'channel adapters'” 而不仅仅是 “'adapters'”。 这种适配器的主要工作是生成要发送到消息通道的消息。 从本质上讲,第二个任务的优先级是,如果您的本地目录已经有一个或多个文件,则它首先从这些文件生成消息。 只有当所有本地文件都已处理完时,它才会启动远程通信以检索更多文件。spring-doc.cadn.net.cn

此外,在 Poller 上配置触发器时,您应该密切注意max-messages-per-poll属性。 其默认值为1为了所有人SourcePollingChannelAdapter实例(包括 FTP)。 这意味着,一旦处理了一个文件,它就会等待由触发器配置确定的下一个执行时间。 如果您碰巧有一个或多个文件位于local-directory,它将在启动与远程 FTP 服务器的通信之前处理这些文件。 此外,如果max-messages-per-poll设置为1(默认),它一次只处理一个文件,其间隔由触发器定义,本质上是 “one-poll === one-file”。spring-doc.cadn.net.cn

对于典型的文件传输使用案例,您很可能希望发生相反的行为:处理每次轮询可以处理的所有文件,然后才等待下一次轮询。 如果是这种情况,请将max-messages-per-poll设置为 -1。 然后,在每次轮询时,适配器会尝试生成尽可能多的消息。 换句话说,它处理本地目录中的所有内容,然后连接到远程目录以传输所有可用内容,以便在本地处理。 只有这样,poll作才被视为完成,并且 Poller 等待下一次执行时间。spring-doc.cadn.net.cn

您也可以将 'max-messages-per-poll' 值设置为正值,该值表示每次轮询要从文件创建的消息的上限。 例如,值10表示在每次轮询时,它尝试处理的文件不超过 10 个。spring-doc.cadn.net.cn

从故障中恢复

了解适配器的架构非常重要。 有一个文件同步器可以获取文件,还有一个FileReadingMessageSource,为每个 synchronized 文件。 如前所述,涉及两个过滤器。 这filter属性(和模式)引用远程 (FTP) 文件列表,以避免获取已经 被获取。 这local-filterFileReadingMessageSource来确定哪些文件将作为消息发送。spring-doc.cadn.net.cn

同步器列出远程文件并查阅其过滤器。 然后传输文件。 如果在文件传输过程中发生 IO 错误,则会删除已添加到筛选器的任何文件,以便它们 有资格在下次轮询时重新获取。 这仅适用于过滤器实现ReversibleFileListFilter(例如AcceptOnceFileListFilter).spring-doc.cadn.net.cn

如果在同步文件后,处理文件的下游流发生错误,则不会自动回滚过滤器,因此默认情况下不会重新处理失败的文件。spring-doc.cadn.net.cn

如果您希望在失败后重新处理此类文件,您可以使用类似于以下内容的配置来方便 从过滤器中删除失败的文件:spring-doc.cadn.net.cn

<int-ftp:inbound-channel-adapter id="ftpAdapter"
        session-factory="ftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/ftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-ftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

上述配置适用于任何ResettableFileListFilter.spring-doc.cadn.net.cn

从版本 5.0 开始,入站通道适配器可以在本地构建与生成的本地文件名相对应的子目录。 那也可以是远程子路径。 为了能够递归读取本地目录以根据层次结构支持进行修改,您现在可以提供内部FileReadingMessageSource替换为新的RecursiveDirectoryScanner基于Files.walk()算法。 看AbstractInboundFileSynchronizingMessageSource.setScanner()了解更多信息。 此外,您现在可以将AbstractInboundFileSynchronizingMessageSourceWatchService-基于DirectoryScanner通过使用setUseWatchService()选择。 它还为所有WatchEventType实例来响应本地目录中的任何修改。 前面显示的 reprocessing 示例基于FileReadingMessageSource.WatchServiceDirectoryScanner执行ResettableFileListFilter.remove()删除文件时 (StandardWatchEventKinds.ENTRY_DELETE) 从本地目录获取。 看WatchServiceDirectoryScanner了解更多信息。spring-doc.cadn.net.cn

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        sf.setTestSession(true);
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
        FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> ftpMessageSource() {
        FtpInboundFileSynchronizingMessageSource source =
                new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("ftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 进行配置

Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow ftpInboundFlow() {
        return IntegrationFlow
            .from(Ftp.inboundAdapter(this.ftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilename(f -> f.toUpperCase() + ".a")
                    .localDirectory(new File("d:\\ftp_files")),
                e -> e.id("ftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

处理不完整的数据

FtpSystemMarkerFilePresentFileListFilter用于筛选在远程系统上没有相应标记文件的远程文件。 有关配置信息,请参阅 Javadoc(并浏览到父类)。spring-doc.cadn.net.cn

FTP 流入站通道适配器

版本 4.3 引入了流入站通道适配器。 此适配器生成 payload 类型的 messageInputStream,让文件在不写入 local 文件系统。 由于会话保持打开状态,因此使用应用程序负责在文件 消耗。 该会话在closeableResource标头 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE). 标准框架组件(例如FileSplitterStreamTransformer,则会自动关闭会话。 有关这些组件的更多信息,请参见File SplitterStream Transformer。 以下示例说明如何配置inbound-streaming-channel-adapter:spring-doc.cadn.net.cn

<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>

只有其中之一filename-pattern,filename-regex,filterfilter-expression是允许的。spring-doc.cadn.net.cn

从版本 5.0 开始,默认情况下,FtpStreamingMessageSource适配器可防止远程文件重复FtpPersistentAcceptOnceFileListFilter基于 In-MemorySimpleMetadataStore. 默认情况下,此过滤器也与文件名模式(或 regex)一起应用。 如果需要允许重复项,可以使用AcceptAllFileListFilter. 任何其他用例都可以由CompositeFileListFilter(或ChainFileListFilter). Java 配置(本文档后面部分)显示了一种在处理后删除远程文件以避免重复的技术。

有关FtpPersistentAcceptOnceFileListFilter及其使用方式,请参阅远程持久性文件列表过滤器spring-doc.cadn.net.cn

使用max-fetch-size属性来限制在需要提取时每次轮询时提取的文件数。 将其设置为1并在集群环境中运行时使用持久性过滤器。 有关更多信息,请参见入站通道适配器:控制远程文件获取spring-doc.cadn.net.cn

适配器将远程目录和文件名放在FileHeaders.REMOTE_DIRECTORYFileHeaders.REMOTE_FILE标头。 从版本 5.0 开始,FileHeaders.REMOTE_FILE_INFOheader 提供额外的远程文件信息(默认以 JSON 表示)。 如果将fileInfoJson属性FtpStreamingMessageSourcefalse,标头包含一个FtpFileInfo对象。 这FTPFile对象,可以使用FtpFileInfo.getFileInfo()方法。 这fileInfoJson属性在使用 XML 配置时不可用,但您可以通过注入FtpStreamingMessageSource添加到您的配置类之一中。 另请参阅远程文件信息spring-doc.cadn.net.cn

从版本 5.1 开始,comparatorFTPFile. 以前,它是AbstractFileInfo<FTPFile>. 这是因为排序现在在筛选和应用之前在处理的早期执行maxFetch.spring-doc.cadn.net.cn

使用 Java 配置进行配置

Spring 下面的 Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("ftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public FtpRemoteFileTemplate template() {
        return new FtpRemoteFileTemplate(ftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

请注意,在此示例中,转换器下游的消息处理程序具有advice,这将在处理后删除远程文件。spring-doc.cadn.net.cn

入站通道适配器:轮询多个服务器和目录

从版本 5.0.7 开始,RotatingServerAdvice可用;当配置为 Poller Advice 时,入站适配器可以轮询多个服务器和目录。 配置建议并将其正常添加到 Poller 的建议链中。 一个DelegatingSessionFactory用于选择服务器,有关更多信息,请参阅 Delegating Session Factory。 建议配置由一个RotationPolicy.KeyDirectory对象。spring-doc.cadn.net.cn

@Bean
public RotatingServerAdvice advice() {
    List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
    return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

此建议将轮询目录foo在服务器上one直到没有新文件存在,然后移动到目录bar,然后是目录baz在服务器上two等。spring-doc.cadn.net.cn

可以使用fairconstructor arg 的 Constructor Arg 中:spring-doc.cadn.net.cn

公平
@Bean
public RotatingServerAdvice advice() {
    ...
    return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}

在这种情况下,无论上一个 poll 是否返回文件,通知都将移动到下一个服务器/目录。spring-doc.cadn.net.cn

或者,您也可以提供自己的RotationPolicy要根据需要重新配置消息源:spring-doc.cadn.net.cn

政策
public interface RotationPolicy {

    void beforeReceive(MessageSource<?> source);

    void afterReceive(boolean messageReceived, MessageSource<?> source);

}
习惯
@Bean
public RotatingServerAdvice advice() {
    return new RotatingServerAdvice(myRotationPolicy());
}

local-filename-generator-expression属性 (localFilenameGeneratorExpression)现在可以包含#remoteDirectory变量。 这允许将从不同目录中检索到的文件下载到本地的类似目录:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(Ftp.inboundAdapter(sf())
                    .filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                    .localDirectory(new File(tmpDir))
                    .localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
                    .remoteDirectory("."),
                e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
            .channel(MessageChannels.queue("files"))
            .get();
}
不要配置TaskExecutor在轮询器上有关更多信息,请参见Conditional Pollers for Message Sources

入站通道适配器:控制远程文件获取

在配置入站通道适配器时,应考虑两个属性。max-messages-per-poll与所有 Poller 一样,可用于限制每次轮询时发出的消息数(如果已准备好超过配置的值)。max-fetch-size(自版本 5.0 起)可以限制一次从远程服务器检索的文件数。spring-doc.cadn.net.cn

以下场景假定起始状态为空的本地目录:spring-doc.cadn.net.cn

  • max-messages-per-poll=2max-fetch-size=1:适配器获取一个文件,发出它,获取下一个文件,发出它,然后休眠直到下一次轮询。spring-doc.cadn.net.cn

  • max-messages-per-poll=2max-fetch-size=2):适配器获取这两个文件,然后发出每个文件。spring-doc.cadn.net.cn

  • max-messages-per-poll=2max-fetch-size=4:适配器最多获取四个文件(如果可用)并发出前两个文件(如果至少有两个)。 接下来的两个文件将在下一次轮询时发出。spring-doc.cadn.net.cn

  • max-messages-per-poll=2max-fetch-size未指定:适配器获取所有远程文件并发出前两个文件(如果至少有两个)。 后续文件将在后续轮询时发出(一次两个)。 当所有文件都被消耗时,将再次尝试远程获取以获取任何新文件。spring-doc.cadn.net.cn

当您部署应用程序的多个实例时,我们建议使用小型max-fetch-size,以避免一个实例“抓取”所有文件并耗尽其他实例。

另一个用途max-fetch-size是要停止获取远程文件,但继续处理已获取的文件。 设置maxFetchSize属性MessageSource(以编程方式,使用 JMX 或控制总线)有效地阻止适配器获取更多文件,但允许 Poller 继续为以前获取的文件发出消息。 如果在更改属性时 poller 处于活动状态,则更改将在下一次轮询时生效。spring-doc.cadn.net.cn

从版本 5.1 开始,可以为同步器提供Comparator<FTPFile>. 这在限制使用maxFetchSize.spring-doc.cadn.net.cn

FTP 出站通道适配器

FTP 出站通道适配器依赖于MessageHandler连接到 FTP 服务器并为它在传入消息的有效负载中接收的每个文件启动 FTP 传输的实现。 它还支持文件的多种表示形式,因此您不仅限于java.io.File-类型的有效负载。 FTP 出站通道适配器支持以下有效负载:spring-doc.cadn.net.cn

以下示例说明如何配置outbound-channel-adapter:spring-doc.cadn.net.cn

<int-ftp:outbound-channel-adapter id="ftpOutbound"
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    charset="UTF-8"
    remote-file-separator="/"
    auto-create-directory="true"
    remote-directory-expression="headers['remote_dir']"
    temporary-remote-directory-expression="headers['temp_remote_dir']"
    filename-generator="fileNameGenerator"
    use-temporary-filename="true"
    chmod="600"
    mode="REPLACE"/>

前面的配置显示了如何使用outbound-channel-adapter元素,同时还为各种属性(例如filename-generatoro.s.i.file.FileNameGeneratorstrategy 接口)、对session-factory和其他属性。 您还可以看到一些*expression属性,这些属性允许您使用 SPEL 配置设置,例如remote-directory-expression,temporary-remote-directory-expressionremote-filename-generator-expression(SPEL 的替代品filename-generator,如前面的示例所示)。 与任何允许使用 SPEL 的组件一样,可以通过“payload”和“headers”变量访问有效负载和消息 Headers。 请参阅 架构 有关可用属性的更多详细信息。spring-doc.cadn.net.cn

默认情况下,如果未指定文件名生成器,则 Spring 集成使用o.s.i.file.DefaultFileNameGenerator.DefaultFileNameGenerator根据file_name标头(如果存在)在MessageHeaders,或者,如果 Message 的有效负载已经是java.io.File,则使用该文件的原始名称。
定义某些值(例如remote-directory) 可能依赖于平台或 FTP 服务器。 例如,正如 https://forum.spring.io/showthread.php?p=333478&posted=1#post333478 上报告的那样,在某些平台上,您必须在目录定义的末尾添加斜杠(例如,remote-directory="/thing1/thing2/"而不是remote-directory="/thing1/thing2").

从版本 4.1 开始,您可以指定mode传输文件时。 默认情况下,现有文件将被覆盖。 模式由FileExistsModeenumeration,其中包括以下值:spring-doc.cadn.net.cn

IGNOREFAIL请勿传输文件。FAIL导致引发异常,而IGNORE以 Silent Json 的形式忽略传输(尽管DEBUGlog entry 生成)。spring-doc.cadn.net.cn

版本 5.2 引入了chmod属性,可用于在上传后更改远程文件权限。 您可以使用传统的 Unix 八进制格式(例如600仅允许文件所有者的读写)。 使用 java 配置适配器时,您可以使用setChmodOctal("600")setChmod(0600). 仅当您的 FTP 服务器支持SITE CHMOD子命令。spring-doc.cadn.net.cn

避免部分写入的文件

处理文件传输时出现的常见问题之一是处理部分文件的可能性。 也就是说,文件可能在传输实际完成之前就出现在文件系统中。spring-doc.cadn.net.cn

为了解决这个问题, Spring 集成 FTP 适配器使用一种通用算法:文件以临时名称传输,然后在完全传输后重命名。spring-doc.cadn.net.cn

默认情况下,正在传输的每个文件都显示在文件系统中,并带有一个附加后缀,默认情况下,该后缀为.writing. 您可以通过设置temporary-file-suffix属性。spring-doc.cadn.net.cn

但是,在某些情况下,您可能不想使用此技术(例如,如果服务器不允许重命名文件)。 对于此类情况,您可以通过设置use-temporary-file-namefalse(默认值为true). 当此属性为false,则文件使用其最终名称写入,并且使用应用程序需要某种其他机制来检测文件是否已完全上传,然后再访问它。spring-doc.cadn.net.cn

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                    new SpringApplicationBuilder(FtpJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToFtp(new File("/foo/bar.txt"));
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        sf.setTestSession(true);
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler handler() {
        FtpMessageHandler handler = new FtpMessageHandler(ftpSessionFactory());
        handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
        handler.setFileNameGenerator(new FileNameGenerator() {

            @Override
            public String generateFileName(Message<?> message) {
                 return "handlerContent.test";
            }

        });
        return handler;
    }

    @MessagingGateway
    public interface MyGateway {

         @Gateway(requestChannel = "toFtpChannel")
         void sendToFtp(File file);

    }
}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
            new SpringApplicationBuilder(FtpJavaApplication.class)
                .web(false)
                .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToFtp(new File("/foo/bar.txt"));
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        sf.setTestSession(true);
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    public IntegrationFlow ftpOutboundFlow() {
        return IntegrationFlow.from("toFtpChannel")
                .handle(Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)
                        .useTemporaryFileName(false)
                        .fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
                        .remoteDirectory(this.ftpServer.getTargetFtpDirectory().getName())
                ).get();
    }

    @MessagingGateway
    public interface MyGateway {

         @Gateway(requestChannel = "toFtpChannel")
         void sendToFtp(File file);

    }

}

FTP 出站网关

FTP 出站网关提供一组有限的命令来与远程 FTP 或 FTPS 服务器进行交互。 支持的命令包括:spring-doc.cadn.net.cn

使用ls命令

ls列出远程文件并支持以下选项:spring-doc.cadn.net.cn

此外,还提供了文件名过滤,其方式与inbound-channel-adapter. 请参阅 FTP 入站通道适配器spring-doc.cadn.net.cn

lsoperation 是文件名列表或FileInfo对象。 这些对象提供修改时间、权限和其他详细信息等信息。spring-doc.cadn.net.cn

远程目录ls执行 action on 的命令在file_remoteDirectory页眉。spring-doc.cadn.net.cn

当使用递归选项 (-R)、fileName包括任何 subdirectory 元素,表示文件的相对路径(相对于远程目录)。 如果-dirs选项,则每个递归目录也会作为列表中的一个元素返回。 在这种情况下,建议您不要使用-1选项,因为您无法区分文件和目录,而您可以使用FileInfo对象。spring-doc.cadn.net.cn

从版本 4.3 开始,FtpSession支持null对于list()listNames()方法。 因此,您可以省略expression属性。 为方便起见,Java 配置有两个没有expression论点。 或LS,NLST,PUTMPUT命令null被视为客户端工作目录,根据 FTP 协议。 所有其他命令都必须随expression根据请求消息评估远程路径。 您可以使用FTPClient.changeWorkingDirectory()函数,当您扩展DefaultFtpSessionFactory并实施postProcessClientAfterConnect()回调。spring-doc.cadn.net.cn

使用nlst命令

版本 5 引入了对nlst命令。spring-doc.cadn.net.cn

nlst列出远程文件名,并且仅支持一个选项:spring-doc.cadn.net.cn

nlstoperation 是文件名列表。spring-doc.cadn.net.cn

远程目录nlst执行 action on 的命令在file_remoteDirectory页眉。spring-doc.cadn.net.cn

-1选项ls命令,它使用LIST命令、nlst命令会发送一个NLST命令发送到目标 FTP 服务器。 当服务器不支持LIST(例如,由于安全限制)。 结果nlstoperation 是名称,没有其他详细信息。 因此,框架无法确定实体是否为目录,例如,无法执行筛选或递归列表。spring-doc.cadn.net.cn

使用get命令

get检索远程文件。 它支持以下选项:spring-doc.cadn.net.cn

file_remoteDirectoryheader 提供远程目录名称,而file_remoteFileheader 提供文件名。spring-doc.cadn.net.cn

get作是File对象或InputStream当您使用-stream选择。 这-stream选项允许将文件作为流检索。 对于文本文件,一个常见的用例是将此作与文件拆分器流转换器结合使用。 将远程文件作为流使用时,您负责关闭Session在流被消费后。 为方便起见,SessioncloseableResourceheader,您可以使用IntegrationMessageHeaderAccessor以下示例演示如何使用 convenience 方法:spring-doc.cadn.net.cn

Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
    closeable.close();
}

文件拆分器和流转换器等框架组件在传输数据后会自动关闭会话。spring-doc.cadn.net.cn

以下示例演示如何将文件作为流使用:spring-doc.cadn.net.cn

<int-ftp:outbound-gateway session-factory="ftpSessionFactory"
                            request-channel="inboundGetStream"
                            command="get"
                            command-options="-stream"
                            expression="payload"
                            remote-directory="ftpTarget"
                            reply-channel="stream" />

<int-file:splitter input-channel="stream" output-channel="lines" />
如果您在自定义组件中使用输入流,则必须关闭Session. 您可以在自定义代码中执行此作,也可以通过将消息的副本路由到service-activator并使用 SPEL,如下例所示:
<int:service-activator input-channel="closeSession"
    expression="headers['closeableResource'].close()" />

使用mget命令

mget根据模式检索多个远程文件,并支持以下选项:spring-doc.cadn.net.cn

mget作是List<File>object(即ListFile对象,每个对象代表一个检索到的文件)。spring-doc.cadn.net.cn

从版本 5.0 开始,如果FileExistsModeIGNORE,则输出消息的有效负载不再包含由于文件已存在而未获取的文件。 以前,该列表包含所有文件,包括已存在的文件。

用于确定远程路径的表达式应生成以- 例如somedir/将在somedir.spring-doc.cadn.net.cn

从版本 5.0 开始,递归mget,结合新的FileExistsMode.REPLACE_IF_MODIFIED模式,可用于定期在本地同步整个远程目录树。 此模式将本地文件的上次修改时间戳替换为远程时间戳,而不管-P(保留时间戳)选项。spring-doc.cadn.net.cn

使用递归 (-R)

该模式将被忽略,并被假定为。 默认情况下,将检索整个远程树。 但是,可以通过提供*FileListFilter. 树中的目录也可以通过这种方式进行筛选。 一个FileListFilter可以通过引用提供,由filename-patternfilename-regex属性。 例如filename-regex="(subDir|.*1.txt)"检索所有以1.txt在远程目录中,使用subDir子目录。 但是,下一个示例显示了一个替代方法,即 5.0 版本可用。spring-doc.cadn.net.cn

如果过滤了子目录,则不会对该子目录执行额外的遍历。spring-doc.cadn.net.cn

-dirs选项(递归的mget使用递归ls获取目录树,因此目录本身不能包含在列表中)。spring-doc.cadn.net.cn

通常,您会使用#remoteDirectory变量在local-directory-expression,以便在本地保留远程目录结构。spring-doc.cadn.net.cn

持久文件列表过滤器现在具有布尔属性forRecursion. 将此属性设置为true,还会设置alwaysAcceptDirectories,这意味着出站网关 (lsmget) 现在每次都始终遍历完整的目录树。 这是为了解决未检测到目录树深处更改的问题。 另外forRecursion=true使文件的完整路径用作元数据存储键;这解决了以下问题:如果具有相同名称的文件在不同目录中多次出现,则过滤器无法正常工作。 重要说明:这意味着对于顶级目录下的文件,将无法找到持久性元数据存储中的现有键。 因此,该属性为false默认情况下;这可能会在未来版本中更改。spring-doc.cadn.net.cn

从版本 5.0 开始,FtpSimplePatternFileListFilterFtpRegexPatternFileListFilter可以通过设置alwaysAcceptDirectoriesproperty 设置为true. 这样做允许对简单模式进行递归,如下例所示:spring-doc.cadn.net.cn

<bean id="starDotTxtFilter"
        class="org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter">
    <constructor-arg value="*.txt" />
    <property name="alwaysAcceptDirectories" value="true" />
</bean>

<bean id="dotStarDotTxtFilter"
            class="org.springframework.integration.ftp.filters.FtpRegexPatternFileListFilter">
    <constructor-arg value="^.*\.txt$" />
    <property name="alwaysAcceptDirectories" value="true" />
</bean>

定义筛选器(如前面示例中的筛选器)后,可以通过设置filter属性。spring-doc.cadn.net.cn

使用put命令

put命令将文件发送到远程服务器。 消息的有效负载可以是java.io.File一个byte[]String. 一个remote-filename-generator(或 expression) 用于命名远程文件。 其他可用属性包括remote-directory,temporary-remote-directory及其*-expression等价物:use-temporary-file-nameauto-create-directory. 有关更多信息,请参阅 schema documentation.spring-doc.cadn.net.cn

put作是String这表示传输后文件在服务器上的完整路径。spring-doc.cadn.net.cn

版本 5.2 引入了chmod属性,该属性在上传后更改远程文件权限。 您可以使用传统的 Unix 八进制格式(例如600仅允许文件所有者的读写)。 使用 java 配置适配器时,您可以使用setChmod(0600). 仅当您的 FTP 服务器支持SITE CHMOD子命令。spring-doc.cadn.net.cn

使用mput命令

mput将多个文件发送到服务器,并且仅支持一个选项:spring-doc.cadn.net.cn

  • -R:递归的。 发送目录及其子目录中的所有文件(可能已过滤)。spring-doc.cadn.net.cn

消息负载必须是java.io.File(或String) 表示本地目录。 从 5.1 版本开始,一组FileString也受支持。spring-doc.cadn.net.cn

此命令支持与put命令. 此外,本地目录中的文件可以使用以下选项之一进行过滤mput-pattern,mput-regex,mput-filtermput-filter-expression. 只要子目录本身通过过滤器,过滤器就可以使用递归。 未通过过滤器的子目录不会递归。spring-doc.cadn.net.cn

mput作是List<String>object(即List的远程文件路径)。spring-doc.cadn.net.cn

版本 5.2 引入了chmod属性,该属性允许您在上传后更改远程文件权限。 您可以使用传统的 Unix 八进制格式(例如600仅允许文件所有者的读写)。 使用 Java 配置适配器时,您可以使用setChmodOctal("600")setChmod(0600). 仅当您的 FTP 服务器支持SITE CHMOD子命令。spring-doc.cadn.net.cn

使用rm命令

rm命令删除文件。spring-doc.cadn.net.cn

rm命令中没有选项。spring-doc.cadn.net.cn

rmoperation 为Boolean.TRUE如果删除成功,或者Boolean.FALSE否则。 这file_remoteDirectoryheader 提供远程目录,而file_remoteFileheader 提供文件名。spring-doc.cadn.net.cn

使用mv命令

mv命令移动文件。spring-doc.cadn.net.cn

mv命令中没有选项。spring-doc.cadn.net.cn

expression属性定义 “from” 路径,rename-expressionattribute 定义 “to” 路径。 默认情况下,rename-expressionheaders['file_renameTo']. 此表达式的计算结果不得为 null 或空String. 如有必要,将创建任何必要的远程目录。 结果消息的有效负载为Boolean.TRUE. 这file_remoteDirectoryheader 提供原始远程目录,而file_remoteFileheader 提供文件名。 新路径位于file_renameTo页眉。spring-doc.cadn.net.cn

从版本 5.5.6 开始,remoteDirectoryExpression可用于mv命令。 如果 “from” 文件不是完整的文件路径,则remoteDirectoryExpression用作远程目录。 这同样适用于“to”文件,例如,如果任务只是重命名某个目录中的远程文件。spring-doc.cadn.net.cn

有关 FTP 出站网关命令的其他信息

getmget命令支持local-filename-generator-expression属性。 它定义了一个 SPEL 表达式,用于在传输过程中生成本地文件的名称。 评估上下文的根对象是请求消息。 这remoteFileName变量,这对于mget,也可用 — 例如local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.something".spring-doc.cadn.net.cn

getmget命令支持local-directory-expression属性。 它定义了一个 SPEL 表达式,用于在传输过程中生成本地目录的名称。 评估上下文的根对象是请求消息 but. 这remoteDirectory变量,这对于mget,也可用 — 例如:local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.something". 此属性与local-directory属性。spring-doc.cadn.net.cn

对于所有命令,网关的 'expression' 属性提供命令作的路径。 对于mget命令,表达式的计算结果可能为 '',表示检索所有文件,或者 'somedirectory/',依此类推。spring-doc.cadn.net.cn

以下示例显示了为ls命令:spring-doc.cadn.net.cn

<int-ftp:outbound-gateway id="gateway1"
    session-factory="ftpSessionFactory"
    request-channel="inbound1"
    command="ls"
    command-options="-1"
    expression="payload"
    reply-channel="toSplitter"/>

发送到toSplitterchannel 是String对象,每个对象都包含一个文件名。 如果command-options属性,则FileInfo对象。 它使用以空格分隔的选项 — 例如command-options="-1 -dirs -links".spring-doc.cadn.net.cn

从版本 4.2 开始,GET,MGET,PUTMPUT命令支持FileExistsMode属性 (mode当使用命名空间支持时)。 这会影响本地文件存在 (GETMGET) 或远程文件存在 (PUTMPUT). 支持的模式包括REPLACE,APPEND,FAILIGNORE. 为了向后兼容,默认PUTMPUToperations 为REPLACE. 为GETMGET作,默认值为FAIL.spring-doc.cadn.net.cn

从版本 5.0 开始,setWorkingDirExpression() (working-dir-expressionin XML) 选项。FtpOutboundGateway (<int-ftp:outbound-gateway>在 XML 中)。 它允许您在运行时更改客户端工作目录。 根据请求消息计算表达式。 每次网关作后,都会恢复以前的工作目录。spring-doc.cadn.net.cn

使用 Java 配置进行配置

Spring 以下 Boot 应用程序显示了如何使用 Java 配置配置出站网关的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        sf.setTestSession(true);
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler handler() {
        FtpOutboundGateway ftpOutboundGateway =
                          new FtpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
        ftpOutboundGateway.setOutputChannelName("lsReplyChannel");
        return ftpOutboundGateway;
    }

}

使用 Java DSL 进行配置

Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置出站网关的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        sf.setTestSession(true);
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    public FtpOutboundGatewaySpec ftpOutboundGateway() {
        return Ftp.outboundGateway(ftpSessionFactory(),
            AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
            .options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
            .regexFileNameFilter("(subFtpSource|.*1.txt)")
            .localDirectoryExpression("'localDirectory/' + #remoteDirectory")
            .localFilenameExpression("#remoteFileName.replaceFirst('ftpSource', 'localTarget')");
    }

    @Bean
    public IntegrationFlow ftpMGetFlow(AbstractRemoteFileOutboundGateway<FTPFile> ftpOutboundGateway) {
        return f -> f
            .handle(ftpOutboundGateway)
            .channel(c -> c.queue("remoteFileOutputChannel"));
    }

}

出站网关部分成功 (mgetmput)

当您对多个文件执行作时(通过使用mgetmput),则在传输一个或多个文件后的某个时间可能会发生异常。 在这种情况下(从版本 4.2 开始),一个PartialSuccessException被抛出。 和往常一样MessagingException属性 (failedMessagecause),则此异常具有两个附加属性:spring-doc.cadn.net.cn

这些属性允许您确定哪些文件已成功传输,哪些文件未成功传输。spring-doc.cadn.net.cn

对于递归mputPartialSuccessException可能已嵌套PartialSuccessException事件。spring-doc.cadn.net.cn

请考虑以下目录结构:spring-doc.cadn.net.cn

root/
|- file1.txt
|- subdir/
   | - file2.txt
   | - file3.txt
|- zoo.txt

如果异常发生在file3.txtPartialSuccessException由 gateway 抛出的derivedInputfile1.txt,subdirzoo.txtpartialResultsfile1.txt. 其cause是另一个PartialSuccessExceptionderivedInputfile2.txtfile3.txtpartialResultsfile2.txt.spring-doc.cadn.net.cn

FTP 会话缓存

从 Spring Integration 3.0 开始,默认情况下不再缓存会话。 这cache-sessions属性。 您必须使用CachingSessionFactory(如以下示例所示)如果您希望缓存会话。

在 3.0 之前的版本中,默认情况下会自动缓存会话。 一个cache-sessions属性可用于禁用自动缓存,但该解决方案未提供配置其他会话缓存属性的方法。 例如,您无法限制创建的会话数。 为了支持该要求和其他配置选项,一个CachingSessionFactory已添加。 它提供sessionCacheSizesessionWaitTimeout性能。 这sessionCacheSizeproperty 控制工厂在其缓存中维护多少个活动会话(默认为 unbounded)。 如果sessionCacheSize阈值,则任何获取另一个会话的尝试都会被阻止,直到其中一个缓存的会话变得可用或直到会话的等待时间到期(默认等待时间为Integer.MAX_VALUE). 这sessionWaitTimeoutproperty 配置该值。spring-doc.cadn.net.cn

如果您希望缓存会话,请按照前面所述配置默认会话工厂,然后将其包装在CachingSessionFactory,您可以在其中提供这些附加属性。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

<bean id="ftpSessionFactory" class="o.s.i.ftp.session.DefaultFtpSessionFactory">
    <property name="host" value="localhost"/>
</bean>

<bean id="cachingSessionFactory" class="o.s.i.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="ftpSessionFactory"/>
    <constructor-arg value="10"/>
    <property name="sessionWaitTimeout" value="1000"/>
</bean>

前面的示例显示了一个CachingSessionFactory使用sessionCacheSize设置为10sessionWaitTimeout设置为 1 秒(其值以毫秒为单位)。spring-doc.cadn.net.cn

从 Spring Integration 3.0 开始,CachingConnectionFactory提供resetCache()方法。 调用时,所有空闲会话将立即关闭,而正在使用的会话将在返回到缓存时关闭。 对会话的新请求会根据需要建立新会话。spring-doc.cadn.net.cn

从版本 5.1 开始,CachingSessionFactory具有新属性testSession. 如果为 true,则将通过发送 NOOP 命令来测试会话,以确保它仍然处于活动状态;否则,将从缓存中删除它;如果缓存中没有活动会话,则会创建一个新会话。spring-doc.cadn.net.cn

RemoteFileTemplate

从 Spring Integration 3.0 开始,在FtpSession对象。 该模板提供了用于发送、检索(作为InputStream)、删除和重命名文件。 此外,一个execute方法允许调用方对 session 执行多个作。 在所有情况下,模板都会可靠地关闭会话。 有关更多信息,请参阅Javadoc 的RemoteFileTemplate. FTP 有一个子类:FtpRemoteFileTemplate.spring-doc.cadn.net.cn

版本 4.1 添加了其他方法,包括getClientInstance(),它提供对底层FTPClient因此,您可以访问低级 API。spring-doc.cadn.net.cn

并非所有 FTP 服务器都能正确实现STAT <path>命令。 有些 API 会为不存在的路径返回 positive 结果。 这NLSTcommand 可靠地返回路径为文件且存在的名称。 但是,这不支持检查是否存在空目录,因为NLST当 path 是 directory 时,始终返回空列表。 由于模板不知道路径是否代表目录,因此当路径似乎不存在时,它必须执行额外的检查(当使用NLST). 这会增加开销,需要向服务器发出多个请求。 从版本 4.1.9 开始,FtpRemoteFileTemplate提供FtpRemoteFileTemplate.ExistsMode属性,其中包含以下选项:spring-doc.cadn.net.cn

  • STAT:执行STATFTP 命令 (FTPClient.getStatus(path)) 检查路径是否存在。 这是默认设置,并要求 FTP 服务器正确支持STAT命令(带路径)。spring-doc.cadn.net.cn

  • NLST:执行NLSTFTP 命令 —FTPClient.listName(path). 如果要测试的路径是文件的完整路径,请使用此选项。 它不适用于空目录。spring-doc.cadn.net.cn

  • NLST_AND_DIRS:执行NLST命令,如果它没有返回任何文件,则回退到一种使用FTPClient.changeWorkingDirectory(path). 看FtpSession.exists()了解更多信息。spring-doc.cadn.net.cn

由于我们知道FileExistsMode.FAILcase 始终只查找文件(而不是目录),我们安全地使用NLSTmode 的FtpMessageHandlerFtpOutboundGateway组件。spring-doc.cadn.net.cn

对于任何其他情况,FtpRemoteFileTemplate可以扩展以在覆盖的exist()方法。spring-doc.cadn.net.cn

从版本 5.0 开始,新的RemoteFileOperations.invoke(OperationsCallback<F, T> action)方法可用。 此方法允许多个RemoteFileOperations调用在相同的线程绑定Session. 当您需要执行RemoteFileTemplate作为一个工作单元。 例如AbstractRemoteFileOutboundGateway将其与mputcommand 实现,其中我们执行put作,并递归地作其子目录。 有关更多信息,请参阅 Javadocspring-doc.cadn.net.cn

MessageSessionCallback

从 Spring Integration 4.2 开始,您可以使用MessageSessionCallback<F, T>implementation 替换为<int-ftp:outbound-gateway/> (FtpOutboundGateway在 Java 中)对Session<FTPFile>使用requestMessage上下文。 它可用于任何非标准或低级 FTP作,并允许从集成流定义和功能接口 (Lambda) 实施注入进行访问,如下例所示:spring-doc.cadn.net.cn

@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler ftpOutboundGateway(SessionFactory<FTPFile> sessionFactory) {
    return new FtpOutboundGateway(sessionFactory,
         (session, requestMessage) -> session.list(requestMessage.getPayload()));
}

另一个示例可能是对正在发送或检索的文件数据进行预处理或后处理。spring-doc.cadn.net.cn

使用 XML 配置时,<int-ftp:outbound-gateway/>提供session-callback属性,让您指定MessageSessionCallbackBean 名称。spring-doc.cadn.net.cn

session-callbackcommandexpression属性。 使用 Java 进行配置时,不同的构造函数在FtpOutboundGateway类。

Apache Mina FTP 服务器事件

ApacheMinaFtplet,在版本 5.2 中添加,侦听某些 Apache Mina FTP 服务器事件并将其发布为ApplicationEvents 可以被任何ApplicationListener@EventListenerbean 方法或 Event Inbound Channel Adapterspring-doc.cadn.net.cn

目前,支持的事件包括:spring-doc.cadn.net.cn

它们中的每一个都是ApacheMinaFtpEvent;您可以配置单个侦听器来接收所有事件类型。 这source属性是FtpSession,您可以从中获取客户端地址等信息;一个方便的getSession()method 在 abstract 事件上提供。spring-doc.cadn.net.cn

会话打开/关闭以外的事件具有另一个属性FtpRequest它具有 command 和 arguments 等属性。spring-doc.cadn.net.cn

要使用侦听器(必须是 Spring bean)配置服务器,请将其添加到服务器工厂中:spring-doc.cadn.net.cn

FtpServerFactory serverFactory = new FtpServerFactory();
...
ListenerFactory factory = new ListenerFactory();
...
serverFactory.addListener("default", factory.createListener());
serverFactory.setFtplets(new HashMap<>(Collections.singletonMap("springFtplet", apacheMinaFtpletBean)));
server = serverFactory.createServer();
server.start();

要使用 Spring 集成事件适配器来使用这些事件:spring-doc.cadn.net.cn

@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
    ApplicationEventListeningMessageProducer producer =
        new ApplicationEventListeningMessageProducer();
    producer.setEventTypes(ApacheMinaFtpEvent.class);
    producer.setOutputChannel(eventChannel());
    return producer;
}

远程文件信息

从版本 5.2 开始,FtpStreamingMessageSource (FTP 流入站通道适配器)、FtpInboundFileSynchronizingMessageSource (FTP 入站通道适配器)和 “read” 命令的FtpOutboundGateway (FTP 出站网关)在消息中提供其他标头以生成有关远程文件的信息:spring-doc.cadn.net.cn

由于FtpInboundFileSynchronizingMessageSource不会针对远程文件生成消息,但使用本地副本,AbstractInboundFileSynchronizer将有关远程文件的信息存储在MetadataStore(可在外部配置)在 URI 样式 (protocol://host:port/remoteDirectory#remoteFileName) 进行同步作。 此元数据由FtpInboundFileSynchronizingMessageSource轮询本地文件时。 删除本地文件时,建议删除其元数据条目。 这AbstractInboundFileSynchronizer提供removeRemoteFileMetadata()callback 来实现此目的。 此外,还有一个setMetadataStorePrefix()用于元数据键。 建议将此前缀与MetadataStore-基于FileListFilterimplementations,当相同的MetadataStoreinstance 在这些组件之间共享,以避免条目覆盖,因为 filter 和AbstractInboundFileSynchronizer对元数据条目键使用相同的本地文件名。spring-doc.cadn.net.cn