Use partition index for RNG
rezazadeh committed Sep 20, 2014
1 parent 251bb9c commit 0e4eda4
Showing 2 changed files with 34 additions and 31 deletions.
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.linalg.distributed

import java.util.Arrays
+
import scala.collection.mutable.ListBuffer

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
@@ -33,7 +34,6 @@ import org.apache.spark.Logging
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}

-
/**
* :: Experimental ::
* Represents a row-oriented distributed Matrix with no meaningful row indices.
@@ -405,7 +405,7 @@
}

/**
-   * Compute all similarities between columns of this matrix using a sampling approach.
+   * Compute similarities between columns of this matrix using a sampling approach.
*
* The threshold parameter is a trade-off knob between estimate quality and computational cost.
*
@@ -420,14 +420,14 @@
* To describe the guarantee, we set some notation:
* Let A be the smallest in magnitude non-zero element of this matrix.
* Let B be the largest in magnitude non-zero element of this matrix.
-   * Let L be the number of non-zeros per row.
+   * Let L be the maximum number of non-zeros per row.
*
* For example, for {0,1} matrices: A=B=1.
* Another example, for the Netflix matrix: A=1, B=5
*
* For those column pairs that are above the threshold,
* the computed similarity is correct to within 20% relative error with probability
-   * at least 1 - (0.981)^(100/B)
+   * at least 1 - (0.981)^(10/B)
*
* The shuffle size is bounded by the *smaller* of the following two expressions:
*
@@ -437,18 +437,19 @@
* The latter is the cost of the brute-force approach, so for non-zero thresholds,
* the cost is always cheaper than the brute-force approach.
*
-   * @param threshold Similarities above this threshold are probably computed correctly.
-   *                  Set to 0 for deterministic guaranteed correctness.
+   * @param threshold Set to 0 for deterministic guaranteed correctness.
+   *                  Similarities above this threshold are estimated
+   *                  with the cost vs estimate quality trade-off described above.
* @return An n x n sparse upper-triangular matrix of cosine similarities
* between columns of this matrix.
*/
def similarColumns(threshold: Double): CoordinateMatrix = {
require(threshold >= 0 && threshold <= 1, s"Threshold not in [0,1]: $threshold")

-    val gamma = if (math.abs(threshold) < 1e-6) {
+    val gamma = if (threshold < 1e-6) {
       Double.PositiveInfinity
     } else {
-      100 * math.log(numCols()) / threshold
+      10 * math.log(numCols()) / threshold
}

similarColumnsDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma)
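
(Aside, not part of the commit: a rough standalone sketch of the oversampling parameter computed in the hunk above. The names oversampling, n and threshold are illustrative; only the 10 * math.log(n) / threshold formula and the threshold = 0 special case come from the code.)

// Sketch only: how the oversampling factor gamma scales with n and threshold.
def oversampling(n: Long, threshold: Double): Double =
  if (threshold < 1e-6) Double.PositiveInfinity  // threshold 0 => keep everything (brute force)
  else 10 * math.log(n) / threshold

// For example, n = 1000000 columns at threshold = 0.5 gives gamma ~ 276.3,
// and each column is then sampled with probability roughly sqrt(gamma) / ||column||, capped at 1.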
@@ -468,32 +469,34 @@
* between columns of this matrix.
*/
  private[mllib] def similarColumnsDIMSUM(colMags: Array[Double],
-                                          gamma: Double): CoordinateMatrix = {
+      gamma: Double): CoordinateMatrix = {
require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma")
require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension")

val sg = math.sqrt(gamma) // sqrt(gamma) used many times

-    val sims = rows.flatMap { row =>
-      val buf = new ListBuffer[((Int, Int), Double)]()
-      row.toBreeze.activeIterator.foreach {
-        case (_, 0.0) => // Skip explicit zero elements.
-        case (i, iVal) =>
-          val rand = new scala.util.Random(iVal.toLong)
-          val ci = colMags(i)
-          if (rand.nextDouble < sg / ci) {
-            row.toBreeze.activeIterator.foreach {
-              case (_, 0.0) => // Skip explicit zero elements.
-              case (j, jVal) =>
-                val cj = colMags(j)
-                if (i < j && rand.nextDouble < sg / cj) {
-                  val contrib = ((i, j), (iVal * jVal) / (math.min(sg, ci) * math.min(sg, cj)))
-                  buf += contrib
-                }
-            }
-          }
-      }
-      buf
+    val sims = rows.mapPartitionsWithIndex { (indx, iter) =>
+      val rand = new scala.util.Random(indx)
+      iter.flatMap { row =>
+        val buf = new ListBuffer[((Int, Int), Double)]()
+        row.toBreeze.activeIterator.foreach {
+          case (_, 0.0) => // Skip explicit zero elements.
+          case (i, iVal) =>
+            val ci = colMags(i)
+            if (rand.nextDouble < sg / ci) {
+              row.toBreeze.activeIterator.foreach {
+                case (_, 0.0) => // Skip explicit zero elements.
+                case (j, jVal) =>
+                  val cj = colMags(j)
+                  if (i < j && rand.nextDouble < sg / cj) {
+                    val contrib = ((i, j), (iVal * jVal) / (math.min(sg, ci) * math.min(sg, cj)))
+                    buf += contrib
+                  }
+              }
+            }
+        }
+        buf
+      }
     }.reduceByKey(_ + _).map { case ((i, j), sim) =>
       MatrixEntry(i.toLong, j.toLong, sim)
     }
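
(Aside, not part of the commit: per the commit title, the new code draws one RNG per partition, seeded by the partition index, so a recomputed partition replays the same random draws. A minimal sketch of that pattern, assuming an existing SparkContext sc; the toy data and the 0.5 sampling rate are made up.)

val data = sc.parallelize(1 to 1000, numSlices = 4)
val sampled = data.mapPartitionsWithIndex { (indx, iter) =>
  val rand = new scala.util.Random(indx)       // deterministic seed per partition
  iter.filter(_ => rand.nextDouble() < 0.5)    // keep each element with probability 0.5
}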
@@ -102,17 +102,17 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
(0.0, 0.0, 78.0),
(0.0, 0.0, 0.0))

-    for(i <- 0 until n) for(j <- 0 until n) {
+    for (i <- 0 until n; j <- 0 until n) {
expected(i, j) /= (colMags(i) * colMags(j))
}

for (mat <- Seq(denseMat, sparseMat)) {
-      val G = mat.similarColumns(0.1).toBreeze()
-      for(i <- 0 until n) for(j <- 0 until n) {
+      val G = mat.similarColumns(0.11).toBreeze()
+      for (i <- 0 until n; j <- 0 until n) {
if (expected(i, j) > 0) {
val actual = expected(i, j)
val estimate = G(i, j)
-          assert(math.abs(actual - estimate) / actual < 0.1,
+          assert(math.abs(actual - estimate) / actual < 0.2,
s"Similarities not close enough: $actual vs $estimate")
}
}
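
(Aside, not part of the commit: a minimal usage sketch in the spirit of this test, assuming a live SparkContext sc and the method name as of this commit, similarColumns. The input rows are made up; threshold 0.0 requests the exact, brute-force computation.)

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 3.0),
  Vectors.dense(0.0, 2.0, 1.0),
  Vectors.dense(4.0, 0.0, 5.0)))
val mat = new RowMatrix(rows)

// Upper-triangular cosine similarities between columns, returned as MatrixEntry(i, j, value).
val sims = mat.similarColumns(0.0).entries.collect()
sims.foreach { e => println(s"cols (${e.i}, ${e.j}) -> ${e.value}") }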