1

我在使用 trigger.once 的流作业时遇到问题。当我第一次运行它时,它工作正常,将所有可用数据写入路径并完成。但是第二天,当原始路径中有新数据可用时,流看不到它并在写入任何数据之前完成查询。我正在使用自动加载器和 sqs 队列。检查点路径是正确的,并且有文件夹偏移,提交..

4

1 回答 1

0

您期望的行为应该没有触发器,如下所示:

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start()

触发器之间的区别如下:

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start()

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

您可以参考https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

于 2022-02-09T14:57:58.393 回答