1

我有一个向量的 RDD 集合,其中每个向量表示一个点xy坐标。例如,文件如下:

1.1 1.2
6.1 4.8
0.1 0.1
9.0 9.0
9.1 9.1
0.4 2.1

我正在阅读它:

  def parseVector(line: String): Vector[Double] = {
    DenseVector(line.split(' ')).map(_.toDouble)
  }

  val lines = sc.textFile(inputFile)
  val points = lines.map(parseVector).cache()

另外,我有一个 epsilon:

  val eps = 2.0

对于每个点,我想找到它在 epsilon 距离内的邻居。我愿意:

points.foreach(point =>
  // squaredDistance(point, ?) what should I write here?
)

如何循环所有点并为每个点找到它的邻居?可能使用map函数?

4

4 回答 4

2

您可以执行以下操作:

val distanceBetweenPoints = points.cartesian(points)
    .filter{case (x,y) => (x!=y)} // remove the (x,x) diagonal
    .map{case (x,y) => ((x,y),distance(x,y))}
val pointsWithinEps = distanceBetweenPoints.filter{case ((x,y),distance) => distance <= eps)}

如果您以后不关心点之间的距离,您也可以在过滤器中组合距离计算。

于 2014-10-25T11:19:35.997 回答
2

即使这个答案已经被接受,我在这里作为一个通知,由于笛卡尔运算O(n^2)的复杂性和巨大的笛卡尔运算,已被建议的与 github repo 中提出的基本相同的已接受解决方案并不是真正可扩展的数据集这绝对是一个问题。

还有另一种解决方案,即 DBSCAN 算法在 Spark 上的另一种实现,可以在这里找到https://github.com/alitouka/spark_dbscan。该解决方案提出了一种不同的方法,将 RDD 数据集划分为“框”。这样,近点只能是所考虑点的同一框中的点,以及距离连续分区边界小于 epsilon 的点。这样,复杂度下降到O(m^2)where mis n/kk即分区数。此外还进行了其他优化(如果您需要更多详细信息,您可以阅读代码,联系作者或询问我)。

以前的实现有一些限制:只支持欧几里得和曼哈顿度量,并且只有很少维度的数据集可以成功处理。为了克服这个问题,我创建了这个分支,旨在消除所有这些问题:https ://github.com/speedymrk9/spark_dbscan/tree/distance-measure-independent 。现在,它似乎工作正常,所有问题都解决了,尽管我正在继续测试它,以便在发出拉取请求之前确定它没有缺陷。

于 2015-07-03T08:25:31.360 回答
1

您可以使用SparkAI 库并执行以下操作:

import org.aizook.scala.clustering.Spark_DBSCAN.DBSCAN val cluster:Dbscan = new Dbscan(3,5,data) cluster.predict((2000,(48.3,33.1)))

`val data: RDD(Long,(Double, Double)
eps = 3
minPts = 5`
于 2014-11-10T05:53:15.890 回答
0

@Bob 那是因为(48.3,33.1)不适合集群,应该归类为噪音。我对SparkAI 库进行了更新,只要预测符合噪声,它就会返回 -1

import org.aizook.scala.clustering.Spark_DBSCAN.Dbscan
val eps = 2
val minPts = 2
val data = sc.textFile("data.txt").map(_.split(" ")).map(p => (p(0).trim.toDouble, p(1).trim.toDouble)).zipWithUniqueId().map(x => (x._2,x._1)).cache;
val cluster:Dbscan = new Dbscan(eps,minPts,data)
cluster.predict((data.count+1,(9.0,10.0)))  // Should return 1 for cluster 1
cluster.predict((data.count+2,(2.0,2.0)))   // Should return 0 for cluster 0
cluster.predict((data.count+3,(15.0,23.0))) // Should return -1 for noise

data.txt 包含您提交的数据样本:

1.1 1.2
6.1 4.8
0.1 0.1
9.0 9.0
9.1 9.1
0.4 2.1
于 2014-11-15T17:17:21.173 回答