我正在使用 spark 流 api 来熟悉它。我正在实现一个字数问题,我听一个流并在 x 秒后找到累积的字数并将其输出到文本文件。因此,经过所有转换后,当我使用 DStream 的 saveAsTextFiles() 函数将输出打印到文件时,我得到了奇怪的输出。
我希望它每隔 x 秒创建一个文本文件并将最新结果转储到该文件中。我希望文件的名称是文档中提到的前缀时间戳后缀。但相反,我得到的是每 x 秒一个文件夹,命名为我期望文件命名的内容,并且在该文件夹中是带有我的结果的部分 * 文件。虽然他们是正确的,但为什么会发生这种情况?我的期望在某种程度上是错误的吗?
对于转换和我正在使用的东西(以防有人好奇):
# sort the dstream for current batch
sorted_counts = counts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
# get the top K values of each rdd from the transformed dstream
topK = sorted_counts.transform(lambda rdd: rdd.zipWithIndex().filter(<filter with big index>).map(<remove index here>))