2

我在 Django REST Framework 的序列化程序中有一个被覆盖的update方法
在此update,由于用户可以发送很多孩子,我有一个异步芹菜任务process_children来处理孩子。

class MyModelSerializer(serializers.ModelSerializer):
    ....

    @transaction.atomic
    def update(self, mymodel, validated_data):
        try:
            children_data = validated_data.pop('children')
            transaction.on_commit(lambda: process_children.apply_async(
                countdown=1,
                args=[mymodel.id, children_data]))
        except KeyError:
            pass
        ...

在 args 中,有一个参数不是json对象而是OrderedDict: children_data

任务看起来像:

@app.task
def process_children(mymodel_id, children_data):
    mymodel = MyModel.objects.get(pk=mymodel_id)
    children = mymodel.children.all()
    for child_data in children_data:
        try:
            child = children.get(start=child_data['start'])
            child = populate_child(child, child_data)
            child.save()
        except Child.DoesNotExist:
            create_child(mymodel, child_data)

我读到我们应该只发送json(或泡菜,yaml,无论如何......)args。

  • 但是这个设置似乎有效
  • 我什至可以发送datetime对象(即start我在任务中使用的属性来匹配存储的孩子与通过 api 发送的新值)。

那么这里发生了什么?

  • 一切正常吗,celery 像老大一样对 OrderedDict 进行序列化和反序列化。
  • 或者我疯了,应该在调用任务之前序列化并在任务内部反序列化?

[更新,添加芹菜设置]

CELERY_BROKER_URL = get_env_variable('REDIS_URL')
CELERY_BROKER_POOL_LIMIT = 0
CELERY_REDIS_MAX_CONNECTIONS = 10
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/London'
4

2 回答 2

0

您正在使用 pickle 序列化程序,它可以相对较好地处理对象,但存在一些问题。这是一篇关于序列化和芹菜概念的博客文章。

于 2018-11-07T04:47:22.343 回答
0

是的,你做对了。

文档中所述。

在客户端和工作人员之间传输的数据需要序列化,因此 Celery 中的每条消息都有一个 content_type 标头,描述了用于对其进行编码的序列化方法。

此外,从 celery 4.0 开始,默认序列化程序是 JSON(pickle更早)。因此,每当您调用此任务时,celery 默认会对其进行序列化和反序列化。如果您想使用任何其他序列化程序,那么在调用任务时您需要指定content-type(如果您正在使用,.delay那么默认情况下序列化程序将是json.

process_children.apply_async((model_id, children_data), serializer='pickle')
于 2018-11-08T05:31:23.710 回答