我需要每天通过 SFTP 获取一个文件。我想将 Spring Integration 与 Java 配置一起使用。该文件通常在每天的特定时间可用。应用程序应每天尝试在该时间附近获取文件。如果文件不可用,它应该继续重试 x 次。在 x 次尝试后,它应该发送一封电子邮件,让管理员知道该文件在 SFTP 站点上仍然不可用。
一种选择是使用SftpInboundFileSynchronizingMessageSource
. 在 中MessageHandler
,我可以开始处理文件的工作。但是,我真的不需要与远程文件系统同步。毕竟,这是文件的预定交付。另外,我最多需要延迟 15 分钟以进行下一次重试,并且每 15 分钟轮询一次对于每日文件来说似乎有点过分。我想我可以使用它,但需要一些机制在经过一段时间后发送电子邮件并且没有收到文件。
另一个选项似乎是使用get
SFTP 出站网关。但我能找到的唯一例子似乎是 XML 配置。
更新
使用以下 Artem Bilan 的回答提供的帮助后添加代码:
配置类:
@Bean
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata"))
public SftpInboundFileSynchronizingMessageSource sftpMessageSource(ApplicationProperties applicationProperties, PropertiesPersistingMetadataStore store) {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer(applicationProperties));
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
FileSystemPersistentAcceptOnceFileListFilter local = new FileSystemPersistentAcceptOnceFileListFilter(store,"test");
source.setLocalFilter(local);
source.setCountsEnabled(true);
return source;
}
@Bean
public PollerMetadata pollerMetadata() {
PollerMetadata pollerMetadata = new PollerMetadata();
List<Advice> adviceChain = new ArrayList<Advice>();
adviceChain.add(retryCompoundTriggerAdvice());
pollerMetadata.setAdviceChain(adviceChain);
pollerMetadata.setTrigger(compoundTrigger());
return pollerMetadata;
}
@Bean
public RetryCompoundTriggerAdvice retryCompoundTriggerAdvice() {
return new RetryCompoundTriggerAdvice(compoundTrigger(), secondaryTrigger());
}
@Bean
public CompoundTrigger compoundTrigger() {
CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger());
return compoundTrigger;
}
@Bean
public Trigger primaryTrigger() {
return new CronTrigger("*/60 * * * * *");
}
@Bean
public Trigger secondaryTrigger() {
return new PeriodicTrigger(10000);
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler(PropertiesPersistingMetadataStore store) {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
store.flush();
}
};
}
RetryCompoundTriggerAdvice 类:
public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice {
private final CompoundTrigger compoundTrigger;
private final Trigger override;
private int count = 0;
public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger, Trigger overrideTrigger) {
Assert.notNull(compoundTrigger, "'compoundTrigger' cannot be null");
this.compoundTrigger = compoundTrigger;
this.override = overrideTrigger;
}
@Override
public boolean beforeReceive(MessageSource<?> source) {
return true;
}
@Override
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
if (result == null && count <= 5) {
count++;
this.compoundTrigger.setOverride(this.override);
}
else {
this.compoundTrigger.setOverride(null);
if (count > 5) {
//send email
}
count = 0;
}
return result;
}
}