0

我有一个非常耗时的任务(图像处理),它从给 FastAPI 端点的请求中接收一些输入数据。为了让呼叫者保持响应,我需要发送一个即时响应消息,如“ok”以及 201 状态代码(后者可选)。

到目前为止,我一直在使用这个:

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def main_process(parameters)
...some long task

@app.post('/task')
async def do_task(reference_id: int,
              bucket: str,
              document_url: str,
              return_url: str,
              background_tasks: BackgroundTasks):

    background_tasks.add_task(main_process, bucket, document_url, reference_id, return_url)
    return 'ok'

每个main_process任务从 S3 中的存储桶下载图像,然后进行一些处理。上面显示的解决方案可以正常工作,直到达到 10 个异步处理的图像(给定 async def)然后它崩溃。

我也尝试过增加一些 gunicorn 参数,比如max-requests增加到 100,如下所示:

gunicorn api:app -b 0.0.0.0:8000 -w 4 -k uvicorn.workers.UvicornWorker --preload --max-requests 100 --daemon

这给了我更多处理空间(另外 20 张图像),但它还是崩溃了。

我也考虑过使用 Celery 或一些分布式任务队列解决方案,但我想让事情尽可能简单。

由于异步行为并不重要,但即时响应很重要,是否可以切换到同步解决方案但立即获得“ok”响应?

4

1 回答 1

1

不,您必须真正分派任务并将其委托给某个处理后端。这样的后端可以非常简单,例如只是一个任务队列(celery/amqp、redis、一个关系数据库,任何适合您的需求)和至少一个消耗该队列的进程,执行计算并将结果反馈到存储中。

当您从 API 分派请求时,同时生成一个 UUID 并将其与您的计算作业一起存储在队列中。当您向呼叫者反馈您的快速 200 OK 时,还要向他们提供他们工作的 UUID(如果需要)。他们会再次访问您的 API 查询结果;让他们提供 UUID 并使用它在您的存储后端查找结果。

为了避免两次计算相同的请求,请从请求中生成一个散列并使用它而不是 UUID(注意冲突,您需要一些更长的散列)。只要您不必处理用户/图像权限,这很容易。

于 2020-03-25T22:57:43.857 回答