2

我想对我的数据集进行 Hive 分区,但我不太清楚如何确保拆分中的文件数是合理的。我知道我应该大致针对大小为 128MB 的文件

如何安全地缩放和控制 Hive 分区数据集文件内的行数?

4

1 回答 1

1

对于这个答案,我假设您已经正确理解了应该和不应该进行 Hive 样式分区的原因,并且不会涵盖支持理论。

在这种情况下,重要的是要确保我们不仅正确计算拆分中所需的文件数量,而且还要根据这些计算重新分区我们的数据集在写出 Hive 风格的分区数据集之前未能进行重新分区可能会导致您的工作尝试写出数百万个小文件,这会影响您的性能。

在我们的例子中,我们将使用的策略是创建N每个文件最多包含行的文件,这将限制每个文件的大小。我们不能轻易地限制拆分中每个文件的确切大小,但我们可以使用行数作为一个很好的近似值。

我们将用于实现此目的的方法是创建一个合成列,该列描述一行将属于哪个“批次”,在 Hive 拆分列此合成列上重新分区最终数据集,并在写入时使用此结果。

为了确保我们的合成列指示行所属的正确批次,我们需要确定每个 hive 拆分内的行数,并将此拆分内的行“散布”到正确数量的文件中。

总体而言,该策略将如下所示:

  1. 确定每个 Hive 值的行数
  2. 将这个计数加入主数据框
  3. 通过将每个拆分的行数除以每个文件的行数来确定拆分中的文件数
  4. 在 0 和文件计数之间创建随机索引,本质上是“挑选”该行所属的文件
  5. 计算 Hive 拆分列和我们的合成列的唯一组合数
  6. 将 Hive 列和合成列上的输出数据集重新分区为唯一组合的数量。即每个组合一个文件,正是我们想要的

让我们从考虑以下数据集开始:

from pyspark.sql import types as T, functions as F, window, SparkSession

spark = SparkSession.builder.getOrCreate()


# Synthesize DataFrames
schema = T.StructType([
  T.StructField("col_1", T.StringType(), False),
  T.StructField("col_2", T.IntegerType(), False),
  T.StructField("col_3", T.StringType(), False),
  T.StructField("col_4", T.IntegerType(), False),
])
data = [
  {"col_1": "key_1", "col_2": 1, "col_3": "CREATE", "col_4": 0},
  {"col_1": "key_2", "col_2": 2, "col_3": "CREATE", "col_4": 0},
  {"col_1": "key_3", "col_2": 3, "col_3": "CREATE", "col_4": 0},
]

final_data = []
# Scale up a bit
for _ in range(10):
    final_data += data

df = spark.createDataFrame(final_data, schema)
df.show()

假设我们想要 Hive 拆分的列是col_1,并且我们希望每个文件的每个值 5 行col_1

ROWS_PER_FILE = 5

# Per value in our Hive split, how many rows are there?
split_counts = df.groupBy("col_1").agg(F.count("col_1").alias("rows_in_this_split"))

# Add these counts to the main df
df_with_counts = df.join(split_counts, on="col_1")


df_with_index = df_with_counts.withColumn(  # Determine the number of files...
    "num_files_unrounded",
    F.col("rows_in_this_split") / F.lit(ROWS_PER_FILE)
).withColumn(                               # Make this number of files the nearest int...
    "num_files",
    F.round(
        F.ceil(
            F.col("num_files_unrounded")
        ),
        0
    ).cast("int")
).withColumn(
    "file_index",                           # Pick a random value between 0 and the number of files....
    F.rand() * F.col("num_files")
).withColumn(
    "index",                                # Round to the nearest int
    F.round(
        F.floor(
            F.col("file_index")
        ),
        0
    ).cast("int")
)

df_with_index.show()
"""
+-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
|col_1|col_2| col_3|col_4|rows_in_this_split|num_files_unrounded|num_files|         file_index|index|
+-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
|key_1|    1|CREATE|    0|                10|                2.0|        2|   0.92294281966342|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 0.7701823230466494|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 0.7027155114438342|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 0.2386678474259014|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2|  0.983665114675822|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 0.9674556368778833|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 1.0727574871222592|    1|
|key_1|    1|CREATE|    0|                10|                2.0|        2|0.07142743481376246|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 1.0401870580895767|    1|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 1.0915212267807561|    1|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 0.5097131383965849|    0|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 0.1837310991545238|    0|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 0.3142077066468343|    0|
|key_2|    2|CREATE|    0|                10|                2.0|        2|  1.330191792519476|    1|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 1.5802012613480614|    1|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 1.1701764577368479|    1|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 0.9786522146923651|    0|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 0.5304094894753706|    0|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 1.2317743611604448|    1|
|key_2|    2|CREATE|    0|                10|                2.0|        2|  1.867430955808408|    1|
+-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
"""

现在我们知道每行属于哪个文件索引,我们现在需要在写出之前重新分区。

split_counts = df_with_index.groupBy("col_1", "index").agg(F.count("*").alias("row_count")).orderBy("col_1", "index")  # Show the counts per unique combination of hive split column and file index
split_counts.show()
"""
+-----+-----+---------+
|col_1|index|row_count|
+-----+-----+---------+
|key_1|    0|        7|
|key_1|    1|        3|
|key_2|    0|        5|
|key_2|    1|        5|
|key_3|    0|        5|
|key_3|    1|        5|
+-----+-----+---------+
"""
number_distinct_splits = split_counts.count()    # This number of unique combinations is what we will repartition into

final_write_df = df_with_index.repartition(number_distinct_splits, ["col_1", "index"])

现在,在写出时,请确保您的写选项包括partition_cols=["col_1"],瞧!

我强烈建议您阅读这篇文章,以确保您在写出之前准确理解为什么需要分区

于 2021-11-08T18:00:13.703 回答