所以我找到了解决方法。
首先, chord()(callback) 实际上并没有阻止脚本,只有 .get() 部分会。将所有任务发布到代理可能需要很长时间。幸运的是,有一种简单的方法可以通过信号跟踪此发布过程。我们可以在发布开始之前创建一个进度条,并从文档中修改示例处理程序以更新它:
from tqdm import tqdm
from celery.signals import after_task_publish
publish_pbar = tqdm(total=100, desc="Publishing tasks")
@after_task_publish.connect(sender='tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
publish_pbar.update(1)
c = chord(add.s(i, i)
for i in range(100))(tsum.s())
# The script will resume once all tasks are published so close the pbar
publish_pbar.close()
但是,这仅适用于发布任务,因为此信号在发送任务的信号中执行。task_success 信号是在工作进程中执行的,所以这个技巧只能在工作日志中使用(据我所知)。
因此,为了在所有任务都已发布并且脚本恢复后跟踪进度,我从app.control.inspect().stats()转向工作人员统计信息。这将返回一个包含各种统计信息的字典,其中包括已完成的任务。这是我的实现:
tasks_pbar = tqdm(total=100, desc="Executing tasks")
previous_total = 0
current_total = 0
while current_total<100:
current_total = 0
for key in app.control.inspect().stats():
current_total += app.control.inspect().stats()[key]['total']['tasks.add']
if current_total > previous_total:
tasks_pbar.update(current_total-previous_total)
previous_total = current_total
results = c.get()
tasks_pbar.close()
最后,我认为可能有必要为任务命名,既用于信号处理程序的过滤,也用于 stats() 字典,所以不要忘记将其添加到您的任务中:
#proj/tasks.py
@app.task(name='tasks.add')
def add(x, y):
return x + y
如果有人能找到更好的解决方案,请分享!