0

我使用 Dask 在我的代码中具有以下结构:

@dask.delayed
def calculate(data):
    services = data.service_id
    prices = data.price
    
    return [services, prices]

output = []

for qid in notebook.tqdm(ids):
    r = calculate(parts[parts.quotation_id == qid])
    output.append(r)

事实证明,当我在列表中调用该dask.compute()方法时output,我没有任何进度指示。诊断 UI 不会“捕获”此操作,我什至不确定它是否正常运行(根据我的处理器使用情况判断,我认为不是)。

result = dask.compute(*output)

我正在关注 dask 文档中的“最佳实践”文章:

https://docs.dask.org/en/latest/delayed-best-practices.html

我错过了什么?

编辑:我认为它正在运行,因为我仍然收到内存泄漏/高使用率警告。仍然没有进展迹象。

4

1 回答 1

1

正如相关帖子中所指出的,dask有两种显示进度的方法:一种用于“正常” dask,一种用于dask.distributed.

这是一个可重现的示例:

import random 
from time import sleep

import dask
from dask.diagnostics import ProgressBar
from dask.distributed import Client, progress

# simulate work
@dask.delayed
def work(x):

    sleep(x)
    return True
    

# generate tasks

random.seed(42)
tasks = [work(random.randint(1,5)) for x in range(50)]

使用普通dask

ProgressBar().register()
dask.compute(*tasks)

产生:

在此处输入图像描述

使用dask.distributed

client = Client()
futures = client.compute(tasks)

progress(futures)

产生:

在此处输入图像描述

于 2021-01-15T16:22:19.827 回答