对于这个问题,我们将结合几种不同的技术来使这段代码既可测试又具有高度可扩展性。
理论
解析原始文件时,您可以考虑以下几个选项:
- ❌ 您可以编写自己的解析器来从文件中读取字节并将它们转换为 Spark 可以理解的数据。
- 由于工程时间和不可扩展的架构,尽可能不鼓励这样做。当您这样做时,它不会利用分布式计算,因为您必须将整个原始文件带到您的解析方法中,然后才能使用它。这不是对资源的有效利用。
- ⚠ 您可以使用自己的非 Spark 解析器库,例如问题中提到的 XML Python 库
- 虽然这比编写自己的解析器更容易实现,但它仍然没有利用 Spark 中的分布式计算。运行起来更容易,但最终会遇到性能限制,因为它没有利用仅在编写 Spark 库时才公开的低级 Spark 功能。
- ✅ 你可以使用 Spark 原生的原始文件解析器
- 这是所有情况下的首选选项,因为它利用了低级 Spark 功能,并且不需要您编写自己的代码。如果存在低级 Spark 解析器,则应使用它。
在我们的例子中,我们可以使用 Databricks 解析器来获得很好的效果。
通常,您还应该避免使用该.udf
方法,因为它可能正在使用,而不是 Spark API 中已有的良好功能。UDF 的性能不如本机方法,只有在没有其他选项可用时才应使用。
UDF 掩盖隐藏问题的一个很好的例子是对列内容的字符串操作;虽然从技术上讲,您可以使用 UDF 来执行拆分和修剪字符串等操作,但这些内容已经存在于Spark API中,并且比您自己的代码快几个数量级。
设计
我们的设计将使用以下内容:
- 通过Databricks XML Parser完成的低级 Spark 优化文件解析
- 测试驱动的原始文件解析,如此处所述
连接解析器
首先,我们需要添加.jar
到我们spark_session
可用的内部变换。由于最近的改进,此参数在配置后将允许您.jar
在预览/测试和完整构建时使用。以前,这需要一个完整的构建,但现在不需要。
我们需要转到我们的transforms-python/build.gradle
文件并添加 2 个配置块:
- 启用
pytest
插件
- 启用
condaJars
参数并声明.jar
依赖项
我/transforms-python/build.gradle
现在的样子如下:
buildscript {
repositories {
// some other things
}
dependencies {
classpath "com.palantir.transforms.python:lang-python-gradle-plugin:${transformsLangPythonPluginVersion}"
}
}
apply plugin: 'com.palantir.transforms.lang.python'
apply plugin: 'com.palantir.transforms.lang.python-defaults'
dependencies {
condaJars "com.databricks:spark-xml_2.13:0.14.0"
}
// Apply the testing plugin
apply plugin: 'com.palantir.transforms.lang.pytest-defaults'
// ... some other awesome features you should enable
应用此配置后,您需要通过单击底部功能区并点击重新启动 Code Assist 会话Refresh

刷新 Code Assist 后,我们现在可以使用低级功能来解析我们的.xml
文件,现在我们需要对其进行测试!
测试解析器
如果我们采用和这里一样的测试驱动开发方式,我们最终/transforms-python/src/myproject/datasets/xml_parse_transform.py
会得到以下内容:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many
def read_files(spark_session, paths):
parsed_dfs = []
for file_name in paths:
parsed_df = spark_session.read.format('xml').options(rowTag="tag").load(file_name)
parsed_dfs += [parsed_df]
output_df = union_many(*parsed_dfs, how="wide")
return output_df
@transform(
the_output=Output("my.awesome.output"),
the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
session = ctx.spark_session
input_filesystem = the_input.filesystem()
hadoop_path = input_filesystem.hadoop_path
files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
output_df = read_files(session, files)
the_output.write_dataframe(output_df)
...一个包含内容的示例文件/transforms-python/test/myproject/datasets/sample.xml
:
<tag>
<field1>
my_value
</field1>
</tag>
和一个测试文件/transforms-python/test/myproject/datasets/test_xml_parse_transform.py
:
from myproject.datasets import xml_parse_transform
from pkg_resources import resource_filename
def test_parse_xml(spark_session):
file_path = resource_filename(__name__, "sample.xml")
parsed_df = xml_parse_transform.read_files(spark_session, [file_path])
assert parsed_df.count() == 1
assert set(parsed_df.columns) == {"field1"}
我们现在有:
.xml
高度可扩展的分布式计算、低级解析器
- 一个测试驱动的设置,我们可以快速迭代以使我们的确切功能正确
干杯