1

我想测试 PySpark 正则表达式的不同输入,以查看它们在运行构建之前是否失败/成功。在运行完整的构建/检查之前,有没有办法在 Foundry 中对此进行测试?

4

2 回答 2

2

您可以使用创作中的预览功能对您的输入进行下采样,然后您可以在其中指定一个过滤器来制作您的输入以进行测试。

然后,您可以在此自定义示例上运行您的 PySpark 代码,以验证它是否符合您的预期。

单击Preview按钮后,单击以下视图中的齿轮。

采样

然后,您可以描述您想要的样本。

过滤

完成此操作后,在输入上运行正则表达式将快速且易于测试。

于 2021-11-16T22:28:14.033 回答
1

我也是编写单元测试的粉丝。创建一个小的输入 df,所需的输出 df,并编写一个简单的函数来获取输入、应用正则表达式并返回输出。

import pytest
from datetime import date
import pandas as pd  # noqa
import numpy as np
from myproject.analysis.simple_discount import (
    calc
)

columns = [
    "date",
    "id",
    "other",
    "brand",
    "grp_id",
    "amounth",
    "pct",
    "max_amount",
    "unit",
    "total_units"
]

output_columns = [
    "date",
    "id",
    "other",
    "brand",
    "grp_id",
    "amount",
    "pct",
    "max_amount",
    "qty",
    "total_amount"
]


@pytest.fixture
def input_df(spark_session):
    data = [
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 1],
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 1],
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 1],
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 4],
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 3],
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 4],
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 1],
        ['3/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
        ['6/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
        ['6/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
        ['6/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
        ['6/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
    ]
    pdf = pd.DataFrame(data, columns=columns)
    pdf = pdf.replace({np.nan: None})
    return spark_session.createDataFrame(pdf)


@pytest.fixture
def output_df(spark_session):
    data = [
        ['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 27, 14.580000000000002],
        ['3/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 1, 1.3],
    ]
    pdf = pd.DataFrame(data, columns=columns)
    pdf = pdf.replace({np.nan: None})
    return spark_session.createDataFrame(pdf)


# ======= FIRST RUN CASE

def test_normal_input(input_df, output_df):
    calc_output_df = calc(input_df)
    assert sorted(calc_output_df.collect()) == sorted(output_df.collect())




#
# Folder Structure
#
# transforms-python/
# ├── ...
# └── src/
#     ├── ...
#     ├── myproject/
#     │   ├── ...
#     │   └── analysis/
#     │       ├── ...
#     │       └── simple_discounts.py
#     └── tests/
#         ├── ...
#         └── unit_tests.py
于 2021-12-09T00:02:51.673 回答