0

我正在尝试从我创建的目录文件中获取数据集列表,并将它们作为单个节点的输入传递以组合它们并最终使用 kedro-airflow 插件在气流上运行管道

这适用于 kedro run 的 cli,但似乎在气流中失败,我不知道为什么:

#my_pipeline/pipeline.py
def create_pipeline(**kwargs):
      conf_loader = ConfigLoader(['conf/base'])
      conf_catalog = conf_loader.get('catalog-a*')

      datasets = [key for key, value in conf_catalog.items()] 
      return Pipeline([
           node(
            func=combine_data,
            inputs=datasets,
            outputs="combined_data",
            name="combined_data"
        ),
        ...#other nodes
      ])

我在气流上遇到的错误看起来像这样: Broken dag:给定的配置路径不存在或不是有效目录:'conf/base'

这肯定是 Kedro 配置加载程序错误,但我似乎无法弄清楚为什么在通过气流运行管道时会发生唯一错误。从我一直在阅读的内容来看,不建议在代码 API 中混合。这是传入数据集列表的正确方法吗?

编辑

我的目录基本上是一个 Sql 查询数据集的列表:

dataset_1:
  type: pandas.SQLQueryDataSet
  sql: select * from my_table where created_at >= '2018-12-21 16:00:00' and partner_id=1
  credentials: staging_sql

dataset_2:
  type: pandas.SQLQueryDataSet
  sql: select * from my_table where created_at >= '2019-08-15 11:55:00' and partner_id=2
  credentials: staging_sql
4

1 回答 1

0

我认为它可能会失败,因为 kedro run 是从它的根目录运行它,它可以找到 conf/base 但 create_pipeline 函数在my_pipeline目录下,所以 kedro ConfigLoader 找不到它。我认为我过去这样做的另一种方式是,catalog: DataCatalog像这样通过:

def create_pipeline(catalog: DataCatalog = None, * *kwargs) -> Pipeline:

然后你可以迭代或做:

datasets = catalog.datasets.

于 2020-09-24T15:23:19.330 回答