I've been stuck on this all day. I've tried everything I could find, but I cannot convert my Pandas DataFrame (which looks like this) into a PySpark DataFrame:
label url_words_t url_words_lex
0 0 [ziyexing, picture, phto_fun, img_01] [281518, 489596, 281519, 281520]
1 0 [animegiant, kimi, todoke] [187613, 375314, 281521]
2 0 [bitsnoop, files, q873899, html] [389091, 490613, 187614, 490676]
Here is its info():
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 300000 entries, 0 to 299999
Data columns (total 3 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 label 300000 non-null int64
1 url_words_t 300000 non-null object
2 url_words_lex 300000 non-null object
dtypes: int64(1), object(2)
memory usage: 6.9+ MB
In particular, I'm having trouble with the last column, url_words_lex, which holds lists of integers.
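A quick way to see what the list elements actually are (toy frame below, values made up to mirror my data, since the real one has 300,000 rows):

```python
import numpy as np
import pandas as pd

# Toy stand-in for the real frame: an object column holding lists of numpy ints
pdf = pd.DataFrame({
    "url_words_lex": [[np.int32(281518), np.int32(489596)]],
})

# The elements inside the lists are numpy scalars, not plain Python ints
print(type(pdf["url_words_lex"].iloc[0][0]))  # <class 'numpy.int32'>
```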
I created a schema (imports included for completeness):
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               StringType, LongType, ArrayType)

schema_pdf = StructType([
    StructField('label', IntegerType(), True),
    StructField('url_words_t', ArrayType(StringType(), True), True),
    StructField('url_words_lex', ArrayType(LongType(), True), True)
])
Finally, I tried the conversion:
df_lex = spark.createDataFrame(pdf, schema_pdf)
If I drop the last column, url_words_lex, the conversion works fine, but with it I get this error:
TypeError Traceback (most recent call last)
<ipython-input-92-686967517920> in <module>
----> 1 df_lex = spark.createDataFrame(pdf, schema_pdf)
~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
746 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
747 else:
--> 748 rdd, schema = self._createFromLocal(map(prepare, data), schema)
749 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
750 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/session.py in _createFromLocal(self, data, schema)
411 # make sure data could consumed multiple times
412 if not isinstance(data, list):
--> 413 data = list(data)
414
415 if schema is None or isinstance(schema, (list, tuple)):
~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/session.py in prepare(obj)
728
729 def prepare(obj):
--> 730 verify_func(obj)
731 return obj
732 elif isinstance(schema, DataType):
~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify(obj)
1387 def verify(obj):
1388 if not verify_nullability(obj):
-> 1389 verify_value(obj)
1390
1391 return verify
~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify_struct(obj)
1368 "length of fields (%d)" % (len(obj), len(verifiers))))
1369 for v, (_, verifier) in zip(obj, verifiers):
-> 1370 verifier(v)
1371 elif hasattr(obj, "__dict__"):
1372 d = obj.__dict__
~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify(obj)
1387 def verify(obj):
1388 if not verify_nullability(obj):
-> 1389 verify_value(obj)
1390
1391 return verify
~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify_array(obj)
1328 verify_acceptable_types(obj)
1329 for i in obj:
-> 1330 element_verifier(i)
1331
1332 verify_value = verify_array
~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify(obj)
1387 def verify(obj):
1388 if not verify_nullability(obj):
-> 1389 verify_value(obj)
1390
1391 return verify
~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify_default(obj)
1381 def verify_default(obj):
1382 assert_acceptable_types(obj)
-> 1383 verify_acceptable_types(obj)
1384
1385 verify_value = verify_default
~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify_acceptable_types(obj)
1276 if type(obj) not in _acceptable_types[_type]:
1277 raise TypeError(new_msg("%s can not accept object %r in type %s"
-> 1278 % (dataType, obj, type(obj))))
1279
1280 if isinstance(dataType, StringType):
TypeError: element in array field url_words_lex: LongType can not accept object 281518 in type <class 'numpy.int32'>
I've tried both IntegerType and LongType and get the same error. As I understand it, the problem is mapping numpy.int32 values onto PySpark's integer types, but I can't figure out how to fix it...
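I also wondered whether casting the numpy scalars to plain Python ints before createDataFrame would satisfy the type verifier. Here is a sketch of what I mean on a made-up toy frame (I'm not sure this is the right fix):

```python
import numpy as np
import pandas as pd

# Toy frame standing in for the real 300k-row one (values are made up)
pdf = pd.DataFrame({
    "label": [0, 0],
    "url_words_t": [["ziyexing", "picture"], ["animegiant", "kimi"]],
    "url_words_lex": [
        [np.int32(281518), np.int32(489596)],
        [np.int32(187613), np.int32(375314)],
    ],
})

# Cast every numpy scalar inside the list column to a plain Python int;
# the verifier checks the exact element type against what LongType accepts
pdf["url_words_lex"] = pdf["url_words_lex"].apply(lambda xs: [int(x) for x in xs])

print(type(pdf["url_words_lex"].iloc[0][0]))  # now a native int
```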
The conversion only completes without errors if I declare the column as StringType instead, but then when I try to view the result:
df_lex.show()
I get a different error:
Py4JJavaError: An error occurred while calling o642.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.0 failed 1 times, most recent failure: Lost task 0.0 in stage 32.0 (TID 57, localhost, executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
I'd be grateful for any ideas on how to solve this.