1

我有一个带有标题的 CSV,我想将其保存为 Parquet(实际上是一个增量表)

列标题中有空格,实木复合地板无法处理。如何将空格更改为下划线?

到目前为止,这是我从其他 SO 帖子中拼凑而成的:

from pyspark.sql.functions import *

df = spark.read.option("header", True).option("delimiter","\u0001").option("inferSchema",True).csv("/mnt/landing/MyFile.TXT")

names = df.schema.names
for name in names:
    df2 = df.withColumnRenamed(name,regexp_replace(name, ' ', '_'))

当我运行它时,最后一行给了我这个错误:

TypeError:列不可迭代

我认为这将是一个常见的要求,因为镶木地板无法处理空间,但很难找到任何示例。

4

3 回答 3

1

您需要使用reduce函数迭代地对数据框应用重命名,因为在您的代码df2中只有最后一列重命名...

代码如下所示(而不是for循环):

df2 = reduce(lambda data, name: data.withColumnRenamed(name, name.replace('1', '2')), 
             names, df)
于 2020-06-24T12:57:44.223 回答
0

你得到异常是因为 -functionregexp_replace返回 typeColumn但 functionwithColumnRenamed是 exception 的 type String

def regexp_replace(e: org.apache.spark.sql.Column,pattern: String,replacement: String): org.apache.spark.sql.Column
def withColumnRenamed(existingName: String,newName: String): org.apache.spark.sql.DataFrame
于 2020-06-24T13:05:30.777 回答
0

使用.toDF(或).select并传递列列表来创建新的数据框。

df.show()
#+---+----+----+
#| id|id a|id b|
#+---+----+----+
#|  1|   a|   b|
#|  2|   c|   d|
#+---+----+----+
new_cols=list(map(lambda x: x.replace(" ", "_"), df.columns))

df.toDF(*new_cols).show()

df.select([col(s).alias(s.replace(' ','_')) for s in df.columns]).show()
#+---+----+----+
#| id|id_a|id_b|
#+---+----+----+
#|  1|   a|   b|
#|  2|   c|   d|
#+---+----+----+
于 2020-06-24T13:00:28.553 回答