5

我有一个 pyspark 应用程序。我将一个 hive 表复制到我的 hdfs 目录中,并在 pythonsqlContext.sql中对该表进行了查询。现在这个变量是我调用的数据框rows。我需要随机洗牌rows,所以我不得不将它们转换为行列表rows_list = rows.collect()。然后我shuffle(rows_list)将列表重新排列到位。我取所需的随机行数x

for r in range(x): allrows2add.append(rows_list[r]) 现在我想将 allrows2add 保存为 hive 表或附加现有的 hive 表(以更容易做的为准)。问题是我不能这样做:

all_df = sc.parallelize(allrows2add).toDF()不能这样做,无法推断架构 ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

没有放入整个架构。的架构rows有 117 列,所以我不想输入它们。有没有办法提取架构rows来帮助我制作 allrows2add 数据框或以某种方式保存为配置单元表?我可以做 rows.printSchema(),但不确定如何将其转换为模式格式作为变量传递toDF(),而无需解析所有文本

谢谢

添加for循环信息

#Table is a List of Rows from small Hive table I loaded using
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()

for i in range(len(Table)):

    rows = sqlContext.sql(qry)
    val1 = Table[i][0]
    val2 = Table[i][1]
    count = Table[i][2]
    x = 100 - count

#hivetemp is a table that I copied from Hive to my hfs using:
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;

    query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x)

    rows = sqlContext.sql(query)
    rows = rows.withColumn("col4", lit(10))
    rows = rows.withColumn("col5", lit(some_string))
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
    rows.saveAsParquetFile("rows"+str(i)+".parquet")
#tried this before and heck slow also
    #rows_list = rows.collect()
    #shuffle(rows_list)
4

1 回答 1

23

当无法推断架构时,通常是有原因的。toDFcreateDataFrame函数的语法糖,默认情况下只使用前 100 行(尽管文档说它只使用第一行)来确定架构应该是什么。要改变这一点,您可以增加采样率以查看更大百分比的数据:

df = rdd.toDF(sampleRatio=0.2)
# or...
df = sqlContext.createDataFrame(rdd, samplingRatio=0.2)

您的随机样本也有可能碰巧只取某些特定列的空值行。如果是这种情况,您可以像这样从头开始创建架构

from pyspark.sql.types import *
# all DataFrame rows are StructType
# can create a new StructType with combinations of StructField
schema = StructType([
    StructField("column_1", StringType(), True),
    StructField("column_2", IntegerType(), True),
    # etc.
])
df = sqlContext.createDataFrame(rdd, schema=schema)

或者,您可以通过访问值从之前创建的 DataFrame 中获取架构schema

df2 = sqlContext.createDataFrame(rdd, schema=df1.schema)

请注意,如果您的 RDD 的行不是StructType(aka Row)对象而不是字典或列表,您将无法从它们创建数据框。如果您的 RDD 行是字典,您可以将它们转换为Row如下对象:

rdd = rdd.map(lambda x: pyspark.sql.Row(**x))
# ** is to unpack the dictionary since the Row constructor
# only takes keyword arguments
于 2016-04-28T01:46:41.010 回答