4

我想在我的转换为 Foundry 时设置测试,通过测试输入并检查输出是否是预期的。是否可以使用虚拟数据集(存储库中的 .csv 文件)调用转换,或者我应该在转换中创建函数以供测试调用(在代码中创建的数据)?

4

1 回答 1

4

如果你在Code Repositories-> Python Transforms->下查看你的平台文档Python Unit Tests,你会发现那里有很多有用的资源。

特别是关于编写和运行测试的部分是您正在寻找的。


// 开始文档

编写测试

完整的文档可以在https://docs.pytest.org找到

Pytest 在任何以 test_ 开头的 Python 文件中查找测试。建议将你所有的测试放到你项目的 src 目录下的一个测试包中。测试只是 Python 函数,它们也以 test_ 前缀命名,并且使用 Python 的 assert 语句进行断言。PyTest 还将运行使用 Python 的内置 unittest 模块编写的测试。例如,在 transforms-python/src/test/test_increment.py 中,一个简单的测试如下所示:

def increment(num):
     return num + 1

def test_increment():
     assert increment(3) == 5

运行此测试将导致检查失败,并显示如下所示的消息:

============================= test session starts =============================
collected 1 item

test_increment.py F                                                       [100%]

================================== FAILURES ===================================
_______________________________ test_increment ________________________________

    def test_increment():
>       assert increment(3) == 5
E       assert 4 == 5
E        +  where 4 = increment(3)

test_increment.py:5: AssertionError
========================== 1 failed in 0.08 seconds ===========================

使用 PySpark 进行测试

PyTest 固定装置是一项强大的功能,只需添加同名参数即可将值注入测试函数。此功能用于提供 spark_session 固定装置以在您的测试功能中使用。例如:

def test_dataframe(spark_session):
    df = spark_session.createDataFrame([['a', 1], ['b', 2]], ['letter', 'number'])
    assert df.schema.names == ['letter', 'number']

// 结束文档


如果您不想在代码中指定模式,您还可以按照How To->下文档中的说明读取存储库中的文件Read file in Python repository


// 开始文档

读取 Python 存储库中的文件

您可以将存储库中的其他文件读取到转换上下文中。这可能有助于为您的转换代码设置参数以供参考。

首先,在您的 python 存储库中编辑 setup.py:

setup(
    name=os.environ['PKG_NAME'],
# ...
    package_data={
        '': ['*.yaml', '*.csv']
    }
)

这告诉 python 将 yaml 和 csv 文件捆绑到包中。然后在您的 python 转换旁边放置一个配置文件(例如 config.yaml,但也可以是 csv 或 txt)(例如 read_yml.py 见下文):

- name: tbl1
  primaryKey:
  - col1
  - col2
  update:
  - column: col3
    with: 'XXX'

您可以read_yml.py使用以下代码在转换中读取它:

from transforms.api import transform_df, Input, Output
from pkg_resources import resource_stream
import yaml
import json

@transform_df(
    Output("/Demo/read_yml")
)
def my_compute_function(ctx):
    stream = resource_stream(__name__, "config.yaml")
    docs = yaml.load(stream)
    return ctx.spark_session.createDataFrame([{'result': json.dumps(docs)}])

所以你的项目结构是:

  • 一些文件夹
    • 配置.yaml
    • read_yml.py

这将在您的数据集中输出一行,其中包含一列“结果”和内容:

[{"primaryKey": ["col1", "col2"], "update": [{"column": "col3", "with": "XXX"}], "name": "tbl1"}]

// 结束文档

于 2020-10-20T13:58:50.107 回答