5

我已按照链接 http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html中给出的指南进行操作

但这已经过时了,因为它使用 spark Mlib RDD 方法。新 Spark 2.0 具有 DataFrame 方法。现在我的问题是我有更新的代码

val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
val model = als.fit(training)
// Evaluate the model by computing the RMSE on the test data
val predictions = model.transform(test)

现在问题来了,在旧代码中,获得的模型是 MatrixFactorizationModel,现在它有自己的模型(ALSModel)

在 MatrixFactorizationModel 你可以直接做

val recommendations = bestModel.get
  .predict(userID)

这将给出用户喜欢它们的概率最高的产品列表。

但是现在没有 .predict 方法。任何想法如何在给定用户 ID 的情况下推荐产品列表

4

3 回答 3

6

模型使用transform方法:

import spark.implicits._
val dataFrameToPredict = sparkContext.parallelize(Seq((111, 222)))
    .toDF("userId", "productId")
val predictionsOfProducts = model.transform (dataFrameToPredict)

有一个实现推荐(用户|产品)方法的 jira 票,但它还没有在默认分支上

现在你有用户得分的 DataFrame

您可以简单地使用 orderBy 和 limit 来显示 N 个推荐的产品:

// where is for case when we have big DataFrame with many users
model.transform (dataFrameToPredict.where('userId === givenUserId))
    .select ('productId, 'prediction)
    .orderBy('prediction.desc)
    .limit(N)
    .map { case Row (productId: Int, prediction: Double) => (productId, prediction) }
    .collect()

DataFrame dataFrameToPredict 可以是一些大​​的用户产品 DataFrame,例如所有用户 x 所有产品

于 2016-12-20T15:02:47.053 回答
3

Spark 中的ALS 模型包含以下有用的方法:

  • recommendForAllItems(int numUsers)

    返回为每个项目、所有项目推荐的前 numUsers 用户。

  • recommendForAllUsers(int numItems)

    为所有用户返回为每个用户推荐的前 numItems 个项目。

  • recommendForItemSubset(Dataset<?> dataset, int numUsers)

    返回为输入数据集中的每个项目 id 推荐的前 numUsers 用户。

  • recommendForUserSubset(Dataset<?> dataset, int numItems)

    返回为输入数据集中每个用户 id 推荐的前 numItems 个项目。


例如 Python

from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import explode

alsEstimator = ALS()

(alsEstimator.setRank(1)
  .setUserCol("user_id")
  .setItemCol("product_id")
  .setRatingCol("rating")
  .setMaxIter(20)
  .setColdStartStrategy("drop"))

alsModel = alsEstimator.fit(productRatings)

recommendForSubsetDF = alsModel.recommendForUserSubset(TargetUsers, 40)

recommendationsDF = (recommendForSubsetDF
  .select("user_id", explode("recommendations")
  .alias("recommendation"))
  .select("user_id", "recommendation.*")
)

display(recommendationsDF)

例如斯卡拉:

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.functions.explode 

val alsEstimator = new ALS().setRank(1)
  .setUserCol("user_id")
  .setItemCol("product_id")
  .setRatingCol("rating")
  .setMaxIter(20)
  .setColdStartStrategy("drop")

val alsModel = alsEstimator.fit(productRatings)

val recommendForSubsetDF = alsModel.recommendForUserSubset(sampleTargetUsers, 40)

val recommendationsDF = recommendForSubsetDF
  .select($"user_id", explode($"recommendations").alias("recommendation"))
  .select($"user_id", $"recommendation.*")

display(recommendationsDF)
于 2018-09-19T21:48:13.433 回答
0

以下是我为获得针对特定用户的推荐所做的工作spark.ml

import com.github.fommil.netlib.BLAS.{getInstance => blas}

userFactors.lookup(userId).headOption.fold(Map.empty[String, Float]) { user =>

  val ratings = itemFactors.map { case (id, features) =>
    val rating = blas.sdot(features.length, user, 1, features, 1)
    (id, rating)
  }

  ratings.sortBy(_._2).take(numResults).toMap
}

两者都userFactorsitemFactors我的情况下,RDD[(String, Array[Float])]但你应该能够用 DataFrames 做类似的事情。

于 2017-01-18T17:00:08.960 回答