
I have been stuck on this problem all day. I have tried everything I could find, but I cannot convert my Pandas DataFrame (which looks like this) into a PySpark DataFrame:

  label     url_words_t                             url_words_lex
0   0       [ziyexing, picture, phto_fun, img_01]   [281518, 489596, 281519, 281520]
1   0       [animegiant, kimi, todoke]              [187613, 375314, 281521]
2   0       [bitsnoop, files, q873899, html]        [389091, 490613, 187614, 490676]

with this info:

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 300000 entries, 0 to 299999
Data columns (total 3 columns):
 #   Column         Non-Null Count   Dtype 
---  ------         --------------   ----- 
 0   label          300000 non-null  int64 
 1   url_words_t    300000 non-null  object
 2   url_words_lex  300000 non-null  object
dtypes: int64(1), object(2)
memory usage: 6.9+ MB

In particular, I am having trouble with the last column, 'url_words_lex', which is a list of integers.

I created a schema:

from pyspark.sql.types import (
    StructType, StructField, IntegerType, LongType, StringType, ArrayType
)

schema_pdf = StructType([
    StructField('label', IntegerType(), True),
    StructField('url_words_t', ArrayType(StringType(), True), True),
    StructField('url_words_lex', ArrayType(LongType(), True), True)
])

Finally, I attempt the conversion:

df_lex = spark.createDataFrame(pdf, schema_pdf)

If I skip the last column 'url_words_lex' it converts just fine, but with it included I get an error:

TypeError                                 Traceback (most recent call last)
<ipython-input-92-686967517920> in <module>
----> 1 df_lex = spark.createDataFrame(pdf, schema_pdf)

~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    746             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    747         else:
--> 748             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    749         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    750         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/session.py in _createFromLocal(self, data, schema)
    411         # make sure data could consumed multiple times
    412         if not isinstance(data, list):
--> 413             data = list(data)
    414 
    415         if schema is None or isinstance(schema, (list, tuple)):

~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/session.py in prepare(obj)
    728 
    729             def prepare(obj):
--> 730                 verify_func(obj)
    731                 return obj
    732         elif isinstance(schema, DataType):

~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify(obj)
   1387     def verify(obj):
   1388         if not verify_nullability(obj):
-> 1389             verify_value(obj)
   1390 
   1391     return verify

~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify_struct(obj)
   1368                                 "length of fields (%d)" % (len(obj), len(verifiers))))
   1369                 for v, (_, verifier) in zip(obj, verifiers):
-> 1370                     verifier(v)
   1371             elif hasattr(obj, "__dict__"):
   1372                 d = obj.__dict__

~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify(obj)
   1387     def verify(obj):
   1388         if not verify_nullability(obj):
-> 1389             verify_value(obj)
   1390 
   1391     return verify

~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify_array(obj)
   1328             verify_acceptable_types(obj)
   1329             for i in obj:
-> 1330                 element_verifier(i)
   1331 
   1332         verify_value = verify_array

~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify(obj)
   1387     def verify(obj):
   1388         if not verify_nullability(obj):
-> 1389             verify_value(obj)
   1390 
   1391     return verify

~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify_default(obj)
   1381         def verify_default(obj):
   1382             assert_acceptable_types(obj)
-> 1383             verify_acceptable_types(obj)
   1384 
   1385         verify_value = verify_default

~/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/types.py in verify_acceptable_types(obj)
   1276         if type(obj) not in _acceptable_types[_type]:
   1277             raise TypeError(new_msg("%s can not accept object %r in type %s"
-> 1278                                     % (dataType, obj, type(obj))))
   1279 
   1280     if isinstance(dataType, StringType):

TypeError: element in array field url_words_lex: LongType can not accept object 281518 in type <class 'numpy.int32'>

I have tried IntegerType and LongType and get the same error either way. I suspect I am trying to map numpy.int32 values onto PySpark's integer types, but I cannot work out how to fix it...
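A quick check along these lines (just a minimal sketch, assuming pdf is laid out as in the sample above) should confirm what type the list elements actually carry:

# Inspect one element of 'url_words_lex' in the pandas DataFrame.
first_element = pdf['url_words_lex'].iloc[0][0]
print(type(first_element))  # the traceback above suggests <class 'numpy.int32'>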

However, the conversion only completes without error when I use StringType, and even then, when I try to view the result:

df_lex.show()

I get a different error:

Py4JJavaError: An error occurred while calling o642.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.0 failed 1 times, most recent failure: Lost task 0.0 in stage 32.0 (TID 57, localhost, executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)

I would appreciate it if you could share your thoughts on how to solve this.


1 Answer


I tried explicitly converting the numbers with int() while appending them to the lists in 'url_words_lex' in my earlier code, and it worked!
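In case it helps, here is a minimal sketch of the same idea applied after the fact; the apply-based cast is just one way to do it, and it assumes pdf and schema_pdf are the objects from the question:

# Cast every numpy scalar in 'url_words_lex' to a plain Python int
# before handing the DataFrame to Spark.
pdf['url_words_lex'] = pdf['url_words_lex'].apply(lambda xs: [int(x) for x in xs])

df_lex = spark.createDataFrame(pdf, schema_pdf)
df_lex.show(3)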

Answered on 2020-05-31T19:03:25.863