我想创建一个Trigger第一次在 20 秒内触发,之后每五秒触发一次。我用过GlobalWindows和一个习惯Trigger
val windowedStream = valueStream
                          .keyBy(0)
                          .window(GlobalWindows.create())
                          .trigger(TradeTrigger.of())
这是中的代码TradeTrigger:
@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    static boolean flag=false;
    static long ctime = System.currentTimeMillis();
    private TradeTrigger() {
    }
    @Override
    public TriggerResult onElement(
            Object arg0,
            long arg1,
            W arg2,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
            throws Exception {
        // TODO Auto-generated method stub
        if(flag == false){
            if((System.currentTimeMillis()-ctime) >= 20000){
               flag = true;
               ctime = System.currentTimeMillis();
               return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        } else {
            if((System.currentTimeMillis()-ctime) >= 5000){
                ctime = System.currentTimeMillis();
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }
    }
    @Override
    public TriggerResult onEventTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }
    @Override
    public TriggerResult onProcessingTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }
    public static <W extends Window> TradeTrigger<W> of() {
        return new TradeTrigger<>();
    }
}
所以基本上,when flagis false,即第一次,Trigger应该在 20 秒内触发并将 设置flag为true。从下一次开始,它应该每 5 秒触发一次。
我面临的问题是,每次Trigger触发时我只会在输出中收到一条消息。也就是说,我在 20 秒后收到一条消息,每五秒收到一条消息。我预计每次触发时输出中有 20 条消息。
如果我使用.timeWindow(Time.seconds(5))并创建一个 5 秒的时间窗口,则每 5 秒输出 20 条消息。请帮助我正确获取此代码。有什么我想念的吗?