2

鉴于输入数据集适合本地机器的内存,在本地开发和测试 python 转换代码的推荐方法是什么?

4

1 回答 1

4

不需要您模拟转换包的最简单方法是将您的逻辑提取到具有 pyspark 函数的纯 python 中,该函数接收数据帧作为输入并返回数据帧。

IE:

# yourtransform.py
from my_business_logic import magic_super_complex_computation

@transform_df(
   Output("/foo/bar/out_dataset"),
   input1=Input("/foo/bar/input1"),
   input2=Input("/foo/bar/input2"))
def my_transform(input1, input2):
   return magic_super_complex_computation(input1, input2)

您现在可以在测试中导入magic_super_complex_computation并使用 pyspark 对其进行测试。

IE:

from my_business_logic import magic_super_complex_computation


def test_magic_super_complex_computation(spark_context):
    df1 = load_my_data_as_df(spark_context, "input1")
    df2 = load_my_data_as_df(spark_context, "input2")

    result = magic_super_complex_computation(input1, input2).collect()
    assert len(result) == 123

请注意,这需要您在 pytest(或您正在使用的任何测试框架)中提供有效的 spark 上下文作为夹具

于 2020-11-06T09:51:18.473 回答