I have the following Parquet files in a partitioned directory:
/files/dataset
    /id=1
        parquet.gz
    /id=2
        parquet.gz
    /id=3
        parquet.gz
In Spark 1.6 they can be read as follows:
val arr = sqlContext.read.parquet("/files/dataset/").collect
But in Spark 2.0.1 the same code throws an error:
val arr = spark.read.parquet("/files/dataset/").collect
java.lang.NullPointerException
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:272)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The individual partition directories can be read separately and unioned, but I'm curious what difference I should be looking for.
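For reference, a minimal sketch of the per-partition read-and-union approach that does work here (the partition values 1 to 3 follow the layout above; the final collect just mirrors the failing call):

// Read each partition directory on its own, then union the results.
val arr = Seq(1, 2, 3)
  .map(i => spark.read.parquet(s"/files/dataset/id=$i"))
  .reduce(_ union _)
  .collect()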
Update: the partition directories were produced by three separate writes, e.g. df.where("id = 1").write.parquet(...) rather than df.write.partitionBy(...). That appears to be the root of the problem, but I'm still actively trying to work out why the read/collect succeeded under the earlier version of Spark.
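To make the difference concrete, here is a hedged sketch of the two write paths (paths and column names follow the example above; the exact expressions are my own reconstruction):

// How the data was actually written: one explicit write per partition value,
// so each directory's files still carry `id` as a regular data column.
Seq(1L, 2L, 3L).foreach { i =>
  df.where(s"id = $i").write.parquet(s"/files/dataset/id=$i")
}

// Versus letting Spark lay out the partitions itself: here `id` is dropped
// from the data files and encoded only in the directory names.
df.write.partitionBy("id").parquet("/files/dataset")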
Update: the 'id' column above is a Long. When it is written explicitly (e.g. df.write.parquet("/files/dataset/id=1")), the error is thrown during the read. Partition discovery apparently tries to read the partition as an IntType rather than a Long. See https://issues.apache.org/jira/browse/SPARK-18108
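One workaround that may sidestep the inference is to supply the schema explicitly so the partition column is declared as Long up front. A sketch, assuming the data files hold a single column named value (that column name is hypothetical):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Declare `id` as LongType explicitly so partition discovery never gets a
// chance to infer it as an integer type. `value` is a stand-in for whatever
// data columns the files really contain.
val schema = StructType(Seq(
  StructField("value", StringType),
  StructField("id", LongType)
))

val arr = spark.read.schema(schema).parquet("/files/dataset/").collect()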