0

https://kedro.readthedocs.io/en/stable/kedro.pipeline.node.Node.html#kedro.pipeline.node.Node.inputs

I have a function

def function(**kwargs): return

How can I pass variable to it as a node inputs?

**inputs**
Return node inputs as a list, in the order required to bind them properly to the node’s function. If the node’s function contains kwargs, then kwarg inputs are sorted alphabetically (for python 3.5 deterministic behavior).
4

3 回答 3

1

使用装饰器工厂函数可以解决此问题。在这个例子中使用 python3。

您可以从funtools包中导入包装装饰器


    from functools import wraps

    def deco_factory_df_params(
        *deco_args,
        **deco_kwargs,
    ) -> Callable:
        def deco_fn_df_params(
                func: Callable,
        ) -> Callable:

            @wraps(func)
            def add_df_params(*args, **kwargs):
                kwargs = {**kwargs, **deco_kwargs}
                return func(*args, *deco_args, **kwargs)

            return add_df_params

        return deco_fn_df_params

并在调用 kedro 管道中的节点时将您选择的关键字参数传递给此装饰工厂函数。例如。


    node(
        func=deco_factory_df_params(
            parameter1="value1",
            parameter2="value2",
            parameter3="value3",
        )(your_function_name),
        inputs="data_input1",
        outputs=None,
        name="name_of_node",
    ),

您的节点(函数)定义可能如下所示


    def your_function_name(
        df_input: pyspark.sql.DataFrame,
        parameter1: str,
        parameter2: str,
        parameter3: str,
        **kwargs,  # this is important to be here to avoid error
    ) -> None:
        function_code_here
        
        return None

@mediumnok:我希望这对您的问题有用。

于 2020-08-31T20:01:04.770 回答
0

发送参数时使用该键。

例如,如果要发送STRINGTOPASS,请使用带有前缀的可拆分键input_

...node(
func=function_to_send_parameter,
inputs={"input_STRINGTOPASS": "valid_data_catalog_eg_dataframe_item", ...},
outputs="another_valid_data_catalog_item",
name="pass_parameter_node"), ...

然后,在您的函数中,STRINGTOPASS从以下内容input_中提取:

def function_to_send_parameter(**kwargs):

    # parse input
    for key, value in kwargs.items():
        # parse dataframe and string
        if key.startswith("input_"):
            df = value
            input_string = key.split("input_")[1]
            print("Received string: ", input_string)
    # parse other parameters
    ...
于 2021-11-18T06:08:12.660 回答
0

您将需要node使用kedro.pipeline. 您可以使用字典将 kwargs 传递给函数。

from kedro.pipeline input Pipeline, node

def my_kwarg_fun(**kwargs):
    print(kwargs)

def create_pipeline():
    return Pipeline([
        node(
            my_kwarg_fun,
            inputs={
                "my_kwarg": "example_my_kwarg"
            },
            outputs=None
        )
    ])
# This should print
# {'example_my_kwarg': ...}
于 2020-08-31T11:32:42.217 回答