5

我正在尝试在整个脚本周围包裹一个进度指示器。但是,set_index(..., compute=False)它仍然在调度程序上运行任务,可在 Web 界面中观察到。

如何报告set_index步骤的进度?

import dask.dataframe as dd
from dask.distributed import Client, progress

if __name__ == '__main__':

  with Client() as client:

    df = dd.read_csv('big.csv')

    # I can see on the web interface that something is happening.
    # This blocks 20-30s on this particular CSV.
    df = df.set_index('id', compute=False)

    # Progress reporting works from here
    out = client.compute(
      df
    )
    progress(out)

    # out.result()
    # ...
4

2 回答 2

1

不幸的是,没有简单的方法可以做到这一点。set_index急切地在内部调用compute,因为它需要扫描您的数据以确定新分区的边界应该是什么。无法控制此compute调用并让它使用进度条。如果您认为这值得向用户公开,您可以打开一个问题进一步讨论。

或者(困难的方法),您可以自己计算 DataFrame 的划分(这样做时显示进度条),然后将它们传递给set_index,以防止set_index进行计算。

这里的缺点是你正在复制 dask 已经拥有的代码,并且 dask 的方法在重新分区、检测预排序值和使用快速路径等方面有一些额外的 逻辑。但在简单的情况下,这样的事情可能会起作用(注意这是不使用公共/记录的 API,因此它可以在没有警告的情况下更改):

import dask
from dask.distributed import Client, progress

if __name__ == "__main__":

    with Client() as client:

        df = dask.datasets.timeseries()

        # Calculate divisions manually to show a progress bar
        from dask.dataframe.partitionquantiles import partition_quantiles

        divisions_ddf = partition_quantiles(df["id"], df.npartitions)
        future = client.compute(divisions_ddf)
        print("Computing divisions")
        progress(future)
        divisions = future.result().tolist()

        df = df.set_index("id", divisions=divisions, compute=False)

        out = client.compute(df)
        progress(out)

        # out.result()
        # ...
于 2021-10-07T20:46:22.190 回答
0

dask.distributed 进度条跟踪期货的进度。请参阅此处的文档。

您的.compute电话正在返回具体结果,因此实际上没有要跟踪的期货。

您可以通过使用.persist()call 而不是.compute().

MCVE

import pandas as pd
import dask.dataframe as dd 
from dask.distributed import progress

ddf = dd.read_csv('big.csv')
result = ddf.set_index('id').persist()
progress(result)

这篇文章是一个很好的资源,你可以通过不同的方式检查 Dask 计算的进度。

FWIW 在 Dask Distributed here有一个相关的未解决问题

于 2021-10-05T08:16:48.420 回答