2

我的数据集有 20000 个文件,每个文件都非常小。我将如何减少文件数量以及最佳数量是多少?

4

2 回答 2

2

最直接的方法是在转换结束时显式地执行一个repartition()(或者coalesce()如果分区计数从原始数量严格减少)。

这需要是您返回/写出结果之前的最后一次调用。

这看起来像:

# ...

@transform_df(
  # ... inputs
)
def my_compute_function(my_inputs):
  # ... my transform logic ...

  df = df.coalesce(500) 
  # df = df.repartition(500) # this also works but is slightly slower than coalesce
  return df

这是称为“桶”的前导步骤,以供参考。

存储桶的最佳数量取决于您操作的数据规模。通过在成功构建后观察磁盘上数据集的总大小来计算最佳存储桶数有点简单。

如果您的数据集大小为 128GB,您最终会希望得到 128MB 的文件,因此您的存储桶数为:

128 GB * (1000 MB / 1 GB) * (1 file / 128MB) -> 1000 files

注意:这不是一个精确的计算,因为由于 Snappy + Parquet 写出中使用的数据压缩,更改存储桶计数后您的最终数据集大小会有所不同。您会注意到文件大小与您预期的略有不同,因此上例中可能需要 1100 或 900 个文件

于 2020-11-02T18:50:42.793 回答
1

由于这是一个我不得不多次解决的问题,因此我决定编写一份更详细的指南,其中包含一系列不同的技术、优缺点以及存在的理由。

为什么要减少文件数?

避免使用包含许多文件的数据集有几个很好的理由:

  • 读取性能可能更差。当数据分散在许多小文件中时,轮廓(分析)等应用程序的性能可能会受到严重影响,因为执行程序必须承受从支持文件系统下载许多小文件的开销。
  • 如果后备文件系统是 HDFS,许多小文件会增加 hadoop 名称节点和 gossip 协议的堆压力。HDFS 不能很好地处理许多小文件,因为它不会对文件系统中的文件列表进行流式处理/分页,而是构造包含所有文件的完整枚举的消息。当您在 HDFS 中有数千万甚至数亿个文件系统对象时,这最终会遇到名称节点 RPC 消息大小限制(您可以在配置中增加)和可用堆内存(您可以在配置中增加) ...如果您有更多可用内存。)节点间通信变得越来越慢。
  • 转换变得更慢,因为(目前甚至对于增量转换)驱动程序线程必须从目录中检索当前视图中所有文件的完整列表,以及事务的元数据和出处(这只是切线相关的,但这并不罕见许多文件与许多事务相关)
  • 转换可以 OOM 驱动程序,因为文件集和事务集在某些时间点保存在内存中。这可以通过为驱动程序分配更大的内存配置文件来解决——但这会增加成本和/或减少可用于其他管道的资源。

为什么我们首先会在数据集中得到很多文件?

最终得到包含许多文件的数据集通常是由以下三个原因之一引起的:

  • 摄取许多小文件的文件摄取
  • 产生许多小文件的(不良行为)转换。每次执行 spark 中的宽操作时,都会发生改组。例如,当groupBy执行 a 时(这意味着 shuffle),spark 默认选择将数据重新分区到 200 个新分区中,这对于增量转换来说太多了。由于分区错误(下面讨论),转换也可能产生过多的输出文件。
  • 增量运行且频繁运行的管道。每次管道运行并处理一段(通常是小)数据时,都会在每个数据集上创建一个新事务,每个数据集至少包含一个文件。

接下来,我将列出我所知道的减少数据集中文件数的所有方法,以及它们的优缺点,以及适用时的一些特征。

摄取时(magritte 变压器)

最好的选择之一是首先避免有很多文件。当从类似文件系统的源中摄取许多文件时,像“连接转换器”这样的 magritte 转换器可能有助于将许多 CSV、JSON 或 XML 文件组合成一个文件。在适用时,连接然后应用 gzip 转换器是一种特别有效的策略,因为它通常会将 XML 和类似文本格式的大小减少 94% 左右。

主要限制是,要应用它,您需要

  • 每当摄取运行时都有多个文件可用(因此对于在频繁更新数据源时非常频繁地运行的摄取效果不佳)
  • 有一个数据源,为您提供可以连接的文件

也可以将许多文件压缩成更少的文件(使用 .tar.bz2、.tar.gz、.zip、.rar 等格式),但这随后需要知道这种文件格式的下游转换并手动解包(文档中提供了一个示例),因为代工厂无法透明地提供这些档案中的数据。但是,没有预制的 magritte 处理器可以执行此操作,并且在我应用此技术的情况下,我使用 bash 脚本在摄取之前执行此任务,这显然不太理想。

后台压缩

Foundry 中有一种新机制,可以将您写入的数据集与读取的数据集分离。本质上,有一个后台作业正在运行,它在您附加文件时将文件打乱到优化的索引中,因此数据集的读取可以(大部分)进入这个优化的索引,而不是作者留下的(通常有些随意的)数据布局。

