0

我有两台服务器(EC2 实例)。服务器 1 上有 5 个批处理作业,服务器 2 上有 6 个批处理作业。我把每个批处理作业封装成一个活动(activity),工作流实现类如下所示。我想根据给定的执行日期循环执行这些活动(全部活动,包括服务器 1 和服务器 2 的)。例如,如果执行日期早于当前日期,则从给定日期到当前日期逐日执行服务器 1 和 2 的所有活动;如果执行日期等于当前日期,则只针对当前日期执行服务器 1 和 2 的所有活动。此外,如果某一天的任何活动抛出异常,则不要再执行后一天(<= 当前日期)的活动。

 public class JobWorkflowImpl implements JobWorkflow{

   private DecisionContextProvider contextProvider
     = new DecisionContextProviderImpl();

   private WorkflowClock clock
     = contextProvider.getDecisionContext().getWorkflowClock();

    private BS1JobActivitiesClient bs1ActivitiesClient 
     = new BS1JobActivitiesClientImpl();
    private BS2JobActivitiesClient bs2ActivitiesClient 
     = new BS2JobActivitiesClientImpl();

    @Override
    public void executeJob(Date exedate) {
       Date today = new Date(clock.currentTimeMillis());
       Date toProcess = exedate;
       // All date manipulations are pseudocode here as I'm lazy 
       // to look up the real ones.
       Promise<Void> previousDateDone = null;
       while(toProcess <= today) {
          // Create chain of executeJobForExactDate 
          // linked by previousDateDone to ensure that they are executed sequentially.
          // null Promise is treated as ready promise by the framework.
          previousDateDone = executeJobForExactDate(toProcess, previousDateDone);
          toProcess.addDay(1);
       }
    }

    Promise<Void> void executeJobForExactDate(Date date, Promise<Void> previous) {

       Settable<Integer> firstServerDone = new Settable<Integer>();
       Settable<Integer> secondServerDone = new Settable<Integer>();

       Settable<Integer> resultODLSLBs1 = new Settable<Integer>();

       //TODO Iterate over the activities
        new TryCatchFinally(previous){

            @Override
            protected void doTry(){
                Promise<Integer> resultFARBs1 = bs1ActivitiesClient.executecommand1(date);
                Promise<Integer> resultUAMLFBs1 = bs1ActivitiesClient.executecommand2(date, resultFARBs1);
                Promise<Integer> resultLLPBs1 = bs1ActivitiesClient.executecommand3(date, resultUAMLFBs1);
                Promise<Integer> resultODLDLBs1 = bs1ActivitiesClient.executecommand4(date, resultLLPBs1);
                // Chain links result of the activity execution 
                // to an aready existing Settable.
                resultODLSLBs1.chain(bs1ActivitiesClient.executecommand5(date, resultODLDLBs1));
            }

            @Override
            protected void doCatch(Throwable e){
               throw new MyException("Failed");
            }

            @Override
            protected void doFinally() throws Throwable {
                firstServerDone.set(null);
            }
        };

        new TryCatchFinally(previous){

            @Override
            protected void doTry()  {
                Promise<Integer> resultFARBs2 = bs2ActivitiesClient.executecommand1(date);
                Promise<Integer> resultUAMLFBs2 = bs2ActivitiesClient.executecommand2(date, resultFARBs2);
                Promise<Integer> resultLLPBs2 = bs2ActivitiesClient.executecommand3(date, resultUAMLFBs2);
                Promise<Integer> resultODLDLBs2 = bs2ActivitiesClient.executecommand4(date, resultLLPBs2);
                Promise<Integer> resultODLSLBs2 = bs2ActivitiesClient.executecommand5(date, resultODLDLBs2, resultODLSLBs1);
                bs2ActivitiesClient.executecommand6(date, resultODLSLBs2);
            }

            @Override
            protected void doCatch(Throwable e){
                throw new MyException("Failed");
            }

            @Override
            protected void doFinally(){
                secondServerDone.set(null);
            }

        };
        // AndPromise is done when all of its constructor parameters are done.
        // I decided to consider the date processing done when both 
        // TryCatchFinallies are done. You can implement more complex logic depending on
        // the business requirements. One option is to wrap both TryCatcFinallies 
        // in another TryCatchFinally.
        return new AndPromise(firstServerDone, secondServerDone);
    }
}

