0

有没有办法跟踪和弦的进度,最好是在 tqdm 栏中?

例如,如果我们以文档为例,我们将创建这个文件:

#proj/tasks.py

@app.task
def add(x, y):
    return x + y

@app.task
def tsum(numbers):
    return sum(numbers)

然后运行这个脚本:

from celery import chord
from proj.tasks import add, tsum

chord(add.s(i, i)
      for i in range(100))(tsum.s()).get()

我们如何跟踪和弦的进行?

  • 我们不能使用 update_state,因为 chord() 对象不是函数。
  • 我们不能使用 collect() 因为 chord()(callback) 在结果准备好之前会阻塞脚本。

理想情况下,我会为 Dask 设想这样的自定义 tqdm 子类,但是我一直无法找到类似的解决方案。

非常感谢任何帮助或提示!

4

1 回答 1

0

所以我找到了解决方法。

首先, chord()(callback) 实际上并没有阻止脚本,只有 .get() 部分会。将所有任务发布到代理可能需要很长时间。幸运的是,有一种简单的方法可以通过信号跟踪此发布过程。我们可以在发布开始之前创建一个进度条,并从文档中修改示例处理程序以更新它:

from tqdm import tqdm
from celery.signals import after_task_publish

publish_pbar = tqdm(total=100, desc="Publishing tasks")

@after_task_publish.connect(sender='tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    publish_pbar.update(1)

c = chord(add.s(i, i)
      for i in range(100))(tsum.s())

# The script will resume once all tasks are published so close the pbar
publish_pbar.close()

但是,这仅适用于发布任务,因为此信号在发送任务的信号中执行。task_success 信号是在工作进程中执行的,所以这个技巧只能在工作日志中使用(据我所知)。

因此,为了在所有任务都已发布并且脚本恢复后跟踪进度,我从app.control.inspect().stats()转向工作人员统计信息。这将返回一个包含各种统计信息的字典,其中包括已完成的任务。这是我的实现:

tasks_pbar = tqdm(total=100, desc="Executing tasks")

previous_total = 0
current_total = 0

while current_total<100:

    current_total = 0
    for key in app.control.inspect().stats():
        current_total += app.control.inspect().stats()[key]['total']['tasks.add']

    if current_total > previous_total:
        tasks_pbar.update(current_total-previous_total)

    previous_total = current_total

results = c.get()
tasks_pbar.close()

最后,我认为可能有必要为任务命名,既用于信号处理程序的过滤,也用于 stats() 字典,所以不要忘记将其添加到您的任务中:

#proj/tasks.py

@app.task(name='tasks.add')
def add(x, y):
    return x + y

如果有人能找到更好的解决方案,请分享!

于 2020-08-13T07:15:45.973 回答