文件支持
文件支持
Spring 集成的文件支持通过专用词汇表扩展了 Spring 集成核心,以处理读取、写入和转换文件。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-file:6.0.9"
它提供了一个名称空间,该名称空间允许元素定义专用于文件的通道适配器,并支持可以将文件内容读取到字符串或字节数组中的转换器。
本节介绍了FileReadingMessageSource
和FileWritingMessageHandler
以及如何将它们配置为 bean。
它还讨论了通过特定于文件的实现来处理文件的支持Transformer
.
最后,它解释了特定于文件的命名空间。
读取文件
一个FileReadingMessageSource
可用于使用文件系统中的文件。
这是MessageSource
,从文件系统目录创建消息。
以下示例显示如何配置FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
要防止为某些文件创建消息,您可以提供FileListFilter
.
默认情况下,我们使用以下筛选器:
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
这IgnoreHiddenFileListFilter
确保不处理隐藏文件。
请注意,hidden 的确切定义取决于系统。
例如,在基于 UNIX 的系统上,以句点字符开头的文件被视为隐藏文件。
另一方面,Microsoft Windows 有一个专用的 file 属性来指示隐藏文件。
版本 4.2 引入了 |
这AcceptOnceFileListFilter
确保仅从目录中选取文件一次。
这 从 4.0 版本开始,此过滤器需要一个 从 4.1.5 版本开始,此过滤器具有一个新属性 ( |
持久文件列表过滤器现在具有布尔属性forRecursion
.
将此属性设置为true
,还会设置alwaysAcceptDirectories
,这意味着出站网关 (ls
和mget
) 现在每次都始终遍历完整的目录树。
这是为了解决未检测到目录树深处更改的问题。
另外forRecursion=true
使文件的完整路径用作元数据存储键;这解决了以下问题:如果具有相同名称的文件在不同目录中多次出现,则过滤器无法正常工作。
重要说明:这意味着对于顶级目录下的文件,将无法找到持久性元数据存储中的现有键。
因此,该属性为false
默认情况下;这可能会在未来版本中更改。
以下示例将FileReadingMessageSource
使用滤镜:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件的常见问题是,文件可能在准备就绪之前被检测到(即,其他进程可能仍在写入该文件)。
默认的AcceptOnceFileListFilter
不会阻止这种情况。
在大多数情况下,如果文件写入过程在准备好读取每个文件后立即重命名每个文件,则可以防止这种情况。
一个filename-pattern
或filename-regex
过滤器,它只接受准备好的文件(可能基于已知的后缀),由默认的AcceptOnceFileListFilter
允许这种情况。
这CompositeFileListFilter
启用合成,如下例所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法使用临时名称创建文件并重命名为最终名称,则 Spring 集成提供了另一种选择。
版本 4.2 添加了LastModifiedFileListFilter
.
此过滤器可以使用age
属性,以便过滤器仅传递早于此值的文件。
该年龄默认为 60 秒,但您应该选择一个足够大的年龄,以避免过早获取文件(例如,由于网络故障)。
以下示例显示如何配置LastModifiedFileListFilter
:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从版本 4.3.7 开始,ChainFileListFilter
(的扩展CompositeFileListFilter
) 以允许后续过滤器只能看到前一个过滤器的结果。
(使用CompositeFileListFilter
,则所有筛选器都会看到所有文件,但它只传递已通过所有筛选器的文件。
需要新行为的一个示例是LastModifiedFileListFilter
和AcceptOnceFileListFilter
,当我们在一段时间后才希望接受文件时。
使用CompositeFileListFilter
,由于AcceptOnceFileListFilter
在第一个传递时看到所有文件,当另一个过滤器稍后传递时,它不会传递它。
这CompositeFileListFilter
当模式过滤器与自定义过滤器结合使用时,方法非常有用,该过滤器查找辅助文件以指示文件传输已完成。
模式过滤器可能只传递主文件(例如something.txt
),但 “done” 过滤器需要查看 if(例如)something.done
存在。
假设我们有文件a.txt
,a.done
和b.txt
.
模式筛选器仅通过a.txt
和b.txt
,而 “done” 过滤器只能看到所有三个文件并传递a.txt
.
复合过滤器的最终结果是,只有a.txt
发布。
使用ChainFileListFilter ,如果链中的任何过滤器返回空列表,则不会调用其余过滤器。 |
版本 5.0 引入了ExpressionFileListFilter
对文件执行 SPEL 表达式作为上下文评估根对象。
为此,所有用于文件处理的 XML 组件(本地和远程)以及现有的filter
属性,已随filter-expression
选项,如下例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
版本 5.0.5 引入了DiscardAwareFileListFilter
对 rejected files 感兴趣的实现。
为此,这样的过滤器实现应该通过addDiscardCallback(Consumer<File>)
.
在框架中,此功能从FileReadingMessageSource.WatchServiceDirectoryScanner
,与LastModifiedFileListFilter
.
与常规DirectoryScanner
这WatchService
根据目标文件系统上的事件提供要处理的文件。
在轮询包含这些文件的内部队列时,LastModifiedFileListFilter
可能会丢弃它们,因为它们相对于其配置的age
.
因此,我们将丢失该文件以备将来可能的考虑。
discard 回调钩子允许我们将文件保留在内部队列中,以便可以根据age
在随后的民意调查中。
这CompositeFileListFilter
还实现了一个DiscardAwareFileListFilter
并将 discard 回调填充到其所有DiscardAwareFileListFilter
代表。
因为CompositeFileListFilter 将文件与所有委托匹配,则discardCallback 可能会对同一个文件调用多次。 |
从版本 5.1 开始,FileReadingMessageSource
不检查目录是否存在,并且在其start()
调用(通常通过包装SourcePollingChannelAdapter
).
以前,在引用目录时(例如,从测试中引用)或稍后应用权限时,没有简单的方法来防止作系统权限错误。
消息报头
从版本 5.0 开始,FileReadingMessageSource
(除了payload
作为轮询File
) 将以下标头填充到出站Message
:
-
FileHeaders.FILENAME
:这File.getName()
的 file 中。 可用于后续的重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE
:这File
对象本身。 通常,当我们丢失原始File
对象。 但是,为了与任何其他自定义用例保持一致和方便,此标头可用于访问原始文件。 -
FileHeaders.RELATIVE_PATH
:引入了一个新的标头,用于表示文件路径相对于扫描的根目录的部分。 当要求在其他地方恢复源目录层次结构时,此标头可能很有用。 为此,DefaultFileNameGenerator
(请参阅“'生成文件名)可以配置为使用此标头。
目录扫描和轮询
这FileReadingMessageSource
不会立即为目录中的文件生成消息。
它使用内部队列来存储scanner
.
这scanEachPoll
选项用于确保在每次轮询时使用最新的输入目录内容刷新内部队列。
默认情况下 (scanEachPoll = false
)、FileReadingMessageSource
清空其队列,然后再次扫描目录。
此默认行为对于减少对目录中大量文件的扫描特别有用。
但是,在需要自定义排序的情况下,请务必考虑将此标志设置为true
.
文件的处理顺序可能与预期不符。
默认情况下,队列中的文件以其自然 (path
) 订单。
通过扫描添加的新文件,即使队列中已有文件,也会入到适当的位置,以保持该自然顺序。
要自定义顺序,FileReadingMessageSource
可以接受Comparator<File>
作为构造函数参数。
它由内部 (PriorityBlockingQueue
) 以根据业务要求对其内容进行重新排序。
因此,要按特定顺序处理文件,您应该为FileReadingMessageSource
而不是对自定义生成的列表进行排序DirectoryScanner
.
引入 5.0 版RecursiveDirectoryScanner
执行文件树访问。
该实现基于Files.walk(Path start, int maxDepth, FileVisitOption… options)
功能性。
根目录 (DirectoryScanner.listFiles(File)
) 参数。
所有其他子目录包含和排除项都基于目标FileListFilter
实现。
例如,SimplePatternFileListFilter
默认情况下,会过滤掉目录。
看AbstractDirectoryAwareFileListFilter
及其实现了解更多信息。
从版本 5.5 开始,FileInboundChannelAdapterSpec 的 Java DSL 具有recursive(boolean) 选项以使用RecursiveDirectoryScanner 在目标FileReadingMessageSource 而不是默认的。 |
命名空间支持
通过使用特定于文件的命名空间,可以简化文件读取的配置。 为此,请使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此命名空间中,您可以减少FileReadingMessageSource
并将其包装在入站 Channel Adapter 中,如下所示:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖于默认的FileListFilter
实现:
-
IgnoreHiddenFileListFilter
(不处理隐藏文件) -
AcceptOnceFileListFilter
(防止重复)
因此,您也可以省略prevent-duplicates
和ignore-hidden
属性,因为它们是true
默认情况下。
Spring Integration 4.2 引入了 |
第二个通道适配器示例使用自定义过滤器,第三个示例使用filename-pattern
属性以添加AntPathMatcher
based 过滤器,第四个使用filename-regex
属性,将基于正则表达式模式的筛选条件添加到FileReadingMessageSource
.
这filename-pattern
和filename-regex
属性都与常规的filter
reference 属性。
但是,您可以使用filter
属性来引用CompositeFileListFilter
它组合了任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。
当多个进程从同一目录读取数据时,您可能希望锁定文件以防止它们被并发选取。
为此,您可以使用FileLocker
.
有一个java.nio
,但也可以实现自己的锁定方案。
这nio
可以按如下方式注入 Locker:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
您可以按如下方式配置自定义保险箱:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了locker时,它负责在允许接收文件之前获取锁。
它不承担解锁文件的责任。
如果已处理文件并保持锁挂起,则存在内存泄漏。
如果这是一个问题,您应该调用FileLocker.unlock(File file) 你自己在适当的时候。 |
当筛选和锁定文件还不够时,您可能需要完全控制文件的列出方式。
要实现此类要求,您可以使用DirectoryScanner
.
此扫描程序可让您准确确定每个轮询中列出的文件。
这也是 Spring Integration 在内部用来连接FileListFilter
instances 和FileLocker
到FileReadingMessageSource
.
您可以注入自定义DirectoryScanner
到<int-file:inbound-channel-adapter/>
在scanner
属性,如下例所示:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做可以让您完全自由地选择排序、列表和锁定策略。
了解过滤器(包括patterns
,regex
,prevent-duplicates
等)和locker
实例实际上由scanner
.
在适配器上设置的这些属性中的任何一个随后都会注入到内部的scanner
.
对于外部scanner
,则FileReadingMessageSource
.
必须在该自定义中指定它们(如果需要)DirectoryScanner
.
换句话说,如果您注入scanner
到FileReadingMessageSource
,您应该提供filter
和locker
在那个scanner
,而不是在FileReadingMessageSource
.
默认情况下,DefaultDirectoryScanner 使用IgnoreHiddenFileListFilter 以及一个AcceptOnceFileListFilter .
为了防止使用它们,您可以配置自己的过滤器(例如AcceptAllFileListFilter ),甚至将其设置为null . |
WatchServiceDirectoryScanner
这FileReadingMessageSource.WatchServiceDirectoryScanner
在将新文件添加到目录时依赖于文件系统事件。
在初始化期间,将注册目录以生成事件。
初始文件列表也是在初始化期间构建的。
在遍历目录树时,遇到的任何子目录也会被注册以生成事件。
在第一次轮询时,将返回遍历目录的初始文件列表。
在后续轮询中,将返回来自新创建事件的文件。
如果添加了新的子目录,则其 creation 事件用于遍历新子树以查找现有文件并注册找到的任何新子目录。
存在问题WatchKey 当其内部事件queue 不会在目录修改事件发生时被程序耗尽。
如果超出队列大小,则StandardWatchEventKinds.OVERFLOW ,表示部分文件系统事件可能会丢失。
在这种情况下,将完全重新扫描根目录。
为避免重复,请考虑使用适当的FileListFilter (例如AcceptOnceFileListFilter ) 或在处理完成时删除文件。 |
这WatchServiceDirectoryScanner
可以通过FileReadingMessageSource.use-watch-service
选项,它与scanner
选择。
内部FileReadingMessageSource.WatchServiceDirectoryScanner
实例为提供的directory
.
此外,现在WatchService
轮询逻辑可以跟踪StandardWatchEventKinds.ENTRY_MODIFY
和StandardWatchEventKinds.ENTRY_DELETE
.
如果您需要跟踪现有文件和新文件的修改,则应实现ENTRY_MODIFY
events 逻辑FileListFilter
.
否则,将以相同的方式处理这些事件中的文件。
这ResettableFileListFilter
implementation 拾取ENTRY_DELETE
事件。
因此,他们的文件是为remove()
操作。
启用此事件后,过滤器(如AcceptOnceFileListFilter
删除文件。
因此,如果出现同名文件,它将通过过滤器并作为消息发送。
为此,watch-events
属性 (FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
) 已引入。
(WatchEventType
是FileReadingMessageSource
.)
使用此选项,我们可以对新文件使用一个下游流逻辑,对修改后的文件使用其他一些 logic 。
以下示例显示了如何为同一目录中的 create 和 modify 事件配置不同的逻辑:
值得一提的是,ENTRY_DELETE
事件涉及被监视目录的子目录的重命名作。
更具体地说,ENTRY_DELETE
event 的 event(与上一个目录名称相关)位于ENTRY_CREATE
事件,该事件通知新的(重命名的)目录。
在某些作系统(如 Windows)上,ENTRY_DELETE
event 必须注册才能处理这种情况。
否则,在文件资源管理器中重命名监视的子目录可能会导致在该子目录中无法检测到新文件。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
限制内存消耗
您可以使用HeadDirectoryScanner
以限制内存中保留的文件数。
这在扫描大型目录时非常有用。
使用 XML 配置时,可以通过设置queue-size
入站通道适配器上的属性。
在版本 4.2 之前,此设置与任何其他过滤器的使用不兼容。
任何其他筛选条件(包括prevent-duplicates="true"
) 覆盖用于限制大小的筛选条件。
使用 通常,不使用 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
'tail'ing 文件
另一个常见的用例是从文件的末尾(或尾部)获取 'lines',并在添加新行时捕获新行。
提供了两种实现。
第一个OSDelegatingFileTailingMessageProducer
使用本机tail
命令(在具有 COMMAND 的作系统上)。
这通常是这些平台上最有效的实现。
对于没有tail
command 的 app 中,第二个实现ApacheCommonsFileTailingMessageProducer
使用 Apachecommons-io
Tailer
类。
在这两种情况下,文件系统事件(例如文件不可用和其他事件)都发布为ApplicationEvent
实例。
此类事件的示例包括:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
例如,在旋转文件时,可能会发生前面示例中显示的事件序列。
从版本 5.0 开始,FileTailingIdleEvent
在idleEventInterval
.
以下示例显示了此类事件的外观:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持tail 命令提供这些状态消息。 |
从这些终端节点发出的消息具有以下标头:
-
FileHeaders.ORIGINAL_FILE
:这File
对象 -
FileHeaders.FILENAME
:文件名 (File.getName()
)
在 5.0 版之前的版本中,FileHeaders.FILENAME header 包含文件绝对路径的字符串表示形式。
现在,您可以通过调用getAbsolutePath() 在原始文件头上。 |
以下示例使用默认选项 ('-F -n 0',表示从当前端开始遵循文件名) 创建本机适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例使用 '-F -n +0' 选项(表示遵循文件名,发出所有现有行)创建本机适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果tail
命令失败(在某些平台上,缺少文件会导致tail
失败,即使使用-F
指定),则每 10 秒重试一次该命令。
默认情况下,本机适配器从标准输出捕获并将内容作为消息发送。
他们还从标准错误中捕获以引发事件。
从版本 4.3.6 开始,您可以通过设置enable-status-reader
自false
,如下例所示:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval
设置为5000
,这意味着,如果 5 秒内没有写入任何行,则FileTailingIdleEvent
每 5 秒触发一次:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
当您需要停止适配器时,这可能很有用。
以下示例创建一个 Apachecommons-io
Tailer
适配器,它每两秒检查一次文件是否有新行,并每十秒检查一次是否存在缺失的文件:
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
1 | 文件从开头 (end="false" ) 而不是 end(这是默认值)。 |
2 | 将为每个块重新打开文件(默认为保持文件打开)。 |
指定delay ,end 或reopen attributes 强制使用 Apachecommons-io 适配器,并使native-options 属性不可用。 |
写入文件
要将消息写入文件系统,您可以使用FileWritingMessageHandler
.
此类可以处理以下有效负载类型:
-
File
-
String
-
字节数组
-
InputStream
(自版本 4.2 起)
对于 String 负载,您可以配置编码和 charset。
为了简化作,您可以配置FileWritingMessageHandler
作为出站通道适配器或出站网关的一部分,通过使用 XML 名称空间。
从版本 4.3 开始,您可以指定写入文件时要使用的缓冲区大小。
从版本 5.1 开始,您可以提供BiConsumer<File, Message<?>>
newFileCallback
如果您使用FileExistsMode.APPEND
或FileExistsMode.APPEND_NO_FLUSH
并且必须创建一个新文件。
此回调接收新创建的文件以及触发该文件的消息。
例如,此回调可用于编写消息标头中定义的 CSV 标头。
生成文件名
在最简单的形式中,FileWritingMessageHandler
只需要一个目标目录来写入文件。
要写入的文件的名称由处理程序的FileNameGenerator
.
默认实现会查找其 key 与定义为FileHeaders.FILENAME
.
或者,您可以指定要根据消息计算的表达式以生成文件名 — 例如headers['myCustomHeader'] + '.something'
.
表达式的计算结果必须为String
.
为方便起见,DefaultFileNameGenerator
还提供setHeaderName
方法,允许您显式指定其值将用作文件名的消息标头。
设置完成后,DefaultFileNameGenerator
采用以下解析步骤来确定给定消息有效负载的文件名:
-
根据消息评估表达式,如果结果为非空
String
,将其用作文件名。 -
否则,如果有效负载是
java.io.File
,请使用File
对象的文件名。 -
否则,请使用附加了 .
msg
作为文件名。
当您使用 XML 名称空间支持时,文件出站通道适配器和文件出站网关都支持以下互斥的配置属性:
-
filename-generator
(对FileNameGenerator
implementation) -
filename-generator-expression
(计算结果为String
)
在写入文件时,将使用临时文件后缀(其默认值为.writing
).
在写入文件时,它会附加到文件名中。
要自定义后缀,您可以设置temporary-file-suffix
File outbound Channel Adapter 和 File Outbound Gateway 上的属性。
使用APPEND 文件mode 这temporary-file-suffix 属性,因为数据是直接附加到文件中的。 |
从版本 4.2.5 开始,生成的文件名(作为filename-generator
或filename-generator-expression
evaluation) 可以表示子路径和目标文件名。
它用作File(File parent, String child)
如故。
但是,在过去,我们没有创建 (mkdirs()
) 目录中,仅假定文件名。
当我们需要恢复文件系统树以匹配源目录时,这种方法非常有用,例如,解压缩存档并按原始顺序保存目标目录中的所有文件时。
指定输出目录
文件出站通道适配器和文件出站网关都提供了两个互斥的配置属性,用于指定输出目录:
-
directory
-
directory-expression
Spring Integration 2.2 引入了directory-expression 属性。 |
使用directory
属性
当您使用directory
属性,则输出目录将设置为固定值,该值在FileWritingMessageHandler
已初始化。
如果不指定此属性,则必须使用directory-expression
属性。
使用directory-expression
属性
如果您想获得完整的 SPEL 支持,可以使用directory-expression
属性。
此属性接受一个 SPEL 表达式,该表达式针对正在处理的每条消息进行评估。
因此,当您动态指定输出文件目录时,您对消息的有效负载及其标头具有完全访问权限。
SPEL 表达式必须解析为String
,java.io.File
或org.springframework.core.io.Resource
.
(后者被评估为File
总之。
此外,由此产生的String
或File
必须指向一个目录。
如果未指定directory-expression
属性,则必须设置directory
属性。
使用auto-create-directory
属性
默认情况下,如果目标目录不存在,则会自动创建相应的目标目录和任何不存在的父目录。
要防止这种行为,您可以将auto-create-directory
属性设置为false
.
此属性适用于directory
和directory-expression
属性。
使用 现在,对正在处理的每条消息执行此检查,而不是在初始化适配器时检查目标目录是否存在。 此外,如果 |
处理现有目标文件
当您写入文件并且目标文件已存在时,默认行为是覆盖该目标文件。
您可以通过设置mode
属性。
存在以下选项:
-
REPLACE
(默认) -
REPLACE_IF_MODIFIED
-
APPEND
-
APPEND_NO_FLUSH
-
FAIL
-
IGNORE
Spring Integration 2.2 引入了mode 属性和APPEND ,FAIL 和IGNORE 选项。 |
REPLACE
-
如果目标文件已存在,则将其覆盖。 如果
mode
属性,这是写入文件时的默认行为。 REPLACE_IF_MODIFIED
-
如果目标文件已存在,则仅当上次修改的时间戳与源文件的时间戳不同时,才会覆盖该目标文件。 为
File
payloads,则 payloadlastModified
将 time 与现有文件进行比较。 对于其他负载,FileHeaders.SET_MODIFIED
(file_setModified
) 标头与现有文件进行比较。 如果标头缺失或具有不是Number
,则始终会替换该文件。 APPEND
-
此模式允许您将消息内容附加到现有文件,而不是每次都创建新文件。 请注意,此属性与
temporary-file-suffix
属性,因为当它将内容附加到现有文件时,适配器不再使用临时文件。 文件在每条消息后关闭。 APPEND_NO_FLUSH
-
此选项与
APPEND
,但不会刷新数据,也不会在每条消息后关闭文件。 这可以提供显著的性能,但在发生故障时有丢失数据的风险。 看使用 时刷新文件APPEND_NO_FLUSH
了解更多信息。 FAIL
-
如果目标文件存在,则
MessageHandlingException
被抛出。 IGNORE
-
如果目标文件存在,则消息负载将被静默忽略。
使用临时文件后缀(默认值为.writing )、IGNORE 选项(如果最终文件名或临时文件名存在)适用。 |
使用 时刷新文件APPEND_NO_FLUSH
这APPEND_NO_FLUSH
mode 是在版本 4.3 中添加的。
使用它可以提高性能,因为文件不会在每条消息后关闭。
但是,如果发生故障,这可能会导致数据丢失。
Spring 集成提供了几种刷新策略来减轻这种数据丢失:
-
用
flushInterval
. 如果文件在这段时间内未写入,则会自动刷新该文件。 这是近似值,可能取决于1.33x
这一次(平均1.167x
). -
将包含正则表达式的消息发送到消息处理程序的
trigger
方法。 具有与模式匹配的绝对路径名的文件将被刷新。 -
为处理程序提供自定义
MessageFlushPredicate
实现来修改将消息发送到trigger
方法。 -
调用处理程序的
flushIfNeeded
方法,方法是传入自定义FileWritingMessageHandler.FlushPredicate
或FileWritingMessageHandler.MessageFlushPredicate
实现。
将为每个打开的文件调用谓词。 有关更多信息,请参阅这些接口的 Javadoc。 请注意,从版本 5.0 开始,谓词方法提供了另一个参数:当前文件的首次写入时间(如果是新的或以前的关闭)。
使用flushInterval
,则间隔从上次写入开始。
仅当文件在时间间隔内处于空闲状态时,才会刷新该文件。
从版本 4.3.7 开始,一个额外的属性 (flushWhenIdle
) 可以设置为false
,这意味着该间隔从第一次写入以前刷新的(或新)文件开始。
文件时间戳
默认情况下,目标文件的lastModified
timestamp 是创建文件的时间(就地重命名保留当前时间戳除外)。
从版本 4.3 开始,您现在可以配置preserve-timestamp
(或setPreserveTimestamp(true)
使用 Java 配置时)。
为File
payloads 的 Payloads,这会将时间戳从入站文件传输到出站文件(无论是否需要副本)。
对于其他负载,如果FileHeaders.SET_MODIFIED
标头 (file_setModified
) 的 API 中,它用于设置目标文件的lastModified
timestamp,只要标头是Number
.
文件权限
从版本 5.0 开始,在将文件写入支持 Posix 权限的文件系统时,您可以在出站通道适配器或网关上指定这些权限。
该属性是一个整数,通常以熟悉的八进制格式提供 — 例如0640
,这意味着所有者具有读/写权限,组具有只读权限,其他人没有访问权限。
文件出站通道适配器
下面的示例配置文件出站通道适配器:
<int-file:outbound-channel-adapter id="filesOut" directory="${input.directory.property}"/>
基于命名空间的配置还支持delete-source-files
属性。
如果设置为true
,它会在写入目标后触发对原始源文件的删除。
该标志的默认值为false
.
以下示例演示如何将其设置为true
:
<int-file:outbound-channel-adapter id="filesOut"
directory="${output.directory}"
delete-source-files="true"/>
这delete-source-files 属性仅在入站消息具有File payload 的 intent 或FileHeaders.ORIGINAL_FILE header 值包含 sourceFile 实例或String 表示原始文件路径。 |
从版本 4.2 开始,FileWritingMessageHandler
支持append-new-line
选择。
如果设置为true
,则在写入消息后,将向文件附加一个新行。
默认属性值为false
.
以下示例演示如何使用append-new-line
选择:
<int-file:outbound-channel-adapter id="newlineAdapter"
append-new-line="true"
directory="${output.directory}"/>
出站网关
如果要继续根据写入的文件处理消息,可以使用outbound-gateway
相反。
它的作用类似于outbound-channel-adapter
.
但是,在写入文件后,它还会将其作为消息的有效负载发送到回复通道。
以下示例配置出站网关:
<int-file:outbound-gateway id="mover" request-channel="moveInput"
reply-channel="output"
directory="${output.directory}"
mode="REPLACE" delete-source-files="true"/>
如前所述,您还可以指定mode
属性,它定义如何处理目标文件已存在的情况的行为。
有关更多详细信息,请参阅 Dealing with Existing Destination Files。
通常,在使用文件出站网关时,结果文件将作为回复通道上的消息负载返回。
这在指定IGNORE
模式。
在这种情况下,将返回预先存在的目标文件。
如果请求消息的负载是一个文件,您仍然可以通过消息标头访问该原始文件。
请参阅 FileHeaders.ORIGINAL_FILE。
“outbound-gateway” 在您希望首先移动文件,然后通过处理管道发送文件的情况下效果很好。
在这种情况下,您可以将文件命名空间的inbound-channel-adapter 元素添加到outbound-gateway 然后连接该网关的reply-channel 拖动到管道的开头。 |
如果您有更详细的要求或需要支持其他有效负载类型作为要转换为文件内容的输入,则可以扩展FileWritingMessageHandler
,但更好的选择是依赖Transformer
.
使用 Java 配置进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class FileWritingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FileWritingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.writeToFile("foo.txt", new File(tmpDir.getRoot(), "fileWritingFlow"), "foo");
}
@Bean
@ServiceActivator(inputChannel = "writeToFileChannel")
public MessageHandler fileWritingMessageHandler() {
Expression directoryExpression = new SpelExpressionParser().parseExpression("headers.directory");
FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
handler.setFileExistsMode(FileExistsMode.APPEND);
return handler;
}
@MessagingGateway(defaultRequestChannel = "writeToFileChannel")
public interface MyGateway {
void writeToFile(@Header(FileHeaders.FILENAME) String fileName,
@Header(FileHeaders.FILENAME) File directory, String data);
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class FileWritingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FileWritingJavaApplication.class)
.web(false)
.run(args);
MessageChannel fileWritingInput = context.getBean("fileWritingInput", MessageChannel.class);
fileWritingInput.send(new GenericMessage<>("foo"));
}
@Bean
public IntegrationFlow fileWritingFlow() {
return IntegrationFlow.from("fileWritingInput")
.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "foo.txt")
.header("directory", new File(tmpDir.getRoot(), "fileWritingFlow")))
.handle(Files.outboundGateway(m -> m.getHeaders().get("directory")))
.channel(MessageChannels.queue("fileWritingResultChannel"))
.get();
}
}
文件转换器
要将从文件系统读取的数据转换为对象,反之亦然,您需要做一些工作。
与FileReadingMessageSource
在较小程度上FileWritingMessageHandler
,则可能需要自己的机制来完成工作。
为此,您可以实现Transformer
接口。
或者,您可以扩展AbstractFilePayloadTransformer
用于入站消息。
Spring 集成提供了一些明显的实现。
请参阅Javadoc 的Transformer
接口以查看哪些 Spring Integration 类实现了它。
同样,您可以检查Javadoc 的AbstractFilePayloadTransformer
类以查看哪些 Spring Integration 类扩展了它。
FileToByteArrayTransformer
延伸AbstractFilePayloadTransformer
并转换File
object 转换为byte[]
通过使用 Spring 的FileCopyUtils
.
通常,使用一系列 transformer 比将所有转换放在一个类中要好。
在这种情况下,File
自byte[]
转换可能是合乎逻辑的第一步。
FileToStringTransformer
延伸AbstractFilePayloadTransformer
将File
object 设置为String
.
如果不出意外,这对调试很有用(考虑将其与 wire tap 一起使用)。
要配置特定于文件的转换器,可以使用 file 命名空间中的相应元素,如下例所示:
<int-file:file-to-bytes-transformer input-channel="input" output-channel="output"
delete-files="true"/>
<int-file:file-to-string-transformer input-channel="input" output-channel="output"
delete-files="true" charset="UTF-8"/>
这delete-files
option 向转换器发出信号,表示它应该在转换完成后删除入站文件。
这绝不能替代使用AcceptOnceFileListFilter
当FileReadingMessageSource
正在多线程环境中使用(例如,当您通常使用 Spring 集成时)。
文件拆分器
这FileSplitter
在版本 4.1.2 中添加,其命名空间支持在版本 4.2 中添加。
这FileSplitter
将文本文件拆分为单独的行,基于BufferedReader.readLine()
.
默认情况下,拆分器使用Iterator
在从文件中读取行时一次发出一行。
设置iterator
property 设置为false
使它在将所有行作为消息发出之前将其读入内存。
一个用例可能是,您希望在发送任何包含行的消息之前检测文件上的 I/O 错误。
但是,它仅适用于相对较短的文件。
入站有效负载可以是File
,String
(一个File
path)、InputStream
或Reader
.
其他负载类型保持不变。
以下清单显示了配置FileSplitter
:
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
1 | splitter 的 bean 名称。 |
2 | 设置为true (默认值)来使用迭代器,或者false 在发送行之前将文件加载到内存中。 |
3 | 设置为true 在文件数据之前和之后发出文件开始和文件结束标记消息。
标记是带有FileSplitter.FileMarker payloads(使用START 和END 值中的mark 属性)。
在筛选某些行的下游流中按顺序处理文件时,可以使用标记。
它们使下游处理能够知道文件何时被完全处理。
此外,一个file_marker 标头,其中包含START 或END 已添加到这些消息中。
这END marker 包括行数。
如果文件为空,则仅START 和END 标记使用0 作为lineCount .
默认值为false .
什么时候true ,apply-sequence 是false 默认情况下。
另请参阅markers-json (next 属性)。 |
4 | 什么时候markers 为 true,请将此项设置为true 以将FileMarker 对象转换为 JSON 字符串。
(使用SimpleJsonSerializer 下面)。 |
5 | 设置为false 要禁用sequenceSize 和sequenceNumber 消息中的标头。
默认值为true 除非markers 是true .
什么时候true 和markers 是true ,则标记将包含在排序中。
什么时候true 和iterator 是true 这sequenceSize header 设置为0 ,因为大小未知。 |
6 | 设置为true 以引起RequiresReplyException 如果文件中没有行,则引发。
默认值为false . |
7 | 设置将文本数据读入时使用的字符集名称String 负载。
默认值为 platform charset。 |
8 | 在为其余行发出的消息中作为标题携带的第一行的标题名称。 从 5.0 版本开始。 |
9 | 设置用于将消息发送到拆分器的输入通道。 |
10 | 设置将消息发送到的输出通道。 |
11 | 设置发送超时。
仅当output-channel can 阻止 — 例如完整的QueueChannel . |
12 | 设置为false 以禁用在刷新上下文时自动启动 Splitter。
默认值为true . |
13 | 如果input-channel 是一个<publish-subscribe-channel/> . |
14 | 设置拆分器的启动阶段(在auto-startup 是true ). |
这FileSplitter
还会拆分任何基于文本的InputStream
转换为行。
从版本 4.3 开始,当与 FTP 或 SFTP 流入站通道适配器或者使用stream
选项来检索文件,则当文件被完全消耗时,拆分器会自动关闭支持流的会话
有关这些工具的更多信息,请参阅 FTP Streaming Inbound Channel Adapter 和 SFTP Streaming Inbound Channel Adapter 以及 FTP Outbound Gateway 和 SFTP Outbound Gateway。
使用 Java 配置时,可以使用其他构造函数,如下例所示:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
什么时候markersJson
为 true,则标记表示为 JSON 字符串(使用SimpleJsonSerializer
).
版本 5.0 引入了firstLineAsHeader
选项以指定内容的第一行是标题(如 CSV 文件中的列名)。
传递给此属性的参数是标头名称,在该名称下,第一行作为其余行发出的消息中的标头。
此行不包含在序列标头中(如果applySequence
为 true),也不在lineCount
关联FileMarker.END
.
注意:从版本 5.5 开始,lineCount' 也作为FileHeaders.LINE_COUNT
到FileMarker.END
消息,由于FileMarker
可以序列化为 JSON。
如果文件仅包含标题行,则该文件将被视为空,因此仅FileMarker
在拆分期间发出实例(如果启用了标记 — 否则,不会发出任何消息)。
默认情况下(如果未设置标头名称),第一行被视为 data 并成为第一个发出的消息的有效负载。
如果您需要有关从文件内容中提取 Headers 的更复杂的逻辑(不是第一行,不是行的整个内容,也不是一个特定的 Header,等等),请考虑在FileSplitter
.
请注意,已移动到标题的行可能会从正常内容流程的下游进行筛选。
幂等下游处理拆分文件
什么时候apply-sequence
为 true,则拆分器会将SEQUENCE_NUMBER
标头(当markers
为 true,则标记计为行)。
该行号可与幂等接收器一起使用,以避免在重新启动后重新处理行。
例如:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}
文件聚合器
从版本 5.5 开始,一个FileAggregator
的引入是为了覆盖FileSplitter
use-case 启用 START/END 标记时。
为方便起见,FileAggregator
实现所有三种序列详细信息策略:
-
这
HeaderAttributeCorrelationStrategy
使用FileHeaders.FILENAME
attribute 用于关联键的计算。 在FileSplitter
,它不会填充序列详细信息标头,因为 START/END 标记消息也包含在序列大小中。 这FileHeaders.FILENAME
仍会为发出的每一行填充,包括 START/END 标记消息。 -
这
FileMarkerReleaseStrategy
- 检查FileSplitter.FileMarker.Mark.END
消息,然后比较FileHeaders.LINE_COUNT
header 值与组大小减2
-FileSplitter.FileMarker
实例。 它还实现了一个方便的GroupConditionProvider
联系方式conditionSupplier
函数中要使用的AbstractCorrelatingMessageHandler
. 有关更多信息,请参阅 Message Group Condition 。 -
这
FileAggregatingMessageGroupProcessor
只是删除FileSplitter.FileMarker
消息,并将其余消息收集到要生成的 List 负载中。
以下清单显示了配置FileAggregator
:
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
return f -> f
.split(Files.splitter()
.markers()
.firstLineAsHeader("firstLine"))
.channel(c -> c.executor(taskExecutor))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker),
e -> e.discardChannel("aggregatorChannel"))
.<String, String>transform(String::toUpperCase)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
integrationFlow {
split(Files.splitter().markers().firstLineAsHeader("firstLine"))
channel { executor(taskExecutor) }
filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
transform(String::toUpperCase)
channel("aggregatorChannel")
aggregate(FileAggregator())
channel { queue("resultChannel") }
}
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new FileAggregator());
aggregator.setOutputChannel(outputChannel);
return aggregator;
}
<int:chain input-channel="input" output-channel="output">
<int-file:splitter markers="true"/>
<int:aggregator>
<bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
</int:aggregator>
</int:chain>
如果FileAggregator
不满足目标逻辑,建议为聚合器终端节点配置单个策略。
看FileAggregator
JavaDocs 了解更多信息。
远程持久文件列表过滤器
入站和流式处理入站远程文件通道适配器 (FTP
,SFTP
和其他技术)配置了相应的AbstractPersistentFileListFilter
默认情况下,使用 In-Memory 配置MetadataStore
.
要在集群中运行,可以使用共享的MetadataStore
(有关更多信息,请参阅 元数据存储 )。
这些过滤器用于防止多次获取同一文件(除非修改时间更改)。
从版本 5.2 开始,在获取文件之前立即将文件添加到过滤器中(如果获取失败,则撤消)。
如果发生灾难性故障(例如断电),当前正在获取的文件可能会保留在过滤器中,并且在重新启动应用程序时不会重新获取。
在这种情况下,您需要从MetadataStore . |
在以前的版本中,在提取任何文件之前都会过滤文件,这意味着在发生灾难性故障后,多个文件可能会处于此状态。
为了促进这种新行为,在FileListFilter
.
boolean accept(F file);
boolean supportsSingleFileFiltering();
如果筛选条件返回true
在supportsSingleFileFiltering
,它必须实现accept()
.
如果远程过滤器不支持单个文件过滤(例如AbstractMarkerFilePresentFileListFilter
),则适配器将恢复为之前的行为。
如果使用了多个过滤器(使用CompositeFileListFilter
或ChainFileListFilter
),则所有委托过滤器都必须支持单个文件过滤,复合过滤器才能支持它。
持久文件列表过滤器现在具有布尔属性forRecursion
.
将此属性设置为true
,还会设置alwaysAcceptDirectories
,这意味着出站网关 (ls
和mget
) 现在每次都始终遍历完整的目录树。
这是为了解决未检测到目录树深处更改的问题。
另外forRecursion=true
使文件的完整路径用作元数据存储键;这解决了以下问题:如果具有相同名称的文件在不同目录中多次出现,则过滤器无法正常工作。
重要说明:这意味着对于顶级目录下的文件,将无法找到持久性元数据存储中的现有键。
因此,该属性为false
默认情况下;这可能会在未来版本中更改。