我有两台服务器(EC2 实例)。在一台服务器(服务器 1)上我有 5 个批次,而在另一台服务器(服务器 2)上我有 6 个批次。我将每个批次包装成活动,下面给出了工作流程实现类。我想根据给定的执行日期迭代活动(整个活动,包括服务器 1 和 2)。例如,如果日期小于当前日期,则从给定日期到当前日期执行服务器 1 和 2 的所有活动。如果执行日期等于当前日期,则在当前日期执行服务器 1 和 2 的所有活动。此外,如果一天的任何活动引发任何异常,则不要执行第二天的活动(<=当前日期)。
public class JobWorkflowImpl implements JobWorkflow{
private DecisionContextProvider contextProvider
= new DecisionContextProviderImpl();
private WorkflowClock clock
= contextProvider.getDecisionContext().getWorkflowClock();
private BS1JobActivitiesClient bs1ActivitiesClient
= new BS1JobActivitiesClientImpl();
private BS2JobActivitiesClient bs2ActivitiesClient
= new BS2JobActivitiesClientImpl();
@Override
public void executeJob(Date exedate) {
Date today = new Date(clock.currentTimeMillis());
Date toProcess = exedate;
// All date manipulations are pseudocode here as I'm lazy
// to look up the real ones.
Promise<Void> previousDateDone = null;
while(toProcess <= today) {
// Create chain of executeJobForExactDate
// linked by previousDateDone to ensure that they are executed sequentially.
// null Promise is treated as ready promise by the framework.
previousDateDone = executeJobForExactDate(toProcess, previousDateDone);
toProcess.addDay(1);
}
}
Promise<Void> void executeJobForExactDate(Date date, Promise<Void> previous) {
Settable<Integer> firstServerDone = new Settable<Integer>();
Settable<Integer> secondServerDone = new Settable<Integer>();
Settable<Integer> resultODLSLBs1 = new Settable<Integer>();
//TODO Iterate over the activities
new TryCatchFinally(previous){
@Override
protected void doTry(){
Promise<Integer> resultFARBs1 = bs1ActivitiesClient.executecommand1(date);
Promise<Integer> resultUAMLFBs1 = bs1ActivitiesClient.executecommand2(date, resultFARBs1);
Promise<Integer> resultLLPBs1 = bs1ActivitiesClient.executecommand3(date, resultUAMLFBs1);
Promise<Integer> resultODLDLBs1 = bs1ActivitiesClient.executecommand4(date, resultLLPBs1);
// Chain links result of the activity execution
// to an aready existing Settable.
resultODLSLBs1.chain(bs1ActivitiesClient.executecommand5(date, resultODLDLBs1));
}
@Override
protected void doCatch(Throwable e){
throw new MyException("Failed");
}
@Override
protected void doFinally() throws Throwable {
firstServerDone.set(null);
}
};
new TryCatchFinally(previous){
@Override
protected void doTry() {
Promise<Integer> resultFARBs2 = bs2ActivitiesClient.executecommand1(date);
Promise<Integer> resultUAMLFBs2 = bs2ActivitiesClient.executecommand2(date, resultFARBs2);
Promise<Integer> resultLLPBs2 = bs2ActivitiesClient.executecommand3(date, resultUAMLFBs2);
Promise<Integer> resultODLDLBs2 = bs2ActivitiesClient.executecommand4(date, resultLLPBs2);
Promise<Integer> resultODLSLBs2 = bs2ActivitiesClient.executecommand5(date, resultODLDLBs2, resultODLSLBs1);
bs2ActivitiesClient.executecommand6(date, resultODLSLBs2);
}
@Override
protected void doCatch(Throwable e){
throw new MyException("Failed");
}
@Override
protected void doFinally(){
secondServerDone.set(null);
}
};
// AndPromise is done when all of its constructor parameters are done.
// I decided to consider the date processing done when both
// TryCatchFinallies are done. You can implement more complex logic depending on
// the business requirements. One option is to wrap both TryCatcFinallies
// in another TryCatchFinally.
return new AndPromise(firstServerDone, secondServerDone);
}
}
问题是,如果服务器 1 的任何活动引发任何异常,那么它将取消服务器 1 和服务器 2 尚未启动的所有活动。但我只希望服务器内未执行的活动应该得到由于自己的服务器活动失败而取消,其他服务器应尽可能继续(即它所依赖的地方)。