1

最近发现了 Celery'stask.replace并试图利用它的力量,我遇到了一个令人费解的情况,根据 Celery 的文档(以及互联网上的各种 Pull-Requests/SO 问题) - 应该可以工作。
假设我有以下代码:

from celery import Celery, group
app = Celery(broker='pyamqp://', backend='redis://localhost:6379/0')

@app.task
def to_char(seq):
    return map(chr, seq)

@app.task
def flatten(seqs):
    final = list()
    for seq in seqs:
        final.extend(seq)
    return final

@app.task(bind=True)
def combine(self):
    sig = group([to_char.s([97]), to_char.s([98, 98])]) | flatten.s()
    raise self.replace(sig)

我显然将问题简化了一点——但即使在这种简单的情况下,当我打电话时,combine.delay().get()我也希望收到[a, b, b].
而不是得到一个结果,调用get()无限期挂起并查看工作日志 - 我可以看到两个调用都to_char被接收并成功完成但没有调用flatten.

当我尝试单独执行它时(例如在 Python 控制台中),它可以工作并返回预期的结果,所以问题肯定出在我对以下机制的理解中task.replace

g = group([to_char.s([97]), to_char.s([98, 98])])
c = g | flatten.s()
c.delay().get() # -> [a, b, b]

非常感谢谁能阐明这个问题!

4

0 回答 0