我想使用 解析一系列.csv
文件spark.read.csv
,但我想在文件中包含每一行的行号。
我知道 Spark 通常不会订购 DataFrame,除非明确告知这样做,而且我不想编写自己的.csv
文件解析器,因为这将比 Spark 自己的实现慢得多。如何以分布式安全方式添加此行号?
通过阅读zipWithIndex,它似乎很有用,但不幸的是,它似乎需要分区顺序稳定
我想使用 解析一系列.csv
文件spark.read.csv
,但我想在文件中包含每一行的行号。
我知道 Spark 通常不会订购 DataFrame,除非明确告知这样做,而且我不想编写自己的.csv
文件解析器,因为这将比 Spark 自己的实现慢得多。如何以分布式安全方式添加此行号?
通过阅读zipWithIndex,它似乎很有用,但不幸的是,它似乎需要分区顺序稳定
假设我们有以下设置,用于.csv
在磁盘上创建一个包含我们控制的内容的文件:
from pyspark.sql import types as T, functions as F, SparkSession
import os
import tempfile
spark = SparkSession.builder.getOrCreate()
# Synthesize DataFrames
schema = T.StructType([
T.StructField("col_1", T.StringType(), False),
T.StructField("col_2", T.IntegerType(), False),
T.StructField("col_3", T.StringType(), False),
T.StructField("col_4", T.IntegerType(), False),
])
data = [
{"col_1": "key_1", "col_2": 1, "col_3": "CREATE", "col_4": 0},
{"col_1": "key_2", "col_2": 2, "col_3": "CREATE", "col_4": 0},
{"col_1": "key_3", "col_2": 3, "col_3": "CREATE", "col_4": 0},
]
final_data = []
# Scale up a bit
for _ in range(3):
final_data += data
def refresh():
df = spark.createDataFrame(final_data, schema)
with tempfile.TemporaryDirectory() as tmpdirname:
pathname = os.path.join(
tmpdirname + "output.csv"
)
df.write.format("csv").option(
"header",
True
).save(pathname)
return spark.read.option(
"header",
True
).csv(pathname)
在此设置中,我们可以重复地创建.csv
文件并将它们保存到磁盘,然后像第一次解析它们时那样检索它们。
我们解析这些文件的策略将归结为以下几点:
zipWithIndex
如下所示:
def create_index(
parsed_df,
row_number_column_name="index",
file_name_column_name="_file_name",
block_start_column_name="_block_start",
row_id_column_name="_row_id",
):
unindexed_df = parsed_df.selectExpr(
*parsed_df.columns,
f"input_file_name() AS {file_name_column_name}",
f"input_file_block_start() AS {block_start_column_name}",
f"monotonically_increasing_id() AS {row_id_column_name}"
).orderBy(
file_name_column_name,
block_start_column_name,
row_id_column_name
)
# Unfortunately, we have to unwrap the results of zipWithIndex, so there's some aliasing required
input_cols = unindexed_df.columns
zipped = unindexed_df.rdd.zipWithIndex().toDF()
aliased_columns = []
for input_col in input_cols:
aliased_columns += [zipped["_1." + input_col].alias(input_col)]
# Alias the original columns, remove the ones we built internally
return zipped.select(
*aliased_columns,
zipped["_2"].alias(row_number_column_name)
).drop(
file_name_column_name,
block_start_column_name,
row_id_column_name
)
example_df = refresh()
example_df = create_index(example_df)
example_df.show()
"""
+-----+-----+------+-----+-----+
|col_1|col_2| col_3|col_4|index|
+-----+-----+------+-----+-----+
|key_1| 1|CREATE| 0| 0|
|key_2| 2|CREATE| 0| 1|
|key_3| 3|CREATE| 0| 2|
|key_1| 1|CREATE| 0| 3|
|key_2| 2|CREATE| 0| 4|
|key_3| 3|CREATE| 0| 5|
|key_1| 1|CREATE| 0| 6|
|key_2| 2|CREATE| 0| 7|
|key_3| 3|CREATE| 0| 8|
+-----+-----+------+-----+-----+
"""