1

如何将此 JSON 对象列表转换为 Spark 数据框?

[
  {
    '1': 'A', 
    '2': 'B'
  }, 
  {
    '1': 'A', 
    '3': 'C'
  }
] 

转换成如下形式:

 1     2     3
 A     B     null
 A     null  C

我已经尝试过 spark.read.json(spark.sparkContext.parallelize(d)),也尝试过将其与 json.dumps(d) 结合使用的各种组合。

4

2 回答 2

1

为了导入 JIRA 问题(issue),我不得不攻克这个难题。这些问题以响应对象数据集的形式返回,每个响应对象都包含一个由问题 JSON 对象组成的内部数组。

下面的代码可以作为单个转换(transform)运行,得到一个 JSON 对象已被正确解析的 DataFrame:

import json
from pyspark.sql import Row
from pyspark.sql.functions import explode

def issues_enumerated(All_Issues_Paged):
    """
    Parse the issue objects embedded in each paged response into a DataFrame.

    Each row of ``All_Issues_Paged`` is read through its ``response`` field,
    which is decoded with ``json.loads``; every element of the decoded
    object's ``issues`` list becomes one record of the result.

    Args:
        All_Issues_Paged: DataFrame whose rows carry a ``response`` field
            holding JSON text with an ``issues`` array.

    Returns:
        The DataFrame produced by ``spark.read.json`` over one JSON string
        per issue. Its schema is also printed as a side effect.

    Note:
        Uses a ``spark`` session name that is not defined in this function;
        it must be available in the enclosing scope.
    """

    def generate_issue_json(input_row: Row) -> list:
        """
        Return the issues of one response row as individual JSON strings.
        """
        d = input_row.asDict()
        resp_obj = json.loads(d['response'])
        return [json.dumps(issue) for issue in resp_obj['issues']]

    # One raw JSON string per issue. flatMap flattens the per-response lists
    # directly, so no intermediate array-column DataFrame (and no schema
    # inference for it) is needed before handing the strings to the reader.
    issue_json_strings_rdd = All_Issues_Paged.rdd.flatMap(generate_issue_json)
    # JSON object dataframe
    issues_df = spark.read.json(issue_json_strings_rdd)
    issues_df.printSchema()
    return issues_df

Schema 太大而无法显示,但这里有一个片段:

root
 |-- expand: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- aggregateprogress: struct (nullable = true)
 |    |    |-- percent: long (nullable = true)
 |    |    |-- progress: long (nullable = true)
 |    |    |-- total: long (nullable = true)
 |    |-- aggregatetimeestimate: long (nullable = true)
 |    |-- aggregatetimeoriginalestimate: long (nullable = true)
 |    |-- aggregatetimespent: long (nullable = true)
 |    |-- assignee: struct (nullable = true)
 |    |    |-- accountId: string (nullable = true)
 |    |    |-- accountType: string (nullable = true)
 |    |    |-- active: boolean (nullable = true)
 |    |    |-- avatarUrls: struct (nullable = true)
 |    |    |    |-- 16x16: string (nullable = true)
 |    |    |    |-- 24x24: string (nullable = true)
 |    |    |    |-- 32x32: string (nullable = true)
 |    |    |    |-- 48x48: string (nullable = true)
 |    |    |-- displayName: string (nullable = true)
 |    |    |-- emailAddress: string (nullable = true)
 |    |    |-- self: string (nullable = true)
 |    |    |-- timeZone: string (nullable = true)
 |    |-- components: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- self: string (nullable = true)
 |    |-- created: string (nullable = true)
 |    |-- creator: struct (nullable = true)
 |    |    |-- accountId: string (nullable = true)
 |    |    |-- accountType: string (nullable = true)
 |    |    |-- active: boolean (nullable = true)
 |    |    |-- avatarUrls: struct (nullable = true)
 |    |    |    |-- 16x16: string (nullable = true)
 |    |    |    |-- 24x24: string (nullable = true)
 |    |    |    |-- 32x32: string (nullable = true)
 |    |    |    |-- 48x48: string (nullable = true)
 |    |    |-- displayName: string (nullable = true)
 |    |    |-- emailAddress: string (nullable = true)
 |    |    |-- self: string (nullable = true)
 |    |    |-- timeZone: string (nullable = true)
 |    |-- customfield_10000: string (nullable = true)
 |    |-- customfield_10001: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- isShared: boolean (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |-- customfield_10002: string (nullable = true)
 |    |-- customfield_10003: string (nullable = true)
 |    |-- customfield_10004: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- self: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |    |-- customfield_10005: string (nullable = true)
 |    |-- customfield_10006: string (nullable = true)
 |    |-- customfield_10007: string (nullable = true)
 |    |-- customfield_10008: struct (nullable = true)
 |    |    |-- data: struct (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- issueType: struct (nullable = true)
 |    |    |    |    |-- iconUrl: string (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- key: string (nullable = true)
 |    |    |    |-- keyNum: long (nullable = true)
 |    |    |    |-- projectId: long (nullable = true)
 |    |    |    |-- summary: string (nullable = true)
 |    |    |-- hasEpicLinkFieldDependency: boolean (nullable = true)
 |    |    |-- nonEditableReason: struct (nullable = true)
 |    |    |    |-- message: string (nullable = true)
 |    |    |    |-- reason: string (nullable = true)
 |    |    |-- showField: boolean (nullable = true)
 |    |-- customfield_10009: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- boardId: long (nullable = true)
 |    |    |    |-- completeDate: string (nullable = true)
 |    |    |    |-- endDate: string (nullable = true)
 |    |    |    |-- goal: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- startDate: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)

...
于 2022-01-12T22:10:16.143 回答
0

您可以使用 spark.createDataFrame(d) 来获得想要的效果。

您确实会收到关于从字典中推断模式的弃用警告,因此“正确”的方法是首先创建行:

from pyspark.sql import Row
# Sample records; the two dicts do not share the same set of keys.
data = [
    {'1': 'A', '2': 'B'},
    {'1': 'A', '3': 'C'},
]
# Full list of field names every Row should expose.
schema = ['1', '2', '3']
# Build one Row per record, filling keys absent from a record with None
# so that all Rows carry the same fields.
rows = [Row(**{k: d.get(k) for k in schema}) for d in data]

然后创建数据框:

# Pass the list of Row objects built earlier (`rows`); the original referenced
# an undefined name `row`.
df = spark.createDataFrame(rows)
于 2020-09-25T15:15:23.063 回答