文件聚合器
从版本 5.5 开始,一个FileAggregator
的引入是为了覆盖FileSplitter
use-case 启用 START/END 标记时。
为方便起见,FileAggregator
实现所有三种序列详细信息策略:
-
这
HeaderAttributeCorrelationStrategy
使用FileHeaders.FILENAME
attribute 用于关联键的计算。 在FileSplitter
,它不会填充序列详细信息标头,因为 START/END 标记消息也包含在序列大小中。 这FileHeaders.FILENAME
仍会为发出的每一行填充,包括 START/END 标记消息。 -
这
FileMarkerReleaseStrategy
- 检查FileSplitter.FileMarker.Mark.END
消息,然后比较FileHeaders.LINE_COUNT
header 值与组大小减2
-FileSplitter.FileMarker
实例。 它还实现了一个方便的GroupConditionProvider
联系方式conditionSupplier
函数中要使用的AbstractCorrelatingMessageHandler
. 有关更多信息,请参阅 Message Group Condition 。 -
这
FileAggregatingMessageGroupProcessor
只是删除FileSplitter.FileMarker
消息,并将其余消息收集到要生成的 List 负载中。
以下清单显示了配置FileAggregator
:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
return f -> f
.split(Files.splitter()
.markers()
.firstLineAsHeader("firstLine"))
.channel(c -> c.executor(taskExecutor))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker),
e -> e.discardChannel("aggregatorChannel"))
.<String, String>transform(String::toUpperCase)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
integrationFlow {
split(Files.splitter().markers().firstLineAsHeader("firstLine"))
channel { executor(taskExecutor) }
filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
transform(String::toUpperCase)
channel("aggregatorChannel")
aggregate(FileAggregator())
channel { queue("resultChannel") }
}
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new FileAggregator());
aggregator.setOutputChannel(outputChannel);
return aggregator;
}
<int:chain input-channel="input" output-channel="output">
<int-file:splitter markers="true"/>
<int:aggregator>
<bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
</int:aggregator>
</int:chain>
如果FileAggregator
不满足目标逻辑,建议为聚合器终端节点配置单个策略。
看FileAggregator
JavaDocs 了解更多信息。