I am trying to load JSON data from cloud storage using PySpark:
json_Df = spark.read \
    .option("multiline", "true") \
    .option("inferSchema", "true") \
    .option("mode", "DROPMALFORMED") \
    .json("gs://operations-kafka/connector/dev/abcd/year=2022/month=02/day=09/")
The dataframe I get back is nested, so I use the function below to flatten it:
from pyspark.sql import functions as F
from pyspark.sql import types as T

def flatten(df):
    # Collect every top-level column that is still nested (struct or array).
    complex_fields = {
        field.name: field.dataType
        for field in df.schema.fields
        if isinstance(field.dataType, (T.ArrayType, T.StructType))
    }
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        if isinstance(complex_fields[col_name], T.StructType):
            # Promote each struct field to its own top-level column.
            expanded = [
                F.col(col_name + '.' + k).alias(col_name + '_' + k)
                for k in [n.name for n in complex_fields[col_name]]
            ]
            df = df.select("*", *expanded).drop(col_name)
        elif isinstance(complex_fields[col_name], T.ArrayType):
            # Turn each array element into its own row.
            df = df.withColumn(col_name, F.explode_outer(col_name))
        # Recompute whatever nested columns remain.
        complex_fields = {
            field.name: field.dataType
            for field in df.schema.fields
            if isinstance(field.dataType, (T.ArrayType, T.StructType))
        }
    return df
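For reference, on a small hand-made nested frame the function behaves the way I expect (toy example, the column and field names here are made up, not my real data):

# Toy example only; relies on the spark session and flatten() defined above.
sample = spark.createDataFrame(
    [(("1", "2"), ["x", "y"])],
    "info struct<a:string,b:string>, tags array<string>",
)
flat = flatten(sample)
# -> columns: tags, info_a, info_b (two rows, one per exploded tag)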
This is the dataframe schema before flattening:
 |-- cookies: struct (nullable = true)
 |    |-- AMS_C4C6370453309C960A490D44%40AOrg: string (nullable = true)
 |    |-- vtc: string (nullable = true)
 |    |-- smj.c: string (nullable = true)
 |    |-- AM_C4C6370453309C960A490D44%40AOrg: string (nullable = true)
 |    |-- BVBRANDID: string (nullable = true)
 |    |-- BVBRANDSID: string (nullable = true)
 |    |-- ENV: string (nullable = true)
 |    |-- SiteId: string (nullable = true)
After passing json_Df to the flatten() function, it throws this error:
No such struct field smj in AMS_C4C6370453309C960A490D44%40AOrg, AM_C4C6370453309C960A490D44%40AdobeOrg, BVBRANDID, BVBRANDSID, ENV, SiteId,
I believe the problem is the '.' in the field name smj.c: the function treats it as a nested path, so it fails to flatten that field. Can anyone suggest a way to replace the '.' in smj.c with '_'? Thanks.
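The only direction I have thought of so far is to backtick-escape the field name when selecting it and to strip any dots inside the alias, roughly like the snippet below, which would replace the expanded = [...] list in the struct branch of flatten(). This is just an untested sketch of the idea, so I am not sure it is the right approach:

# Sketch only: backticks should make Spark treat "smj.c" as a single field
# name rather than a nested path, and the alias drops the dot.
expanded = [
    F.col("{}.`{}`".format(col_name, k)).alias(
        (col_name + '_' + k).replace('.', '_')
    )
    for k in [n.name for n in complex_fields[col_name]]
]
# (An alternative might be F.col(col_name)[k], which avoids string parsing
#  of the field name altogether.)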