1

我需要每天通过 SFTP 获取一个文件。我想将 Spring Integration 与 Java 配置一起使用。该文件通常在每天的特定时间可用。应用程序应每天尝试在该时间附近获取文件。如果文件不可用,它应该继续重试 x 次。在 x 次尝试后,它应该发送一封电子邮件,让管理员知道该文件在 SFTP 站点上仍然不可用。

一种选择是使用SftpInboundFileSynchronizingMessageSource. 在 中MessageHandler,我可以开始处理文件的工作。但是,我真的不需要与远程文件系统同步。毕竟,这是文件的预定交付。另外,我最多需要延迟 15 分钟以进行下一次重试,并且每 15 分钟轮询一次对于每日文件来说似乎有点过分。我想我可以使用它,但需要一些机制在经过一段时间后发送电子邮件并且没有收到文件。

另一个选项似乎是使用getSFTP 出站网关。但我能找到的唯一例子似乎是 XML 配置。

更新

使用以下 Artem Bilan 的回答提供的帮助后添加代码:

配置类:

@Bean
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata"))
public SftpInboundFileSynchronizingMessageSource sftpMessageSource(ApplicationProperties applicationProperties, PropertiesPersistingMetadataStore store) {
    SftpInboundFileSynchronizingMessageSource source =
            new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer(applicationProperties));
    source.setLocalDirectory(new File("ftp-inbound"));
    source.setAutoCreateLocalDirectory(true);
    FileSystemPersistentAcceptOnceFileListFilter local = new FileSystemPersistentAcceptOnceFileListFilter(store,"test");
    source.setLocalFilter(local);
    source.setCountsEnabled(true);        
    return source;
}

@Bean
public PollerMetadata pollerMetadata() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    List<Advice> adviceChain = new ArrayList<Advice>();
    adviceChain.add(retryCompoundTriggerAdvice());
    pollerMetadata.setAdviceChain(adviceChain);
    pollerMetadata.setTrigger(compoundTrigger());
    return pollerMetadata;
}

@Bean
public RetryCompoundTriggerAdvice retryCompoundTriggerAdvice() {
    return new RetryCompoundTriggerAdvice(compoundTrigger(), secondaryTrigger());
}

@Bean
public CompoundTrigger compoundTrigger() {
    CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger());
    return compoundTrigger;
}

@Bean
public Trigger primaryTrigger() {
    return new CronTrigger("*/60 * * * * *");
}

@Bean
public Trigger secondaryTrigger() {
    return new PeriodicTrigger(10000);
}

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler(PropertiesPersistingMetadataStore store) {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
            store.flush();
        }

    };
}

RetryCompoundTriggerAdvice 类:

public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice {

    private final CompoundTrigger compoundTrigger;

    private final Trigger override;

    private int count = 0;

    public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger, Trigger overrideTrigger) {
        Assert.notNull(compoundTrigger, "'compoundTrigger' cannot be null");
        this.compoundTrigger = compoundTrigger;
        this.override = overrideTrigger;
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        return true;
    }

    @Override
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
        if (result == null && count <= 5) {
            count++;
            this.compoundTrigger.setOverride(this.override);
        }
        else {
            this.compoundTrigger.setOverride(null);
            if (count > 5) {
                 //send email
            }
            count = 0;
        }
        return result;
    }
}
4

1 回答 1

2

由于 Spring Integration 4.3 有CompoundTrigger

* A {@link Trigger} that delegates the {@link #nextExecutionTime(TriggerContext)}
* to one of two Triggers. If the {@link #setOverride(Trigger) override} trigger is
* {@code null}, the primary trigger is invoked; otherwise the override trigger is
* invoked.

结合CompoundTriggerAdvice

* An {@link AbstractMessageSourceAdvice} that uses a {@link CompoundTrigger} to adjust
* the poller - when a message is present, the compound trigger's primary trigger is
* used to determine the next poll. When no message is present, the override trigger is
* used.

它可用于完成您的任务:

primaryTrigger可以是每天CronTrigger只运行一次任务。

override可能是一个PeriodicTrigger需要很短的重试时间。

retry您可以将另一个逻辑用于Advice轮询器,或者只是扩展它CompoundTriggerAdvice以添加count逻辑以最终发送电子邮件。

由于没有文件,因此没有消息踢流。我们别无选择,除非围绕轮询基础设施跳舞。

于 2016-08-29T20:56:07.730 回答