1

我有一系列约 30 个数据集,所有这些数据集都需要连接在一起才能形成一个宽泛的决赛桌。这个最终表需要大约 5 年的单个表(每年一个表)并将它们联合在一起,然后将这个完整的历史与其他表的完整历史(类似地联合)加入一个大的、历史的、宽的表。

这些第一个每年表的布局如下:

table_type_1:

| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |

与像这样的其他年份表:

table_type_1:

| primary_key | year |
|-------------|------|
| key_1       | 1    |
| key_2       | 1    |

然后将它们联合在一起以创建:

table_type_1:

| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |
| key_1       | 1    |
| key_2       | 1    |

同样,合并后的第二种类型的表会导致以下结果:

table_type_2:

| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |
| key_1       | 1    |
| key_2       | 1    |

我现在想加入table_type_1ontable_type_2primary_key产生year一个更宽的表。我注意到这个最终的连接需要长时间并且打乱了很多数据。

我怎样才能让它更快?

4

2 回答 2

0

您可以在每年表上使用primary_keyyear列上的分桶到完全相同数量的桶中,以避免在计算最终连接时进行昂贵的交换。

- output: table_type_1_year_0
  input: raw_table_type_1_year_0
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
- output: table_type_1_year_1
  input: raw_table_type_1_year_1
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
...
- output: table_type_2_year_0
  input: raw_table_type_2_year_0
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
- output: table_type_2_year_1
  input: raw_table_type_2_year_1
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
...
- output: all_tables
  input:
    - table_type_1_year_0
    - table_type_1_year_1
...
    - table_type_2_year_0
    - table_type_2_year_1
...
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)

注意:当您选择BUCKET_COUNT值时,重要的是要了解它应该针对最终all_tables输出进行优化,而不是针对中间表。这意味着您最终可能会得到对于中间表来说非常小的文件。与输出的效率增益相比,这可能无关紧要,all_tables因为在连接所有内容时您不必计算大量交换;您的存储桶将被预先计算,您只需SortMergeJoin在输入文件上即可。

有关如何写出指定数量的存储桶的转换的显式示例,我在此处的回答可能很有用。

于 2020-10-21T15:45:55.213 回答
0

我的建议是:在小型数据集上进行第一个联合,然后广播数据集,第一个联合的结果,spark 将在其不同的节点上部署该数据集,这将减少洗牌的次数。spark上的联合优化得很好,所以你要做的就是考虑拥有:从一开始就只选择你需要的列,避免在联合之前进行任何类型的非成本效益操作,比如groupByKey ...等,因为spark会在进行最终流程时调用这些操作。我建议您避免使用 hive,因为它使用与 spark sql 相比不值得的 map reduce 策略,您可以使用这个函数示例,只需更改密钥,如果可以的话使用 scala,它将直接与 spark 交互:

def map_To_cells(df1: DataFrame, df2: DataFrame): DataFrame = {
val df0= df2.withColumn("key0",F.col("key")).drop("key")
df1.as("main").join(
broadcast(df0),
df0("key0") <=> df("key")
).select( needed columns)
}  
于 2020-10-21T18:22:09.850 回答