此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1! |
FTP 入站通道适配器是一个特殊的侦听器,它连接到 FTP 服务器并侦听远程目录事件(例如,创建的新文件),此时它会启动文件传输。
以下示例演示如何配置:inbound-channel-adapter
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
如前面的配置所示,您可以使用该元素配置 FTP 入站通道适配器,同时还为各种属性提供值,例如 (基于简单的模式匹配,而不是正则表达式)和对 .inbound-channel-adapter
local-directory
filename-pattern
session-factory
默认情况下,传输的文件与原始文件同名。
如果要重写此行为,可以设置该属性,该属性允许您提供 SpEL 表达式来生成本地文件的名称。
与出站网关和适配器不同,在出站网关和适配器中,SpEL 评估上下文的根对象是 ,此入站适配器在评估时尚未收到消息,因为这是它最终生成的消息,传输的文件作为其有效负载。
因此,SpEL 评估上下文的根对象是远程文件的原始名称 (a )。local-filename-generator-expression
Message
String
入站通道适配器首先检索本地目录的对象,然后根据轮询器配置发出每个文件。
从版本 5.0 开始,您现在可以限制在需要新文件检索时从 FTP 服务器获取的文件数。
当目标文件非常大时,或者当您在具有持久性文件列表筛选器的群集系统中运行时,这可能很有用,稍后将讨论。
用于此目的。
负值(默认值)表示没有限制,并且将检索所有匹配的文件。
有关详细信息,请参阅入站通道适配器:控制远程文件提取。
从 5.0 版开始,您还可以通过设置属性来提供自定义实现。File
max-fetch-size
DirectoryScanner
inbound-channel-adapter
scanner
从 Spring Integration 3.0 开始,您可以指定属性(默认为 )。
当 时,本地文件的修改时间戳设置为从服务器检索的值。
否则,它将设置为当前时间。preserve-timestamp
false
true
从版本 4.2 开始,您可以指定 而不是 ,以便动态确定每个轮询的目录,例如 .remote-directory-expression
remote-directory
remote-directory-expression="@myBean.determineRemoteDir()"
从版本 4.3 开始,您可以省略 and 属性。
它们默认为 。
在这种情况下,根据 FTP 协议,客户端工作目录用作默认的远程目录。remote-directory
remote-directory-expression
null
有时,基于使用该属性指定的简单模式进行文件筛选可能还不够。
如果是这种情况,可以使用该属性指定正则表达式(例如 )。
此外,如果需要完全控制,可以使用该属性并提供对 的任何自定义实现的引用,这是一个用于筛选文件列表的策略接口。
此筛选器确定检索哪些远程文件。
您还可以使用 .filename-pattern
filename-regex
filename-regex=".*\.test$"
filter
o.s.i.file.filters.FileListFilter
AcceptOnceFileListFilter
CompositeFileListFilter
将其状态存储在内存中。
如果您希望状态在系统重新启动后继续存在,请考虑改用 。
此筛选器将接受的文件名存储在策略的实例中(请参阅元数据存储)。
此筛选器与文件名和远程修改时间匹配。AcceptOnceFileListFilter
FtpPersistentAcceptOnceFileListFilter
MetadataStore
从版本 4.0 开始,此筛选器需要 .
当与共享数据存储(例如 )一起使用时,它允许在多个应用程序或服务器实例之间共享筛选器键。ConcurrentMetadataStore
Redis
RedisMetadataStore
从版本 5.0 开始,默认情况下将 with in-memory 应用于 .
此筛选器还与 XML 配置中的 or 选项以及 Java DSL 中的 with 一起应用。
任何其他用例都可以使用(或)进行管理。FtpPersistentAcceptOnceFileListFilter
SimpleMetadataStore
FtpInboundFileSynchronizer
regex
pattern
FtpInboundChannelAdapterSpec
CompositeFileListFilter
ChainFileListFilter
前面的讨论涉及在检索文件之前对文件进行筛选。
检索文件后,将对文件系统上的文件应用额外的筛选器。
默认情况下,如前所述,这是一个 which 保留内存中的状态,并且不考虑文件的修改时间。
除非应用程序在处理后删除文件,否则默认情况下,适配器将在应用程序重新启动后重新处理磁盘上的文件。AcceptOnceFileListFilter
此外,如果将 “使用”配置为“,并且远程文件时间戳会更改(导致重新提取),则默认的本地筛选器不会处理此新文件。filter
FtpPersistentAcceptOnceFileListFilter
有关此筛选器及其使用方式的详细信息,请参阅远程持久性文件列表筛选器。
您可以使用该属性来配置本地文件系统筛选器的行为。
从版本 4.3.8 开始,默认配置 a。
此筛选器将接受的文件名和修改的时间戳存储在策略的实例中(请参阅元数据存储),并检测对本地文件修改时间的更改。
缺省值为 ,它将状态存储在内存中。local-filter
FileSystemPersistentAcceptOnceFileListFilter
MetadataStore
MetadataStore
SimpleMetadataStore
从版本 4.1.5 开始,这些过滤器具有一个新属性 (),该属性会导致它们刷新
每次更新时的元数据存储(如果存储实现)。flushOnUpdate
Flushable
此外,如果使用分布式(如 Redis),则可以拥有同一适配器或应用程序的多个实例,并确保每个文件只处理一次。MetadataStore |
实际的本地筛选器是包含提供的筛选器和模式筛选器,该筛选器阻止处理正在下载的文件(基于 )。
使用此后缀下载文件(默认为 ),传输完成后,文件将重命名为其最终名称,使其对过滤器“可见”。CompositeFileListFilter
temporary-file-suffix
.writing
该属性允许您配置文件分隔符,以便在默认的“/”不适用于您的特定环境时使用。remote-file-separator
有关这些属性的更多详细信息,请参阅架构。
您还应该了解 FTP 入站通道适配器是轮询使用者。
因此,必须配置轮询器(通过使用全局默认值或本地子元素)。
传输文件后,将生成一条以 a 为有效负载的消息,并将其发送到该属性标识的通道。java.io.File
channel
从版本 6.2 开始,您可以使用 根据上次修改的策略过滤 FTP 文件。
可以使用属性配置此筛选器,以便筛选器仅传递早于此值的文件。
期限默认为 60 秒,但您应该选择一个足够大的期限,以避免过早拿起文件(例如,由于网络故障)。
查看其 Javadoc 以获取更多信息。FtpLastModifiedFileListFilter
age
此外,如果使用分布式(如 Redis),则可以拥有同一适配器或应用程序的多个实例,并确保每个文件只处理一次。MetadataStore |
有关文件筛选和不完整文件的更多信息
有时,刚刚出现在受监视(远程)目录中的文件并不完整。
通常,此类文件是使用临时扩展名(例如 )编写的,然后在写入过程完成后重命名。
在大多数情况下,您只对完整的文件感兴趣,并且只想筛选完整的文件。
若要处理这些方案,可以使用 、 和 属性提供的筛选支持。
以下示例使用自定义筛选器实现:somefile.txt.writing
filename-pattern
filename-regex
filter
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
入站 FTP 适配器的轮询器配置说明
入站 FTP 适配器的作业由两个任务组成:
-
与远程服务器通信,以便将文件从远程目录传输到本地目录。
-
对于每个传输的文件,生成一条消息,将该文件作为有效负载,并将其发送到由“channel”属性标识的通道。 这就是为什么它们被称为“通道适配器”而不仅仅是“适配器”的原因。 这种适配器的主要工作是生成要发送到消息通道的消息。 从本质上讲,第二个任务的优先级是,如果您的本地目录已经有一个或多个文件,它将首先从这些文件生成消息。 只有当所有本地文件都已处理完毕后,它才会启动远程通信以检索更多文件。
此外,在轮询器上配置触发器时,应密切注意该属性。
其默认值适用于所有实例(包括 FTP)。
这意味着,一旦处理了一个文件,它就会等待由触发器配置确定的下一个执行时间。
如果 中恰好有一个或多个文件,它将在启动与远程 FTP 服务器的通信之前处理这些文件。
此外,如果设置为(默认值),则它一次只处理一个文件,其间隔由触发器定义,实质上是“one-poll === one-file”。max-messages-per-poll
1
SourcePollingChannelAdapter
local-directory
max-messages-per-poll
1
对于典型的文件传输用例,您很可能希望采取相反的行为:处理每个轮询的所有文件,然后才等待下一次轮询。
如果是这种情况,请设置为 -1。
然后,在每次轮询时,适配器都会尝试生成尽可能多的消息。
换句话说,它处理本地目录中的所有内容,然后连接到远程目录以传输那里可用的所有内容以在本地处理。
只有这样,轮询操作才被视为完成,轮询器等待下一个执行时间。max-messages-per-poll
或者,也可以将“max-messages-per-poll”值设置为正值,该值指示每次轮询要从文件创建的消息的上限。
例如,值 of 表示在每次轮询时,它尝试处理不超过 10 个文件。10
从故障中恢复
了解适配器的体系结构非常重要。
有一个文件同步器用于获取文件,还有一个文件同步器为每个文件发出一条消息
同步文件。
如前所述,涉及两个过滤器。
属性(和模式)引用远程 (FTP) 文件列表,以避免获取已经
被取走了。
用于确定要作为消息发送的文件。FileReadingMessageSource
filter
local-filter
FileReadingMessageSource
同步器列出远程文件并查阅其筛选器。
然后传输文件。
如果在文件传输过程中发生 IO 错误,则将删除已添加到筛选器的任何文件,以便它们
有资格在下一次投票中被重新获取。
仅当筛选器实现(例如 )时,这才适用。ReversibleFileListFilter
AcceptOnceFileListFilter
如果在同步文件后,在处理文件的下游流中发生错误,则不会自动回滚筛选器,因此默认情况下不会重新处理失败的文件。
如果您希望在失败后重新处理此类文件,可以使用类似于以下内容的配置来方便 从筛选器中删除失败的文件:
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何 .ResettableFileListFilter
从 V5.0 开始,入站通道适配器可以在本地构建与生成的本地文件名相对应的子目录。
这也可以是远程子路径。
为了能够根据层次结构支持以递归方式读取本地目录进行修改,您现在可以根据该算法提供带有新目录的内部目录。
有关更多信息,请参见 AbstractInboundFileSynchronizingMessageSource.setScanner()。
此外,您现在可以将 -based by using 选项切换到。
它还配置为所有实例,以对本地目录中的任何修改做出反应。
前面显示的重新处理示例基于从本地目录中删除文件时执行的 to 执行的内置功能。
有关详细信息,请参阅 WatchServiceDirectoryScanner
。FileReadingMessageSource
RecursiveDirectoryScanner
Files.walk()
AbstractInboundFileSynchronizingMessageSource
WatchService
DirectoryScanner
setUseWatchService()
WatchEventType
FileReadingMessageSource.WatchServiceDirectoryScanner
ResettableFileListFilter.remove()
StandardWatchEventKinds.ENTRY_DELETE
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}