这有很多好处(比如自动生成针对最常见读取模式优化的数据布局),其中之一是它可以在后台“压缩”您的数据集。

从这样的数据集读取时,您的读取基本上会命中索引以及输入数据集(其中包含尚未被后台进程合并到索引中的任何文件。)

最大的优势是这会在后台自动发生,并且无论您的数据摄取或转换有多混乱,您都可以简单地写出数据(在写入时不会受到性能影响并将数据尽快发送给消费者),同时仍然结束用很少的文件(最终)得到一个很好的分区数据集。

这里的主要限制是,这仅适用于 spark 可以本机理解的格式的数据集,例如 parquet、avro、json、csv ......在摄取之前将它们打包成例如镶木地板。这样,铸造厂仍然可以随着时间的推移合并多个这些拼花文件。

此功能尚不完全可供最终用户使用(但计划默认为所有内容启用。)如果您认为这是您的管道之一最理想的解决方案,您的 palantir POC 可以与团队一起开票启用此功能。

重新分区和合并

Coalescing 是 spark 中的一种操作,可以减少 partition 的数量而不会产生广泛的依赖(spark 中唯一的这种操作)。合并很快,因为它最大限度地减少了洗牌。与以前的 spark 版本相比,它的工作原理已经发生了变化(并且那里有很多相互矛盾的信息),但它通常比repartition. 但是,它有一个很大的警告:它会降低整个 transform 的并行性

即使您coalesce在写入数据之前的最后一刻,spark 也会调整整个查询计划以使用更少的分区,从而减少使用的执行程序,这意味着您获得的并行性更少。

重新分区是类似的,但它插入了一个完整的洗牌阶段。这会带来更高的性能成本,但这意味着从这个阶段出来的数据基本上可以保证是良好分区的(不管输入)。虽然repartition它本身有点昂贵,但它不会受到在整个转换过程中降低并行度的问题的影响。

这意味着总体而言,如果您最终写出的数据量与您之前在其上所做的工作量相比并没有那么大,那么您通常会在使用repartitionover时获得更好的性能,因为能够在更多执行器上处理数据coalesce最终超过了洗牌的缺点。根据我的经验,repartition除非您的转换非常简单,否则通常会在这里获胜。

一个值得讨论的特定用例是增量管道。如果您的增量管道相对简单并且只执行映射和过滤等操作,那么执行 acoalesce就可以了。然而,许多增量管道也会读取非常大数据集的快照视图。例如,增量管道可能会接收一行新数据,并读取整个先前的输出数据集(可能数百万行),因此查看输出数据集中是否已存在该行。如果已经存在,则不发出任何行,如果不存在,则追加该行。将一小部分增量数据与大型静态数据集等相结合时会发生类似的情况。

在这种情况下,转换是增量的,但它仍然受益于高并行性,因为它仍然处理大量数据。

我的粗略指导方针是:

  • 转换作为快照运行:repartition到一个合理的数字
  • transform 增量运行,不需要高并行度:coalesce(1)
  • transform 以增量方式运行,但仍受益于并行性:repartition(1)

如果写入速度/流水线延迟非常重要,那么这些选项都不能接受。在这种情况下,我会考虑背景压缩。

定期快照

作为前一点的扩展,为了保持增量管道的高性能,我喜欢在它们上安排定期快照,这允许我每隔一段时间重新分区数据集,执行基本上是“压缩”的操作。

我已经描述了如何在此处进行设置的机制:如何强制增量 Foundry Transforms 作业以非增量方式构建而不影响语义版本?

我通常会在周末安排快照。在一周内,管道中的每个数据集(可能有数百个数据集)将累积数千或数万个事务和文件。然后在周末,随着计划的快照在管道中滚动,每个数据集将被重新分区为一百个文件。

空气质量工程师

不久前,AQE 可用于代工厂。AQE 本质上(出于本讨论的目的)将coalesce操作注入到您已经进行随机操作的阶段,这取决于先前操作的结果。这通常会改善分区(从而改善文件计数),但据报道在极少数情况下也会使情况变得更糟(但我自己没有观察到这一点)。

默认情况下启用 AQE,但如果您想尝试禁用它,可以将火花配置文件应用于您的转换。

分桶和分区

分桶和分区与这个讨论有些相切,因为它们主要是关于布局数据以优化读取数据的特定方法。这些技术目前都不适用于增量管道。

一个常见的错误是写出由高基数列(例如时间戳)分区的数据集。在具有 1000 万个唯一时间戳的数据集中,这将导致(至少)1000 万个文件出现在输出数据集中。

在这些情况下,应修复转换,并应通过应用保留删除旧事务(包含数百万个文件)。

其他黑客

其他压缩数据集的技巧也是可能的,例如创建“环回”转换来读取先前的输出并将其重新分区,或者手动打开数据集上的事务以重写它。

这些非常hacky,但在我看来是不可取的,应该避免。如今,背景压缩主要以一种更优雅、更可靠、更简洁的方式解决了这个问题。

于 2020-11-04T09:39:06.187 回答