
I have an RDD of DenseVector like this:

>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]

I want to convert it into a DataFrame. I tried this:

>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()

It gives an error like this:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 360, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 340, in _inferSchema
    schema = _infer_schema(first)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 991, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 968, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.ndarray'>

Old solution:

frequencyVectors.map(lambda vector: DenseVector(vector.toArray()))

Edit 1 - Reproducible code

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import split

from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.mllib.linalg import SparseVector, DenseVector

sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
sc.setLogLevel('ERROR')

sentenceData = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])
sentenceData = sentenceData.withColumn("sentence", split("sentence", r"\s+"))
sentenceData.show()

vectorizer = CountVectorizer(inputCol="sentence", outputCol="rawfeatures").fit(sentenceData)
countVectors = vectorizer.transform(sentenceData).select("label", "rawfeatures")

idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(countVectors)
tfidf = idfModel.transform(countVectors).select("label", "features")
frequencyDenseVectors = tfidf.rdd.map(lambda vector: [vector[0],DenseVector(vector[1].toArray())])
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])

2 Answers


You cannot convert an RDD[Vector] directly. It should be mapped to an RDD of objects which can be interpreted as structs, for example RDD[Tuple[Vector]]:

frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])

Otherwise Spark will try to convert the object's __dict__ and create a struct with an unsupported NumPy array as a field:

from pyspark.ml.linalg import DenseVector  
from pyspark.sql.types import _infer_schema

v = DenseVector([1, 2, 3])
_infer_schema(v)
TypeError                                 Traceback (most recent call last)
... 
TypeError: not supported type: <class 'numpy.ndarray'>

versus

_infer_schema((v, ))
StructType(List(StructField(_1,VectorUDT,true)))
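
For completeness, a minimal end-to-end sketch (assuming an active SparkSession named spark, as in the question):

from pyspark.ml.linalg import DenseVector

rdd = spark.sparkContext.parallelize(
    [DenseVector([1.0, 0.0]), DenseVector([0.0, 1.0])]
)

# wrapping each vector in a one-element tuple makes it a struct field
df = rdd.map(lambda v: (v, )).toDF(["rawfeatures"])
df.printSchema()
# root
#  |-- rawfeatures: vector (nullable = true)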

Notes

  • In Spark 2.0 you have to use the correct local types:

    • pyspark.ml.linalg when working with the DataFrame-based pyspark.ml API.
    • pyspark.mllib.linalg when working with the RDD-based pyspark.mllib API.

    These two namespaces are no longer compatible and require explicit conversions (see for example How to convert from org.apache.spark.mllib.linalg.VectorUDT to ml.linalg.VectorUDT), as sketched below.
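
    A minimal sketch of such a conversion (assuming Spark 2.0, where pyspark.mllib.linalg.Vectors.fromML is available; the variable names are illustrative):

    from pyspark.ml.linalg import Vectors as MLVectors
    from pyspark.mllib.linalg import Vectors as MLlibVectors

    ml_vec = MLVectors.dense([1.0, 2.0, 3.0])

    # ml -> mllib: fromML accepts both dense and sparse ml vectors
    mllib_vec = MLlibVectors.fromML(ml_vec)

    # mllib -> ml: rebuild from the underlying NumPy array
    ml_vec_again = MLVectors.dense(mllib_vec.toArray())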

  • The code provided in the edit is not equivalent to the one from the original question. You should be aware that tuple and list don't have the same semantics. If you map a vector to a pair, use tuple and convert directly to DataFrame:

    tfidf.rdd.map(
        lambda row: (row[0], DenseVector(row[1].toArray()))
    ).toDF()
    

    Using tuple (a product type) would work for nested structures as well, but I doubt this is what you want:

    (tfidf.rdd
        .map(lambda row: (row[0], DenseVector(row[1].toArray())))
        .map(lambda x: (x, ))
        .toDF())
    

    A list, anywhere other than at the top-level row, is interpreted as ArrayType, as the small check below illustrates.
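
    For example, reusing _infer_schema from above (the printed form may differ slightly between Spark versions):

    _infer_schema(([1.0, 2.0], ))
    StructType(List(StructField(_1,ArrayType(DoubleType,true),true)))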

  • It is much cleaner to use a UDF for the conversion (Spark Python: Standard scaler error "Do not support ... SparseVector"); a sketch follows.
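
    A minimal sketch of that approach (assuming the tfidf DataFrame from the question; the output column name features_dense is illustrative), densifying the vectors without leaving the DataFrame API:

    from pyspark.ml.linalg import DenseVector, VectorUDT
    from pyspark.sql.functions import udf

    # wrap the conversion in a UDF so no detour through an RDD is needed
    to_dense = udf(lambda v: DenseVector(v.toArray()) if v is not None else None, VectorUDT())

    tfidf.withColumn("features_dense", to_dense("features"))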

Answered 2016-12-26T11:50:40.390

I think the problem here is that createDataFrame does not take a DenseVector as an argument. Try converting the DenseVector to the corresponding collection (i.e. an array or a list). In Scala and Java the

toArray()

method is available; you can convert the dense vector to an array or a list and then try to create the DataFrame, as sketched below.
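
In PySpark terms, a minimal sketch of this suggestion (assuming the frequencyDenseVectors RDD as collected at the top of the question) might look like:

# convert each DenseVector to a plain Python list before building the DataFrame
spark.createDataFrame(
    frequencyDenseVectors.map(lambda v: (v.toArray().tolist(), )),
    ["rawfeatures"]
)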

Answered 2016-12-26T10:35:30.587