2

我目前使用 spark 2.0.1,我尝试使用 insertInto() 将我的数据集保存到“分区表 Hive”中,或者使用 partitionBy("col") 将我的数据集保存到 S3 存储中,同时工作在并发(并行)中。但是使用这 2 种方法,我的数据集的每个分区都会一个接一个地按顺序保存。这是非常非常缓慢的。我已经知道我必须一次使用 insertInto() 或 partitionBy() 一个。我假设 spark.2.0.1 Dataframe 是 Resilient Data Set 。我当前的代码:

df.write.mode(SaveMode.Append).partitionBy("col").save("s3://bucket/diroutput")

或者

df.write.mode(SaveMode.Append).insertInto("TableHivealreadypartitioned")

所以我用 df.foreachPartition 尝试了一些东西,如下所示:

df.foreachPartition{datasetpartition => datasetpartition.foreach(row => row.sometransformation)}

您将在下面找到提取日志。在第一个示例中,它是 hive 中的“InserInto(tablehivealreadypartitionned)”。我们可以看到,Spark 的所有“分区”都是一一写入的。在第二个示例中,直接写入 S3 的是“partitionBy().save()”。我们还可以看到,所有的“分区”spark 都是一一写入的。我们处理的数据帧只有一个“分区”,它的大小约为 200MB 未压缩(在内存中)。使用选项 local[4] ,作业可以花费 120s 170s 来保存数据。

[INFO] 2016-11-03 00:10:33,255 org.apache.spark.SparkContext logInfo - Created broadcast 2330 from broadcast at TorExitLookup.scala:43
[INFO] 2016-11-03 00:10:35,302 org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer logInfo - Sorting complete. Writing out partition files one at a time.
[INFO] 2016-11-03 00:10:35,363 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/db/.hive-staging_hive_2016-11-03_00-10-29_426_1749488585639143697-1/-ext-10000/tsbucket=2016-11-02 09%3A00%3A00/part-00001
[INFO] 2016-11-03 00:10:35,380 org.apache.spark.mapred.SparkHadoopMapRedUtil logInfo - No need to commit output of task because needsTaskCommit=false: attempt_201611030010_0948_m_000001_0
[INFO] 2016-11-03 00:10:35,380 org.apache.spark.executor.Executor logInfo - Finished task 1.0 in stage 948.0 (TID 1385). 2652 bytes result sent to driver
[INFO] 2016-11-03 00:10:35,381 org.apache.spark.scheduler.TaskSetManager logInfo - Finished task 1.0 in stage 948.0 (TID 1385) in 5718 ms on localhost (1/2)
[INFO] 2016-11-03 00:11:23,033 org.apache.spark.storage.BlockManagerInfo logInfo - Removed broadcast_2330_piece0 on 10.0.193.149:34016 in memory (size: 6.9 KB, free: 414.4 MB)
[INFO] 2016-11-03 00:11:58,194 org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer logInfo - Sorting complete. Writing out partition files one at a time.
[INFO] 2016-11-03 00:12:00,210 org.apache.spark.storage.BlockManagerInfo logInfo - Removed broadcast_2329_piece0 on 10.0.193.149:34016 in memory (size: 6.9 KB, free: 414.4 MB)
[INFO] 2016-11-03 00:12:05,295 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/db/.hive-staging_hive_2016-11-03_00-10-29_426_1749488585639143697-1/-ext-10000/tsbucket=2016-11-02 09%3A00%3A00/part-00000
[INFO] 2016-11-03 00:12:05,831 org.apache.spark.mapred.SparkHadoopMapRedUtil logInfo - No need to commit output of task because needsTaskCommit=false: attempt_201611030010_0948_m_000000_0
[INFO] 2016-11-03 00:12:05,835 org.apache.spark.executor.Executor logInfo - Finished task 0.0 in stage 948.0 (TID 1384). 2652 bytes result sent to driver
[INFO] 2016-11-03 00:12:05,835 org.apache.spark.scheduler.TaskSetManager logInfo - Finished task 0.0 in stage 948.0 (TID 1384) in 96173 ms on localhost (2/2)
[INFO] 2016-11-03 00:12:05,835 org.apache.spark.scheduler.DAGScheduler logInfo - ResultStage 948 (insertInto at ImportHive.scala:24) finished in 96,173 s
[INFO] 2016-11-03 00:12:05,835 org.apache.spark.scheduler.TaskSchedulerImpl logInfo - Removed TaskSet 948.0, whose tasks have all completed, from pool
[INFO] 2016-11-03 00:12:05,836 org.apache.spark.scheduler.DAGScheduler logInfo - Job 948 finished: insertInto at ImportHive.scala:24, took 96,188035 s


