我想对我的数据集进行 Hive 分区,但我不太清楚如何确保拆分中的文件数是合理的。我知道我应该大致针对大小为 128MB 的文件
如何安全地缩放和控制 Hive 分区数据集文件内的行数?
我想对我的数据集进行 Hive 分区,但我不太清楚如何确保拆分中的文件数是合理的。我知道我应该大致针对大小为 128MB 的文件
如何安全地缩放和控制 Hive 分区数据集文件内的行数?
对于这个答案,我假设您已经正确理解了应该和不应该进行 Hive 样式分区的原因,并且不会涵盖支持理论。
在这种情况下,重要的是要确保我们不仅正确计算拆分中所需的文件数量,而且还要根据这些计算重新分区我们的数据集。 在写出 Hive 风格的分区数据集之前未能进行重新分区可能会导致您的工作尝试写出数百万个小文件,这会影响您的性能。
在我们的例子中,我们将使用的策略是创建N
每个文件最多包含行的文件,这将限制每个文件的大小。我们不能轻易地限制拆分中每个文件的确切大小,但我们可以使用行数作为一个很好的近似值。
我们将用于实现此目的的方法是创建一个合成列,该列描述一行将属于哪个“批次”,在 Hive 拆分列和此合成列上重新分区最终数据集,并在写入时使用此结果。
为了确保我们的合成列指示行所属的正确批次,我们需要确定每个 hive 拆分内的行数,并将此拆分内的行“散布”到正确数量的文件中。
总体而言,该策略将如下所示:
让我们从考虑以下数据集开始:
from pyspark.sql import types as T, functions as F, window, SparkSession
spark = SparkSession.builder.getOrCreate()
# Synthesize DataFrames
schema = T.StructType([
T.StructField("col_1", T.StringType(), False),
T.StructField("col_2", T.IntegerType(), False),
T.StructField("col_3", T.StringType(), False),
T.StructField("col_4", T.IntegerType(), False),
])
data = [
{"col_1": "key_1", "col_2": 1, "col_3": "CREATE", "col_4": 0},
{"col_1": "key_2", "col_2": 2, "col_3": "CREATE", "col_4": 0},
{"col_1": "key_3", "col_2": 3, "col_3": "CREATE", "col_4": 0},
]
final_data = []
# Scale up a bit
for _ in range(10):
final_data += data
df = spark.createDataFrame(final_data, schema)
df.show()
假设我们想要 Hive 拆分的列是col_1
,并且我们希望每个文件的每个值 5 行col_1
。
ROWS_PER_FILE = 5
# Per value in our Hive split, how many rows are there?
split_counts = df.groupBy("col_1").agg(F.count("col_1").alias("rows_in_this_split"))
# Add these counts to the main df
df_with_counts = df.join(split_counts, on="col_1")
df_with_index = df_with_counts.withColumn( # Determine the number of files...
"num_files_unrounded",
F.col("rows_in_this_split") / F.lit(ROWS_PER_FILE)
).withColumn( # Make this number of files the nearest int...
"num_files",
F.round(
F.ceil(
F.col("num_files_unrounded")
),
0
).cast("int")
).withColumn(
"file_index", # Pick a random value between 0 and the number of files....
F.rand() * F.col("num_files")
).withColumn(
"index", # Round to the nearest int
F.round(
F.floor(
F.col("file_index")
),
0
).cast("int")
)
df_with_index.show()
"""
+-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
|col_1|col_2| col_3|col_4|rows_in_this_split|num_files_unrounded|num_files| file_index|index|
+-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
|key_1| 1|CREATE| 0| 10| 2.0| 2| 0.92294281966342| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 0.7701823230466494| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 0.7027155114438342| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 0.2386678474259014| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 0.983665114675822| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 0.9674556368778833| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 1.0727574871222592| 1|
|key_1| 1|CREATE| 0| 10| 2.0| 2|0.07142743481376246| 0|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 1.0401870580895767| 1|
|key_1| 1|CREATE| 0| 10| 2.0| 2| 1.0915212267807561| 1|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 0.5097131383965849| 0|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 0.1837310991545238| 0|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 0.3142077066468343| 0|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 1.330191792519476| 1|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 1.5802012613480614| 1|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 1.1701764577368479| 1|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 0.9786522146923651| 0|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 0.5304094894753706| 0|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 1.2317743611604448| 1|
|key_2| 2|CREATE| 0| 10| 2.0| 2| 1.867430955808408| 1|
+-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
"""
现在我们知道每行属于哪个文件索引,我们现在需要在写出之前重新分区。
split_counts = df_with_index.groupBy("col_1", "index").agg(F.count("*").alias("row_count")).orderBy("col_1", "index") # Show the counts per unique combination of hive split column and file index
split_counts.show()
"""
+-----+-----+---------+
|col_1|index|row_count|
+-----+-----+---------+
|key_1| 0| 7|
|key_1| 1| 3|
|key_2| 0| 5|
|key_2| 1| 5|
|key_3| 0| 5|
|key_3| 1| 5|
+-----+-----+---------+
"""
number_distinct_splits = split_counts.count() # This number of unique combinations is what we will repartition into
final_write_df = df_with_index.repartition(number_distinct_splits, ["col_1", "index"])
现在,在写出时,请确保您的写选项包括partition_cols=["col_1"]
,瞧!
我强烈建议您阅读这篇文章,以确保您在写出之前准确理解为什么需要分区