2

我有一个数据集摄取对我的数据行的最新编辑,但它只摄取最近编辑的版本。update_ts(即它在时间戳列上是增量的)。

原表:

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 0         |
| key_2       | 0         |
| key_3       | 0         |

更新后的表格:

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 0         |
| key_2       | 0         |
| key_3       | 0         |
| key_1       | 1         |
| key_2       | 1         |
| key_1       | 2         |

摄取后,我需要计算所有先前更新的“最新版本”,同时还要考虑任何新的编辑。

这意味着我每次都在进行增量摄取并运行 SNAPSHOT 输出。这对我的构建来说非常慢,因为我注意到每次我想为我的数据计算最新版本时,我都必须查看所有输出行。

交易 n=1(快照):

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 0         |
| key_2       | 0         |
| key_3       | 0         |

交易 n=2(追加):

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 1         |
| key_2       | 1         |

我怎样才能使这个“最新版本”计算更快?

4

1 回答 1

2

这是一种可以从分桶中受益的常见模式。

其要点是:根据您的列将输出 SNAPSHOT 写入存储桶,其中完全跳过primary_key了对更大输出进行改组的昂贵步骤。

这意味着您只需要将新数据交换到已经包含您先前历史记录的存储桶中。

让我们从初始状态开始,我们在之前计算的“最新”版本上运行,该版本是一个缓慢的 SNAPSHOT:

- output: raw_dataset
  input: external_jdbc_system
  hive_partitioning: none
  bucketing: none
  transactions:
    - SNAPSHOT
    - APPEND
    - APPEND
- output: clean_dataset
  input: raw_dataset
  hive_partitioning: none
  bucketing: none
  transactions:
    - SNAPSHOT
    - SNAPSHOT
    - SNAPSHOT

如果我们clean_dataset在列上使用分桶写入primary_key单独计算的桶数以适应我们预期的数据规模,我们将需要以下代码:

from transforms.api import transform, Input, Output
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window


@transform(
    my_output=Output("/datasets/clean_dataset"),
    my_input=Input("/datasets/raw_dataset")
)
def my_compute_function(my_input, my_output):

    BUCKET_COUNT = 600
    PRIMARY_KEY = "primary_key"
    ORDER_COL = "update_ts"

    updated_keys = my_input.dataframe("added")
    last_written = my_output.dataframe("current")

    updated_keys.repartition(BUCKET_COUNT, PRIMARY_KEY)

    value_cols = [x for x in last_written.columns if x != PRIMARY_KEY]

    updated_keys = updated_keys.select(
      PRIMARY_KEY,
      *[F.col(x).alias("updated_keys_" + x) for x in value_cols]
    )

    last_written = last_written.select(
      PRIMARY_KEY,
      *[F.col(x).alias("last_written_" + x) for x in value_cols]
    )

    all_rows = updated_keys.join(last_written, PRIMARY_KEY, "fullouter")
    
    latest_df = all_rows.select(
      PRIMARY_KEY,
      *[F.coalesce(
          F.col("updated_keys_" + x),
          F.col("last_written_" + x)
        ).alias(x) for x in value_cols]
    )

    my_output.set_mode("replace")

    return my_output.write_dataframe(
        latest_df,
        bucket_cols=PRIMARY_KEY,
        bucket_count=BUCKET_COUNT,
        sort_by=ORDER_COL
    )

当它运行时,您会在查询计划中注意到,项目在输出上的单步不再包含 exchange,这意味着它不会对数据进行混洗。您现在将看到的唯一交换是在输入上,它需要以与输出格式完全相同的方式分发更改(这是一个非常快速的操作)。

然后将此交换保留到fullouter连接步骤中,然后连接将利用此交换并非常快速地运行 600 个任务。最后,我们通过在与以前相同的列上显式地分桶到相同数量的桶中来维护输出的格式。

注意:使用这种方法,您在每个存储桶中的文件大小会随着时间的推移而增长,并且不会考虑增加存储桶数量以保持大小合适的需要。您最终将使用这种技术达到一个阈值,文件大小超过 128MB,并且您不再有效地执行(修复是提高BUCKET_COUNT值)。

您的输出现在将如下所示:

- output: raw_dataset
  input: external_jdbc_system
  hive_partitioning: none
  bucketing: none
  transactions:
    - SNAPSHOT
    - APPEND
    - APPEND
- output: clean_dataset
  input: raw_dataset
  hive_partitioning: none
  bucketing: BUCKET_COUNT by PRIMARY_KEY
  transactions:
    - SNAPSHOT
    - SNAPSHOT
    - SNAPSHOT
于 2020-10-20T16:01:50.507 回答