[INFO] 2016-11-03 00:12:17,171 org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer logInfo - Sorting complete. Writing out partition files one at a time.
[INFO] 2016-11-03 00:12:17,296 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/rescue/tsbucket=2016-11-02 09%3A00%3A00/part-r-00001-f433a41e-1b59-49af-b232-cf701e0c6df9.zlib.orc
[INFO] 2016-11-03 00:12:17,388 org.apache.spark.mapred.SparkHadoopMapRedUtil logInfo - No need to commit output of task because needsTaskCommit=false: attempt_201611030012_0949_m_000001_0
[INFO] 2016-11-03 00:12:17,388 org.apache.spark.executor.Executor logInfo - Finished task 1.0 in stage 949.0 (TID 1387). 2652 bytes result sent to driver
[INFO] 2016-11-03 00:12:17,389 org.apache.spark.scheduler.TaskSetManager logInfo - Finished task 1.0 in stage 949.0 (TID 1387) in 6892 ms on localhost (1/2)
[INFO] 2016-11-03 00:12:57,467 org.apache.spark.storage.BlockManagerInfo logInfo - Removed broadcast_2333_piece0 on 10.0.193.149:34016 in memory (size: 6.9 KB, free: 414.4 MB)
[INFO] 2016-11-03 00:13:36,195 org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer logInfo - Sorting complete. Writing out partition files one at a time.
[INFO] 2016-11-03 00:13:43,689 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/rescue/tsbucket=2016-11-02 09%3A00%3A00/part-r-00000-f433a41e-1b59-49af-b232-cf701e0c6df9.zlib.orc
[INFO] 2016-11-03 00:13:44,258 org.apache.spark.mapred.SparkHadoopMapRedUtil logInfo - No need to commit output of task because needsTaskCommit=false: attempt_201611030012_0949_m_000000_0
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.executor.Executor logInfo - Finished task 0.0 in stage 949.0 (TID 1386). 2652 bytes result sent to driver
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.scheduler.TaskSetManager logInfo - Finished task 0.0 in stage 949.0 (TID 1386) in 93762 ms on localhost (2/2)
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.scheduler.DAGScheduler logInfo - ResultStage 949 (save at ImportHive.scala:30) finished in 93,762 s
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.scheduler.TaskSchedulerImpl logInfo - Removed TaskSet 949.0, whose tasks have all completed, from pool
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.scheduler.DAGScheduler logInfo - Job 949 finished: save at ImportHive.scala:30, took 93,772483 s
[INFO] 2016-11-03 00:13:44,260 org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter cleanupJob - Nothing to clean up since no temporary files were written.
[INFO] 2016-11-03 00:13:44,260 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/rescue/_SUCCESS
[INFO] 2016-11-03 00:13:44,275 org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer logInfo - Job job_201611030012_0000 committed.

不幸的是,我仍然没有找到一种方法来并行写入/保存我的数据集的每个 spark 分区。

有人已经这样做了吗?

你能告诉我如何进行吗?

这是一个错误的方向吗?感谢您的帮助

4

1 回答 1

1

我们处理的数据帧只有一个“分区”,它的大小约为 200MB 未压缩(在内存中)

这是您的问题.. spark 根据分区在执行程序之间分配工作。

为了并行工作,您需要您的 df 有多个分区。您可以使用以下方法执行此操作:

df.repartition(number)

还要确保您正在使用:

hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version","2")

写入 s3 时。

于 2017-01-02T17:31:08.353 回答