0

跟随 celery chord 上的答案,带有一组链条,链条中有例外

我收到一个错误,Celery 似乎将任务签名链更改为导致以下错误的字典。

芹菜 - 4.4 Redis - 3.3.11

代码- 来自@bigzbig

@celery_app.task
def task_one():
    return 'OKIDOKI'

@celery_app.task
def task_two(str):
    return f'{str} YOUPI'

@celery_app.task
def task_three(str):
    return f'{str} MAKAPAKA'

@celery_app.task
def task_exception(str):
    raise KeyError(f'{str} Ups')

@celery_app.task(ignore_result=True)
def task_wrapper(*args, **kwargs):
    if 'job' in kwargs:
        kwargs['job'].apply()

@celery_app.task(ignore_result=True)
def callback_task(*args, **kwargs):
    return (args, kwargs, 'Yeah')

def test():
    chains = []

    tasks = [
        task_one.s(),
        task_two.s(),
        task_exception.s(),
        task_three.s(),
    ]
    chains.append(task_wrapper.s(job=chain(*tasks)))

    tasks = [
        task_one.s(),
        task_two.s(),
        task_three.s(),
    ]
    chains.append(task_wrapper.s(job=chain(*tasks)))

    chord(chains, callback_task.s()).apply_async()

kwargs的打印['job']

celeryworker2_1  | [2020-03-23 22:31:01,646: WARNING/ForkPoolWorker-1] {'task': 'celery.chain', 'args': [], 'kwargs': {'tasks': [{'task': 'portfolio.tasks.task_one', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_two', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_exception', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_three', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': False, 'chord_size': None}]}, 'options': {}, 'subtask_type': 'chain', 'immutable': False, 'chord_size': None}
celeryworker_1   | [2020-03-23 22:31:01,650: WARNING/ForkPoolWorker-1] {'task': 'celery.chain', 'args': [], 'kwargs': {'tasks': [{'task': 'portfolio.tasks.task_one', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_two', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_three', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': False, 'chord_size': None}]}, 'options': {}, 'subtask_type': 'chain', 'immutable': False, 'chord_size': None}

错误

Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 650, in __protected_call__
return self.run(*args, **kwargs)
File "/app/portfolio/tasks.py", line 241, in task_wrapper
kwargs['job'].apply()
AttributeError: 'dict' object has no attribute 'apply'
4

1 回答 1

0

您正在尝试将签名传递给另一个任务。所以 Celery 将其转换为 dict。您可以从 dict 构建签名。

task_always_eager=True ”设置为在同一进程下运行,而不是作为不同的 celery 任务,因为执行链本身是不同的任务。这样,您将保留链接或 link_error 是任何给定的。

from celery.canvas import Signature
callback = Signature(kwargs['job'])
callback.delay(task_always_eager=True)
于 2020-08-20T15:58:15.797 回答