1

我正在尝试通过 spark2 流处理一些数据并将它们保存到 hdfs。在流运行时,我想通过简单的选择通过节俭服务器读取存储的数据:

SELECT COUNT(*) FROM stream_table UNION ALL SELECT COUNT(*) FROM thisistable;

但我得到了这个例外

错误:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 5.0 中的任务 0 失败 1 次,最近一次失败:阶段 5.0 中丢失任务 0.0(TID 6,本地主机):java.lang.RuntimeException:hdfs ://5b6b8bf723a2:9000/archiveData/parquets/efc44dd4-1792-4b6d-b0f2-120818047b1b 不是 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412) 中 parquet.hadoop.ParquetFileReader 的 Parquet 文件(太小) .readFooter(ParquetFileReader.java:385) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:371) at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.getSplit(ParquetRecordReaderWrapper.java:252 ) 在 org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:99) 在 org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java) 的 org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:85) :72) 在 org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:246) 在 org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102) 在 org.apache.spark .rdd.RDD.iterator(RDD.scala:283)(RDD.scala:319) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)scala:38) 在 org.apache.spark。 rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)(RDD.scala:319) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala:319)scala:38) 在 org.apache.spark.rdd.MapPartitionsRDD。在 org.apache.spark.rdd.RDD.iterator(RDD.scala:283)(RDD.scala:319) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala) 计算(MapPartitionsRDD.scala:38) :319)scala:38) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 在 org.apache.spark.scheduler.Task.run(Task.scala:85)ShuffleMapTask.scala:47 ) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.lang.Thread.run(Thread.java:745) 或 $Worker.run(ThreadPoolExecutor.java:617)apache.spark.scheduler.Task.run(Task.scala:85)ShuffleMapTask.scala:47) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.lang.Thread.run(Thread. java:745) 或$Worker.run(ThreadPoolExecutor.java:617)apache.spark.scheduler.Task.run(Task.scala:85)ShuffleMapTask.scala:47) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.lang.Thread.run(Thread. java:745) 或$Worker.run(ThreadPoolExecutor.java:617)

我的假设是 spark 将在批处理开始时创建一个空的镶木地板文件,并在批处理结束时填充它,并且我正在select通过存档文件运行 A,但是一个是空的,因为实际批处理尚未完成.

简单的火花流示例(Thread.sleep 用于模拟转换延迟)

spark
    .readStream()
    .schema(schema)
    .json("/tmp")
    .filter(x->{
        Thread.sleep(1000);
        return true;
    })
    .writeStream()
    .format("parquet")
    .queryName("thisistable")
    .start()
    .awaitTermination();

有没有办法让我避免这种异常,并且使用 thrift 服务器只获取完成的文件?

4

0 回答 0