最近发现了 Celery'stask.replace
并试图利用它的力量,我遇到了一个令人费解的情况,根据 Celery 的文档(以及互联网上的各种 Pull-Requests/SO 问题) - 应该可以工作。
假设我有以下代码:
from celery import Celery, group
app = Celery(broker='pyamqp://', backend='redis://localhost:6379/0')
@app.task
def to_char(seq):
return map(chr, seq)
@app.task
def flatten(seqs):
final = list()
for seq in seqs:
final.extend(seq)
return final
@app.task(bind=True)
def combine(self):
sig = group([to_char.s([97]), to_char.s([98, 98])]) | flatten.s()
raise self.replace(sig)
我显然将问题简化了一点——但即使在这种简单的情况下,当我打电话时,combine.delay().get()
我也希望收到[a, b, b]
.
而不是得到一个结果,调用get()
无限期挂起并查看工作日志 - 我可以看到两个调用都to_char
被接收并成功完成但没有调用flatten
.
当我尝试单独执行它时(例如在 Python 控制台中),它可以工作并返回预期的结果,所以问题肯定出在我对以下机制的理解中task.replace
:
g = group([to_char.s([97]), to_char.s([98, 98])])
c = g | flatten.s()
c.delay().get() # -> [a, b, b]
非常感谢谁能阐明这个问题!