我想在我的转换为 Foundry 时设置测试,通过测试输入并检查输出是否是预期的。是否可以使用虚拟数据集(存储库中的 .csv 文件)调用转换,或者我应该在转换中创建函数以供测试调用(在代码中创建的数据)?
1 回答
如果你在Code Repositories
-> Python Transforms
->下查看你的平台文档Python Unit Tests
,你会发现那里有很多有用的资源。
特别是关于编写和运行测试的部分是您正在寻找的。
// 开始文档
编写测试
完整的文档可以在https://docs.pytest.org找到
Pytest 在任何以 test_ 开头的 Python 文件中查找测试。建议将你所有的测试放到你项目的 src 目录下的一个测试包中。测试只是 Python 函数,它们也以 test_ 前缀命名,并且使用 Python 的 assert 语句进行断言。PyTest 还将运行使用 Python 的内置 unittest 模块编写的测试。例如,在 transforms-python/src/test/test_increment.py 中,一个简单的测试如下所示:
def increment(num):
return num + 1
def test_increment():
assert increment(3) == 5
运行此测试将导致检查失败,并显示如下所示的消息:
============================= test session starts =============================
collected 1 item
test_increment.py F [100%]
================================== FAILURES ===================================
_______________________________ test_increment ________________________________
def test_increment():
> assert increment(3) == 5
E assert 4 == 5
E + where 4 = increment(3)
test_increment.py:5: AssertionError
========================== 1 failed in 0.08 seconds ===========================
使用 PySpark 进行测试
PyTest 固定装置是一项强大的功能,只需添加同名参数即可将值注入测试函数。此功能用于提供 spark_session 固定装置以在您的测试功能中使用。例如:
def test_dataframe(spark_session):
df = spark_session.createDataFrame([['a', 1], ['b', 2]], ['letter', 'number'])
assert df.schema.names == ['letter', 'number']
// 结束文档
如果您不想在代码中指定模式,您还可以按照How To
->下文档中的说明读取存储库中的文件Read file in Python repository
// 开始文档
读取 Python 存储库中的文件
您可以将存储库中的其他文件读取到转换上下文中。这可能有助于为您的转换代码设置参数以供参考。
首先,在您的 python 存储库中编辑 setup.py:
setup(
name=os.environ['PKG_NAME'],
# ...
package_data={
'': ['*.yaml', '*.csv']
}
)
这告诉 python 将 yaml 和 csv 文件捆绑到包中。然后在您的 python 转换旁边放置一个配置文件(例如 config.yaml,但也可以是 csv 或 txt)(例如 read_yml.py 见下文):
- name: tbl1
primaryKey:
- col1
- col2
update:
- column: col3
with: 'XXX'
您可以read_yml.py
使用以下代码在转换中读取它:
from transforms.api import transform_df, Input, Output
from pkg_resources import resource_stream
import yaml
import json
@transform_df(
Output("/Demo/read_yml")
)
def my_compute_function(ctx):
stream = resource_stream(__name__, "config.yaml")
docs = yaml.load(stream)
return ctx.spark_session.createDataFrame([{'result': json.dumps(docs)}])
所以你的项目结构是:
- 一些文件夹
- 配置.yaml
- read_yml.py
这将在您的数据集中输出一行,其中包含一列“结果”和内容:
[{"primaryKey": ["col1", "col2"], "update": [{"column": "col3", "with": "XXX"}], "name": "tbl1"}]
// 结束文档