I have a PySpark application. I copied a Hive table into my HDFS directory and queried it in Python with sqlContext.sql. The resulting variable is a DataFrame that I call rows. I need to randomly shuffle rows, so I had to convert it to a list of rows, rows_list = rows.collect(). Then I shuffle(rows_list), which shuffles the list in place. I take the number of random rows I need, x:
for r in range(x):
    allrows2add.append(rows_list[r])
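As far as I know, the shuffle-then-take-the-first-x step above is equivalent to a single random.sample call from the standard library (a minimal sketch, reusing rows_list and x from above):

import random

# Picks x distinct rows; same effect as shuffle(rows_list) then rows_list[:x]
allrows2add = random.sample(rows_list, x)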
Now I want to save allrows2add as a Hive table, or append it to an existing Hive table (whichever is easier). The problem is that I can't do this:
all_df = sc.parallelize(allrows2add).toDF()
It fails because the schema can't be inferred:

ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

at least not without putting in the whole schema. The schema of rows has 117 columns, so I don't want to type them out. Is there a way to extract the schema of rows to help me make allrows2add a DataFrame, or to somehow save it as a Hive table? I can do rows.printSchema(), but I'm not sure how to get that into a schema format that I can pass as a variable to toDF(), without having to parse all of that text.
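What I have in mind is something like the following sketch (untested, and assuming sqlContext is a HiveContext so the INSERT works; hivetableIwant2append is a placeholder name). DataFrame.schema returns the StructType that Spark inferred for rows, and sqlContext.createDataFrame accepts an explicit schema, which should skip the type-inference sampling entirely:

# Reuse the 117-column StructType Spark already inferred for `rows`
schema = rows.schema

# An explicit schema avoids the "determine types from the first 100 rows" step
all_df = sqlContext.createDataFrame(allrows2add, schema=schema)

# Register it and append to an existing Hive table
all_df.registerTempTable("allrows2add_tmp")
sqlContext.sql("INSERT INTO TABLE hivetableIwant2append SELECT * FROM allrows2add_tmp")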
Thanks
EDIT: adding for-loop info
#Table is a List of Rows from a small Hive table I loaded using:
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()

from pyspark.sql.functions import lit

for i in range(len(Table)):
    val1 = Table[i][0]
    val2 = Table[i][1]
    count = Table[i][2]
    x = 100 - count
    #hivetemp is a table that I copied from Hive to my hdfs using:
    #CREATE EXTERNAL TABLE IF NOT EXISTS hivetemp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
    #INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;
    query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x)
    rows = sqlContext.sql(query)
    rows = rows.withColumn("col4", lit(10))
    rows = rows.withColumn("col5", lit(some_string))  # some_string defined elsewhere
    #writing to parquet is heck slow AND I can't work with pandas due to the library not being installed on the server
    rows.saveAsParquetFile("rows"+str(i)+".parquet")
    #tried this before and it was heck slow also:
    #rows_list = rows.collect()
    #shuffle(rows_list)
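Since the slow part seems to be writing one Parquet file per loop iteration, one thing I'm considering (a sketch, untested; unionAll is the Spark 1.x DataFrame union, and Table, sqlContext, and some_string are the same as above) is to keep each iteration's DataFrame in a list and do a single write at the end:

from functools import reduce

from pyspark.sql.functions import lit

parts = []
for i in range(len(Table)):
    val1, val2, count = Table[i][0], Table[i][1], Table[i][2]
    x = 100 - count
    query = ("SELECT * FROM hivetemp WHERE col1<>\"" + val1 +
             "\" AND col2 ==\"" + val2 + "\" ORDER BY RAND() LIMIT " + str(x))
    part = sqlContext.sql(query)
    part = part.withColumn("col4", lit(10)).withColumn("col5", lit(some_string))
    parts.append(part)

# Union everything and write once, instead of len(Table) separate writes
combined = reduce(lambda a, b: a.unionAll(b), parts)
combined.saveAsParquetFile("rows_all.parquet")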