0

我正在尝试学习如何使用 Fast API 库。我正在尝试使用frozenset 作为文档状态在帖子中接受一个数组,但它似乎不起作用。

import logging

from fastapi import FastAPI, BackgroundTasks
from worker.celery_app import celery_app

log = logging.getLogger(__name__)

app = FastAPI()


def celery_on_message(body):
    """
    Logs the initiation of the endpoint
    """
    log.warn(body)


def background_on_message(task):
    """
    logs the function when it is added to queue
    """
    log.warn(task.get(on_message=celery_on_message, propagate=False))


@app.get("/")
async def root(stocks: frozenset, background_task: BackgroundTasks):
    """
    :param stocks: stocks  to be analyzed
    :param background_task: initiate the tasks queue
    :type background_task: starlette.background.BackgroundTasks
    :return:
    """
    task_name = None

    # set correct task name based on the way you run the example
    log.log(level=1, msg=f'{stocks}')
    task_name = "app.app.worker.celery_worker.compute_stock_indicators"

    task = celery_app.send_task(task_name, args=[stocks])
    background_task.add_task(background_on_message, task)

    return {"message": "Stocks analyzed"}

当我使用 swagger 文档发送请求时:

curl -X GET "http://127.0.0.1:8000/?stocks=wix&stocks=amzn" -H  "accept: application/json"

回应是:

{
  "detail": [
    {
      "loc": [
        "query",
        "stocks"
      ],
      "msg": "value is not a valid frozenset",
      "type": "type_error.frozenset"
    }
  ]
}
4

1 回答 1

0

我认为有一个小错误

您正在尝试发送POST请求,但您只有一个GET端点

@app.post("/")

这应该可以解决问题,而且,在使用 Type 进行注释时,使用 typing

from typing import FrozenSet


@app.post("/")
async def root(stocks: FrozenSet, background_task: BackgroundTasks):

万维网(W3)现在也支持在GET请求中接收正文,但也非常不鼓励这样做。请参阅W3出版物。OpenAPI在请求中包含了body的信息,接下来的版本会被OpenAPI支持,但是目前还不支持SwaggerUI。

于 2020-09-06T13:49:17.110 回答