问题是,如果服务器 1 的任何活动抛出异常,服务器 1 和服务器 2 上所有尚未启动的活动都会被取消。但我希望只有同一台服务器内尚未执行的活动才因该服务器自身的活动失败而被取消,另一台服务器应尽可能继续执行(即一直执行到它依赖失败服务器结果的那一步为止)。

4

1 回答 1

0

棘手之处在于 Flow Framework 要求您使用它通过决策上下文(DecisionContext)提供的时钟,否则工作流将无法正常工作。其余部分只是普通的异步 Java 编程。

public class JobWorkflowImpl implements JobWorkflow{

   private DecisionContextProvider contextProvider
     = new DecisionContextProviderImpl();

   private WorkflowClock clock
     = contextProvider.getDecisionContext().getWorkflowClock();

    private BS1JobActivitiesClient bs1ActivitiesClient 
     = new BS1JobActivitiesClientImpl();
    private BS2JobActivitiesClient bs2ActivitiesClient 
     = new BS2JobActivitiesClientImpl();

    @Override
    public void executeJob(Date exedate) {
       Date today = new Date(clock.currentTimeMillis());
       Date toProcess = exedate;
       // All date manipulations are pseudocode here as I'm lazy 
       // to look up the real ones.
       Promise<Void> previousDateDone = null;
       while(toProcess <= today) {
          // Create chain of executeJobForExactDate 
          // linked by previousDateDone to ensure that they are executed sequentially.
          // null Promise is treated as ready promise by the framework.
          previousDateDone = executeJobForExactDate(toProcess, previousDateDone);
          toProcess.addDay(1);
       }
    }

    Promise<Void> void executeJobForExactDate(Date date, Promise<Void> previous) {

       Settable<Integer> firstServerDone = new Settable<Integer>();
       Settable<Integer> secondServerDone = new Settable<Integer>();

       Settable<Integer> resultODLSLBs1 = new Settable<Integer>();

       //TODO Iterate over the activities
        new TryCatchFinally(previous){

            @Override
            protected void doTry(){
                Promise<Integer> resultFARBs1 = bs1ActivitiesClient.executecommand1(date);
                Promise<Integer> resultUAMLFBs1 = bs1ActivitiesClient.executecommand2(date, resultFARBs1);
                Promise<Integer> resultLLPBs1 = bs1ActivitiesClient.executecommand3(date, resultUAMLFBs1);
                Promise<Integer> resultODLDLBs1 = bs1ActivitiesClient.executecommand4(date, resultLLPBs1);
                // Chain links result of the activity execution 
                // to an aready existing Settable.
                resultODLSLBs1.chain(bs1ActivitiesClient.executecommand5(date, resultODLDLBs1));
            }

            @Override
            protected void doCatch(Throwable e){
                System.out.println("Failed to execute BS1 daily job");
            }

            @Override
            protected void doFinally() throws Throwable {
                firstServerDone.set(null);
            }
        };

        new TryCatchFinally(previous){

            @Override
            protected void doTry()  {
                Promise<Integer> resultFARBs2 = bs2ActivitiesClient.executecommand1(date);
                Promise<Integer> resultUAMLFBs2 = bs2ActivitiesClient.executecommand2(date, resultFARBs2);
                Promise<Integer> resultLLPBs2 = bs2ActivitiesClient.executecommand3(date, resultUAMLFBs2);
                Promise<Integer> resultODLDLBs2 = bs2ActivitiesClient.executecommand4(date, resultLLPBs2);
                Promise<Integer> resultODLSLBs2 = bs2ActivitiesClient.executecommand5(date, resultODLDLBs2, resultODLSLBs1);
                bs2ActivitiesClient.executecommand6(date, resultODLSLBs2);
            }

            @Override
            protected void doCatch(Throwable e){
                System.out.println("Failed to execute BS2 daily job");
            }

            @Override
            protected void doFinally(){
                secondServerDone.set(null);
            }

        };
        // AndPromise is done when all of its constructor parameters are done.
        // I decided to consider the date processing done when both 
        // TryCatchFinallies are done. You can implement more complex logic depending on
        // the business requirements. One option is to wrap both TryCatcFinallies 
        // in another TryCatchFinally.
        return new AndPromise(firstServerDone, secondServerDone);
    }
}
于 2014-10-03T17:59:15.287 回答