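Similar to the issue described earlier, I followed the spaceflights tutorial. At the "create a pipeline" step, I get the following error when running kedro run --node=preprocess_companies_node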

ValueError: Pipeline does not contain nodes named ['preprocess_companies_node'].

The relevant files are set up as described in the tutorial:

  • src/kedro_tutorial/pipelines/data_processing/pipeline.py
from kedro.pipeline import Pipeline, node

from .nodes import preprocess_companies, preprocess_shuttles

def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
        ]
    )
  • src/kedro_tutorial/pipelines/data_processing/nodes.py
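import pandas as pd

# The helpers _is_true, _parse_percentage and _parse_money used below
# are omitted from this excerpt.


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame: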
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles
  • src/kedro_tutorial/pipeline_registry.py
from typing import Dict

from kedro.pipeline import Pipeline

from kedro_tutorial.pipelines import data_processing as dp


def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipeline.

    Returns:
        A mapping from a pipeline name to a ``Pipeline`` object.

    """
    data_processing_pipeline = dp.create_pipeline()

    return {
        "__default__": data_processing_pipeline,
        "dp": data_processing_pipeline,
    }

I made sure that I registered a __default__ pipeline and that my node name is exactly the same as the one in the command: preprocess_companies_node.

My Kedro version is 0.16.6 and my Python version is 3.7.10.

Any idea what I am doing wrong here?

Thank you.

1 Answer

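The problem is that you are following the 0.17.3+ tutorial while using kedro==0.16.6. It is an easy mistake to make, so don't worry. The pipeline_registry.py module was introduced in 0.17.3. Your options are to upgrade to the latest kedro version, or to put your register_pipelines function in a file named hooks.py instead of pipeline_registry.py.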
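To make that rule of thumb concrete, here is a minimal sketch in plain Python that maps a Kedro version string to the file where register_pipelines is expected to live. The 0.17.3 cut-off is taken from the statement above and is treated as an assumption; registry_location and parse_version are hypothetical helpers, not part of Kedro, and only plain numeric major.minor.patch strings are handled.

```python
from typing import Tuple

# Cut-off taken from the answer text above; treat it as an assumption.
REGISTRY_MODULE_SINCE: Tuple[int, ...] = (0, 17, 3)


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn '0.16.6' into (0, 16, 6). Only plain numeric versions are handled."""
    return tuple(int(part) for part in version.split("."))


def registry_location(kedro_version: str) -> str:
    """Hypothetical helper: name the file where register_pipelines belongs."""
    if parse_version(kedro_version) >= REGISTRY_MODULE_SINCE:
        return "pipeline_registry.py"
    return "hooks.py"


print(registry_location("0.16.6"))  # hooks.py
print(registry_location("0.17.3"))  # pipeline_registry.py
```

The installed version can be read from kedro --version on the command line and passed to the helper.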

# src/<project_name>/hooks.py
"""Project hooks."""
from typing import Any, Dict, Iterable, Optional

from kedro.config import ConfigLoader
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.versioning import Journal

from sixteen.pipelines import data_engineering as de
from sixteen.pipelines import data_science as ds


class ProjectHooks:
    @hook_impl
    def register_pipelines(self) -> Dict[str, Pipeline]:
        """Register the project's pipeline.

        Returns:
            A mapping from a pipeline name to a ``Pipeline`` object.

        """
        data_engineering_pipeline = de.create_pipeline()
        data_science_pipeline = ds.create_pipeline()

        return {
            "de": data_engineering_pipeline,
            "ds": data_science_pipeline,
            "__default__": data_engineering_pipeline + data_science_pipeline,
        }

    @hook_impl
    def register_config_loader(self, conf_paths: Iterable[str]) -> ConfigLoader:
        return ConfigLoader(conf_paths)

    @hook_impl
    def register_catalog(
        self,
        catalog: Optional[Dict[str, Dict[str, Any]]],
        credentials: Dict[str, Dict[str, Any]],
        load_versions: Dict[str, str],
        save_version: str,
        journal: Journal,
    ) -> DataCatalog:
        return DataCatalog.from_config(
            catalog, credentials, load_versions, save_version, journal
        )


project_hooks = ProjectHooks()

You can generate a complete example project for this version yourself by running the kedro new command against it.

# these two bash commands are safe to run outside of a virtual environment
# pipx creates the virtual environment for you
pip install pipx
pipx run --spec kedro==0.16.6 kedro new

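The rest of your code looks like valid 0.16.6 Kedro to me. Once you have moved the contents of pipeline_registry into hooks, you can run the kedro pipeline list command to confirm that Kedro is picking up your pipeline code.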
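A similar check can be sketched in Python. The snippet below assumes only that each registered pipeline exposes a nodes collection whose items have a name attribute, as Kedro's Pipeline and Node objects do. missing_nodes is a hypothetical helper, and the stand-in objects at the bottom exist purely so the snippet runs without a Kedro project.

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterable, List


def missing_nodes(
    registry: Dict[str, Any],
    requested: Iterable[str],
    pipeline_name: str = "__default__",
) -> List[str]:
    """Return the requested node names that the chosen pipeline lacks."""
    available = {n.name for n in registry[pipeline_name].nodes}
    return [name for name in requested if name not in available]


# Stand-ins for what register_pipelines() would return (illustrative only).
fake_pipeline = SimpleNamespace(
    nodes=[
        SimpleNamespace(name="preprocess_companies_node"),
        SimpleNamespace(name="preprocess_shuttles_node"),
    ]
)
empty_pipeline = SimpleNamespace(nodes=[])

print(missing_nodes({"__default__": fake_pipeline}, ["preprocess_companies_node"]))
# []
print(missing_nodes({"__default__": empty_pipeline}, ["preprocess_companies_node"]))
# ['preprocess_companies_node']
```

In a real 0.16.6 project, the mapping returned by register_pipelines in hooks.py would presumably take the place of the stand-in registry. An empty result means every name passed to --node is present in that pipeline.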

Answered 2021-07-20T03:26:48.963