I am trying to load JSON data from cloud storage using PySpark:

json_Df = spark.read \
        .option("multiline", "true") \
        .option("inferSchema", "true") \
        .option("mode", "DROPMALFORMED") \
        .json("gs://operations-kafka/connector/dev/abcd/year=2022/month=02/day=09/")

The dataframe I get back is nested, so I use the function below to flatten it.

from pyspark.sql import functions as F
from pyspark.sql import types as T

def flatten(df):
    # collect every top-level column that is still a struct or an array
    complex_fields = dict([
        (field.name, field.dataType)
        for field in df.schema.fields
        if isinstance(field.dataType, T.ArrayType) or isinstance(field.dataType, T.StructType)
    ])

    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]

        if isinstance(complex_fields[col_name], T.StructType):
            # promote each struct field to a top-level column named parent_child
            expanded = [F.col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        elif isinstance(complex_fields[col_name], T.ArrayType):
            # turn each array element into its own row
            df = df.withColumn(col_name, F.explode_outer(col_name))

        # recompute the remaining complex columns after this pass
        complex_fields = dict([
            (field.name, field.dataType)
            for field in df.schema.fields
            if isinstance(field.dataType, T.ArrayType) or isinstance(field.dataType, T.StructType)
        ])

    return df
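
For reference, this is how the function is applied (the expected flattened column names in the comment, such as cookies_vtc, are an assumption based on the parent_child aliasing above):

# intended usage; expected output columns such as cookies_vtc, cookies_ENV
# are assumptions based on the parent_child aliasing inside flatten()
flat_df = flatten(json_Df)
flat_df.printSchema()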

This is the dataframe schema before flattening:

 |-- cookies: struct (nullable = true)
|    |-- AMS_C4C6370453309C960A490D44%40AOrg: string (nullable = true)
|    |-- vtc: string (nullable = true)
|    |-- smj.c: string (nullable = true)
|    |-- AM_C4C6370453309C960A490D44%40AOrg: string (nullable = true)
|    |-- BVBRANDID: string (nullable = true)
|    |-- BVBRANDSID: string (nullable = true)
|    |-- ENV: string (nullable = true)
|    |-- SiteId: string (nullable = true)

After passing json_Df to flatten(), it throws this error:

No such struct field smj in AMS_C4C6370453309C960A490D44%40AOrg, AM_C4C6370453309C960A490D44%40AdobeOrg, BVBRANDID, BVBRANDSID, ENV, SiteId,

I think it fails to flatten because of the '.' in the field name. Can anyone suggest a way to replace the '.' in smj.c with '_'? Thanks.
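
For illustration, what I am after is something like the following change to the struct branch of flatten(): wrapping the child field name in backticks so Spark treats a dotted name like smj.c as a single field, and swapping the dot for '_' in the alias. This is only my untested sketch of the desired naming, not a confirmed fix:

# sketch: wrap the child field name in backticks so a dot inside it
# (e.g. smj.c) is read as part of the name rather than a nesting separator,
# and replace '.' with '_' in the resulting column name
expanded = [F.col(col_name + '.`' + k + '`').alias(col_name + '_' + k.replace('.', '_'))
            for k in [n.name for n in complex_fields[col_name]]]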
