我在使用 trigger.once 的流作业时遇到问题。当我第一次运行它时,它工作正常,将所有可用数据写入路径并完成。但是第二天,当原始路径中有新数据可用时,流看不到它并在写入任何数据之前完成查询。我正在使用自动加载器和 sqs 队列。检查点路径是正确的,并且有文件夹偏移,提交..
1 回答
0
您期望的行为应该没有触发器,如下所示:
// Default trigger (runs micro-batch as soon as it can)
df.writeStream
.format("console")
.start()
触发器之间的区别如下:
// Default trigger (runs micro-batch as soon as it can)
df.writeStream
.format("console")
.start()
// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
// One-time trigger
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start()
// Continuous trigger with one-second checkpointing interval
df.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.start()
您可以参考https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
于 2022-02-09T14:57:58.393 回答