我们在 YARN ec2 集群中部署了一个 Spark 流应用程序,该集群具有 1 个名称节点和 2 个数据节点。我们提交的应用程序有 11 个执行程序,每个执行程序有 1 个内核和 588 MB RAM。该应用程序从 S3 中不断写入的目录中流出;这是实现这一目标的代码行:
val ssc = new StreamingContext(sparkConf, Seconds(10))
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](Settings.S3RequestsHost , (f:Path)=> true, true )
//some maps and other logic here
ssc.start()
ssc.awaitTermination()
使用 fileStream 而不是 textFileStream 的目的是自定义 spark 在进程启动时处理现有文件的方式。我们只想处理进程启动后添加的新文件并忽略现有文件。我们配置了 10 秒的批处理持续时间。
当我们将少量文件添加到 s3 时,这个过程很顺利,比如 4 或 5 个。我们可以在流式 UI 中看到阶段是如何在执行器中成功执行的,每个处理的文件都有一个。但有时当我们尝试添加大量文件时,我们会遇到奇怪的行为;应用程序开始流式传输已经流式传输的文件。
例如,我在 s3 中添加了 20 个文件。文件分 3 批处理。第一批处理 7 个文件,第二批处理 8 个,第三批处理 5 个。此时不再向 S3 添加文件,但 spark 开始使用相同的文件无休止地重复这些阶段! 有什么想法会导致这种情况吗?
我已经为此问题发布了一张 Jira 票: https ://issues.apache.org/jira/browse/SPARK-3553