3

我正在使用云任务。只有在任务 A 和任务 B 成功完成后,我才需要触发任务 C 的执行。所以我需要某种方式来阅读/被通知触发的任务状态。但是我在 GCP 的文档中看不到这样做的方法。如果有帮助的话,使用 Node.js SDK 创建任务和 Cloud Functions 作为任务处理程序。

编辑:

根据要求,这里有更多关于我们正在做什么的信息:

任务 1 到 10 每个都发出 HTTP 请求、获取数据、根据这些数据更新 Firestore 中的各个集合。这 10 个任务可以并行运行,并且没有特定的顺序,因为它们彼此之间没有任何依赖关系。所有这些任务实际上都是在 GCF 内部实现的。

任务 11 实际上依赖于任务 1 - 10 更新的 Firestore 集合数据。所以它只能在任务 1 - 10 成功完成后运行。

我们确实发布了一个 RunID 作为通用标识符,以对所有任务 (1 - 11) 的特定运行进行分组。

4

2 回答 2

3

Cloud Task 仅触发任务,您只能定义时间条件。您必须在任务 C 运行时手动编写检查代码。

这里是一个过程的例子:

  • 任务A正在运行,最后,在firestore中写入的任务已完成
  • 任务B正在运行,最后,在firestore中写入的任务已完成
  • 任务 C 启动并检查 A 和 B 是否在 Firestore 中完成。
    • 如果不是,任务错误退出
    • 是,继续流程

您必须自定义 C 任务队列,以便在出现错误时重试任务。

另一个昂贵的解决方案是使用 Cloud Composer 来处理此工作流

目前没有关于工作流管理的其他解决方案。

于 2020-04-15T13:23:30.850 回答
1

在这种情况下,您不想使用 Cloud Tasks 工具。看看Cloud Composer,它内置于 Apache Airflow for GCP 之上。

编辑:您可以创建一个 GCF 来处理这些请求的状态

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

################ TASK A
taskA_list = [
    "https://via.placeholder.com/400",
    "https://via.placeholder.com/410",
    "https://via.placeholder.com/420",
    "https://via.placeholder.com/430",
    "https://via.placeholder.com/440",
    "https://via.placeholder.com/450",
    "https://via.placeholder.com/460",
    "https://via.placeholder.com/470",
    "https://via.placeholder.com/480",
    "https://via.placeholder.com/490",
]

def call2TaskA(url):
    html = requests.get(url, stream=True)
    return (url,html.status_code)


processes = []
results = []
with ThreadPoolExecutor(max_workers=10) as executor:
    for url in taskA_list:
        processes.append(executor.submit(call2TaskA, url))

isOkayToDoTaskB = True
for taskA in as_completed(processes):
    result = taskA.result()
    if result[1] != 200: # your validation on taskA
        isOkayToDoTaskB = False
    results.append(result)

if not isOkayToDoTaskB:
    raise ValueError('Problems: {}'.format(results))

################ TASK B
def doTaskB():
    pass

doTaskB()
于 2020-04-16T12:07:59.073 回答