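I have some code that creates a DataFrame and saves it to S3. The snippet below creates a DataFrame with 1000 rows and 100 columns, populated by math.random. I am running it on a cluster with 4 r3.8xlarge worker nodes and have configured plenty of memory. I have tried the maximum number of executors, and also one executor per node.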
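import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// create some random data for performance and scalability testing
val schema = StructType((1 to 100).map(i => StructField(s"c$i", DoubleType, nullable = false)))
// DataFrame.map returns an RDD[Row], so it is turned back into a DataFrame before saving
val rows = sqlContext.range(0, 1000).map(_ => Row.fromSeq((1 to 100).map(_ => math.random)))
sqlContext.createDataFrame(rows, schema).saveAsParquetFile("s3://kirk/my_file.parquet")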
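My problem is that I can create a much larger DataFrame in memory than I can save to S3.
For example, I can construct and query 1 billion rows by 1000 columns, but writing just 100 million rows by 100 columns to S3 this way fails. I don't get a useful message from the Spark context; the job simply fails because too many tasks have failed.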
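For a sense of scale, here is a rough back-of-envelope sketch of the raw data sizes involved. It assumes every cell is an 8-byte Double, which is only an assumption: real in-memory and Parquet sizes will differ. `RoughSize`, `rawBytes`, and `toGB` are hypothetical names used only for this estimate.

```scala
// Back-of-envelope raw sizes, assuming every cell is an 8-byte Double
// (an assumption; actual in-memory and on-disk Parquet sizes will differ).
object RoughSize {
  val BytesPerValue = 8L // assumed width of one value

  // Raw bytes for a rows x cols table of fixed-width values.
  def rawBytes(rows: Long, cols: Long): Long = rows * cols * BytesPerValue

  // Convert bytes to decimal gigabytes (1 GB = 10^9 bytes).
  def toGB(bytes: Long): Double = bytes / 1e9

  def main(args: Array[String]): Unit = {
    val failingWrite = rawBytes(100000000L, 100L)   // 100 million rows x 100 columns
    val inMemoryCase = rawBytes(1000000000L, 1000L) // 1 billion rows x 1000 columns
    println(f"failing S3 write: ${toGB(failingWrite)}%.0f GB raw")
    println(f"in-memory case:   ${toGB(inMemoryCase)}%.0f GB raw")
  }
}
```

Under that assumption the write that fails is about 80 GB of raw values, roughly 100 times smaller than the table I can construct and query.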
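Is there some configuration that would save the file more efficiently? Should I be configuring Spark differently for saveAsParquetFile?
Here is the stack trace from an executor: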
15/09/09 18:10:26 ERROR sources.InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
at parquet.column.values.dictionary.IntList.initSlab(IntList.java:87)
at parquet.column.values.dictionary.IntList.<init>(IntList.java:83)
at parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:85)
at parquet.column.values.dictionary.DictionaryValuesWriter$PlainIntegerDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:549)
at parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:88)
at parquet.column.impl.ColumnWriterImpl.<init>(ColumnWriterImpl.java:74)
at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:68)
at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:83)
at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470)
at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)