我正在尝试通过 spark2 流处理一些数据并将它们保存到 hdfs。在流运行时,我想通过简单的选择通过节俭服务器读取存储的数据:
SELECT COUNT(*) FROM stream_table UNION ALL SELECT COUNT(*) FROM thisistable;
但我得到了这个例外
错误:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 5.0 中的任务 0 失败 1 次,最近一次失败:阶段 5.0 中丢失任务 0.0(TID 6,本地主机):java.lang.RuntimeException:hdfs ://5b6b8bf723a2:9000/archiveData/parquets/efc44dd4-1792-4b6d-b0f2-120818047b1b 不是 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412) 中 parquet.hadoop.ParquetFileReader 的 Parquet 文件(太小) .readFooter(ParquetFileReader.java:385) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:371) at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.getSplit(ParquetRecordReaderWrapper.java:252 ) 在 org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:99) 在 org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java) 的 org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:85) :72) 在 org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:246) 在 org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102) 在 org.apache.spark .rdd.RDD.iterator(RDD.scala:283)(RDD.scala:319) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)scala:38) 在 org.apache.spark。 rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)(RDD.scala:319) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala:319)scala:38) 在 org.apache.spark.rdd.MapPartitionsRDD。在 org.apache.spark.rdd.RDD.iterator(RDD.scala:283)(RDD.scala:319) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala) 计算(MapPartitionsRDD.scala:38) :319)scala:38) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 在 org.apache.spark.scheduler.Task.run(Task.scala:85)ShuffleMapTask.scala:47 ) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.lang.Thread.run(Thread.java:745) 或 $Worker.run(ThreadPoolExecutor.java:617)apache.spark.scheduler.Task.run(Task.scala:85)ShuffleMapTask.scala:47) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.lang.Thread.run(Thread. java:745) 或$Worker.run(ThreadPoolExecutor.java:617)apache.spark.scheduler.Task.run(Task.scala:85)ShuffleMapTask.scala:47) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.lang.Thread.run(Thread. java:745) 或$Worker.run(ThreadPoolExecutor.java:617)
我的假设是 spark 将在批处理开始时创建一个空的镶木地板文件,并在批处理结束时填充它,并且我正在select
通过存档文件运行 A,但是一个是空的,因为实际批处理尚未完成.
简单的火花流示例(Thread.sleep 用于模拟转换延迟)
spark
.readStream()
.schema(schema)
.json("/tmp")
.filter(x->{
Thread.sleep(1000);
return true;
})
.writeStream()
.format("parquet")
.queryName("thisistable")
.start()
.awaitTermination();
有没有办法让我避免这种异常,并且使用 thrift 服务器只获取完成的文件?