Skip to content

Commit

Permalink
[MLlib] [SPARK-2885] DIMSUM: All-pairs similarity
Browse files Browse the repository at this point in the history
# All-pairs similarity via DIMSUM
Compute all pairs of similar vectors using brute force approach, and also DIMSUM sampling approach.

Laying down some notation: we are looking for all pairs of similar columns in an m x n RowMatrix whose entries are denoted a_ij, with the i’th row denoted r_i and the j’th column denoted c_j. There is an oversampling parameter labeled ɣ that should be set to 4 log(n)/s to get provably correct results (with high probability), where s is the similarity threshold.

The algorithm is stated with a Map and Reduce, with proofs of correctness and efficiency in published papers [1] [2]. The reducer is simply the summation reducer. The mapper is more interesting, and is also the heart of the scheme. As an exercise, you should try to see why in expectation, the map-reduce below outputs cosine similarities.

![dimsumv2](https://cloud.githubusercontent.com/assets/3220351/3807272/d1d9514e-1c62-11e4-9f12-3cfdb1d78b3a.png)

[1] Bosagh-Zadeh, Reza and Carlsson, Gunnar (2013), Dimension Independent Matrix Square using MapReduce, arXiv:1304.1467 http://arxiv.org/abs/1304.1467

[2] Bosagh-Zadeh, Reza and Goel, Ashish (2012), Dimension Independent Similarity Computation, arXiv:1206.2082 http://arxiv.org/abs/1206.2082

# Testing

Tests for all invocations included.

Added L1 and L2 norm computation to MultivariateStatisticalSummary since it was needed. Added tests for both of them.

Author: Reza Zadeh <[email protected]>
Author: Xiangrui Meng <[email protected]>

Closes apache#1778 from rezazadeh/dimsumv2 and squashes the following commits:

404c64c [Reza Zadeh] Merge remote-tracking branch 'upstream/master' into dimsumv2
4eb71c6 [Reza Zadeh] Add excludes for normL1 and normL2
ee8bd65 [Reza Zadeh] Merge remote-tracking branch 'upstream/master' into dimsumv2
976ddd4 [Reza Zadeh] Broadcast colMags. Avoid div by zero.
3467cff [Reza Zadeh] Merge remote-tracking branch 'upstream/master' into dimsumv2
aea0247 [Reza Zadeh] Allow large thresholds to promote sparsity
9fe17c0 [Xiangrui Meng] organize imports
2196ba5 [Xiangrui Meng] Merge branch 'rezazadeh-dimsumv2' into dimsumv2
254ca08 [Reza Zadeh] Merge remote-tracking branch 'upstream/master' into dimsumv2
f2947e4 [Xiangrui Meng] some optimization
3c4cf41 [Xiangrui Meng] Merge branch 'master' into rezazadeh-dimsumv2
0e4eda4 [Reza Zadeh] Use partition index for RNG
251bb9c [Reza Zadeh] Documentation
25e9d0d [Reza Zadeh] Line length for style
fb296f6 [Reza Zadeh] renamed to normL1 and normL2
3764983 [Reza Zadeh] Documentation
e9c6791 [Reza Zadeh] New interface and documentation
613f261 [Reza Zadeh] Column magnitude summary
75a0b51 [Reza Zadeh] Use Ints instead of Longs in the shuffle
0f12ade [Reza Zadeh] Style changes
eb1dc20 [Reza Zadeh] Use Double.PositiveInfinity instead of Double.Max
f56a882 [Reza Zadeh] Remove changes to MultivariateOnlineSummarizer
dbc55ba [Reza Zadeh] Make colMagnitudes a method in RowMatrix
41e8ece [Reza Zadeh] style changes
139c8e1 [Reza Zadeh] Syntax changes
029aa9c [Reza Zadeh] javadoc and new test
75edb25 [Reza Zadeh] All tests passing!
05e59b8 [Reza Zadeh] Add test
502ce52 [Reza Zadeh] new interface
654c4fb [Reza Zadeh] default methods
3726ca9 [Reza Zadeh] Remove MatrixAlgebra
6bebabb [Reza Zadeh] remove changes to MatrixSuite
5b8cd7d [Reza Zadeh] Initial files
  • Loading branch information
rezazadeh authored and mengxr committed Sep 29, 2014
1 parent aedd251 commit 587a0cd
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ package org.apache.spark.mllib.linalg.distributed

import java.util.Arrays

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
import breeze.linalg.{svd => brzSvd, axpy => brzAxpy}
import scala.collection.mutable.ListBuffer

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV, axpy => brzAxpy,
svd => brzSvd}
import breeze.numerics.{sqrt => brzSqrt}
import com.github.fommil.netlib.BLAS.{getInstance => blas}

import org.apache.spark.Logging
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.random.XORShiftRandom
import org.apache.spark.storage.StorageLevel

/**
Expand Down Expand Up @@ -411,6 +415,165 @@ class RowMatrix(
new RowMatrix(AB, nRows, B.numCols)
}

/**
* Compute all cosine similarities between columns of this matrix using the brute-force
* approach of computing normalized dot products.
*
* @return An n x n sparse upper-triangular matrix of cosine similarities between
* columns of this matrix.
*/
def columnSimilarities(): CoordinateMatrix = {
columnSimilarities(0.0)
}

/**
* Compute similarities between columns of this matrix using a sampling approach.
*
* The threshold parameter is a trade-off knob between estimate quality and computational cost.
*
* Setting a threshold of 0 guarantees deterministic correct results, but comes at exactly
* the same cost as the brute-force approach. Setting the threshold to positive values
* incurs strictly less computational cost than the brute-force approach, however the
* similarities computed will be estimates.
*
* The sampling guarantees relative-error correctness for those pairs of columns that have
* similarity greater than the given similarity threshold.
*
* To describe the guarantee, we set some notation:
* Let A be the smallest in magnitude non-zero element of this matrix.
* Let B be the largest in magnitude non-zero element of this matrix.
* Let L be the maximum number of non-zeros per row.
*
* For example, for {0,1} matrices: A=B=1.
* Another example, for the Netflix matrix: A=1, B=5
*
* For those column pairs that are above the threshold,
* the computed similarity is correct to within 20% relative error with probability
* at least 1 - (0.981)^10/B^
*
* The shuffle size is bounded by the *smaller* of the following two expressions:
*
* O(n log(n) L / (threshold * A))
* O(m L^2^)
*
* The latter is the cost of the brute-force approach, so for non-zero thresholds,
* the cost is always cheaper than the brute-force approach.
*
* @param threshold Set to 0 for deterministic guaranteed correctness.
* Similarities above this threshold are estimated
* with the cost vs estimate quality trade-off described above.
* @return An n x n sparse upper-triangular matrix of cosine similarities
* between columns of this matrix.
*/
def columnSimilarities(threshold: Double): CoordinateMatrix = {
require(threshold >= 0, s"Threshold cannot be negative: $threshold")

if (threshold > 1) {
logWarning(s"Threshold is greater than 1: $threshold " +
"Computation will be more efficient with promoted sparsity, " +
" however there is no correctness guarantee.")
}

val gamma = if (threshold < 1e-6) {
Double.PositiveInfinity
} else {
10 * math.log(numCols()) / threshold
}

columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma)
}

/**
* Find all similar columns using the DIMSUM sampling algorithm, described in two papers
*
* http://arxiv.org/abs/1206.2082
* http://arxiv.org/abs/1304.1467
*
* @param colMags A vector of column magnitudes
* @param gamma The oversampling parameter. For provable results, set to 10 * log(n) / s,
* where s is the smallest similarity score to be estimated,
* and n is the number of columns
* @return An n x n sparse upper-triangular matrix of cosine similarities
* between columns of this matrix.
*/
private[mllib] def columnSimilaritiesDIMSUM(
colMags: Array[Double],
gamma: Double): CoordinateMatrix = {
require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma")
require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension")
val sg = math.sqrt(gamma) // sqrt(gamma) used many times

// Don't divide by zero for those columns with zero magnitude
val colMagsCorrected = colMags.map(x => if (x == 0) 1.0 else x)

val sc = rows.context
val pBV = sc.broadcast(colMagsCorrected.map(c => sg / c))
val qBV = sc.broadcast(colMagsCorrected.map(c => math.min(sg, c)))

val sims = rows.mapPartitionsWithIndex { (indx, iter) =>
val p = pBV.value
val q = qBV.value

val rand = new XORShiftRandom(indx)
val scaled = new Array[Double](p.size)
iter.flatMap { row =>
val buf = new ListBuffer[((Int, Int), Double)]()
row match {
case sv: SparseVector =>
val nnz = sv.indices.size
var k = 0
while (k < nnz) {
scaled(k) = sv.values(k) / q(sv.indices(k))
k += 1
}
k = 0
while (k < nnz) {
val i = sv.indices(k)
val iVal = scaled(k)
if (iVal != 0 && rand.nextDouble() < p(i)) {
var l = k + 1
while (l < nnz) {
val j = sv.indices(l)
val jVal = scaled(l)
if (jVal != 0 && rand.nextDouble() < p(j)) {
buf += (((i, j), iVal * jVal))
}
l += 1
}
}
k += 1
}
case dv: DenseVector =>
val n = dv.values.size
var i = 0
while (i < n) {
scaled(i) = dv.values(i) / q(i)
i += 1
}
i = 0
while (i < n) {
val iVal = scaled(i)
if (iVal != 0 && rand.nextDouble() < p(i)) {
var j = i + 1
while (j < n) {
val jVal = scaled(j)
if (jVal != 0 && rand.nextDouble() < p(j)) {
buf += (((i, j), iVal * jVal))
}
j += 1
}
}
i += 1
}
}
buf
}
}.reduceByKey(_ + _).map { case ((i, j), sim) =>
MatrixEntry(i.toLong, j.toLong, sim)
}
new CoordinateMatrix(sims, numCols(), numCols())
}

private[mllib] override def toBreeze(): BDM[Double] = {
val m = numRows().toInt
val n = numCols().toInt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
private var n = 0
private var currMean: BDV[Double] = _
private var currM2n: BDV[Double] = _
private var currM2: BDV[Double] = _
private var currL1: BDV[Double] = _
private var totalCnt: Long = 0
private var nnz: BDV[Double] = _
private var currMax: BDV[Double] = _
Expand All @@ -60,6 +62,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S

currMean = BDV.zeros[Double](n)
currM2n = BDV.zeros[Double](n)
currM2 = BDV.zeros[Double](n)
currL1 = BDV.zeros[Double](n)
nnz = BDV.zeros[Double](n)
currMax = BDV.fill(n)(Double.MinValue)
currMin = BDV.fill(n)(Double.MaxValue)
Expand All @@ -81,6 +85,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
val tmpPrevMean = currMean(i)
currMean(i) = (currMean(i) * nnz(i) + value) / (nnz(i) + 1.0)
currM2n(i) += (value - currMean(i)) * (value - tmpPrevMean)
currM2(i) += value * value
currL1(i) += math.abs(value)

nnz(i) += 1.0
}
Expand All @@ -97,7 +103,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
* @return This MultivariateOnlineSummarizer object.
*/
def merge(other: MultivariateOnlineSummarizer): this.type = {
if (this.totalCnt != 0 && other.totalCnt != 0) {
if (this.totalCnt != 0 && other.totalCnt != 0) {
require(n == other.n, s"Dimensions mismatch when merging with another summarizer. " +
s"Expecting $n but got ${other.n}.")
totalCnt += other.totalCnt
Expand All @@ -114,6 +120,15 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
currM2n(i) += other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) /
(nnz(i) + other.nnz(i))
}
// merge m2 together
if (nnz(i) + other.nnz(i) != 0.0) {
currM2(i) += other.currM2(i)
}
// merge l1 together
if (nnz(i) + other.nnz(i) != 0.0) {
currL1(i) += other.currL1(i)
}

if (currMax(i) < other.currMax(i)) {
currMax(i) = other.currMax(i)
}
Expand All @@ -127,6 +142,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
this.n = other.n
this.currMean = other.currMean.copy
this.currM2n = other.currM2n.copy
this.currM2 = other.currM2.copy
this.currL1 = other.currL1.copy
this.totalCnt = other.totalCnt
this.nnz = other.nnz.copy
this.currMax = other.currMax.copy
Expand Down Expand Up @@ -198,4 +215,23 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
}
Vectors.fromBreeze(currMin)
}

override def normL2: Vector = {
require(totalCnt > 0, s"Nothing has been added to this summarizer.")

val realMagnitude = BDV.zeros[Double](n)

var i = 0
while (i < currM2.size) {
realMagnitude(i) = math.sqrt(currM2(i))
i += 1
}

Vectors.fromBreeze(realMagnitude)
}

override def normL1: Vector = {
require(totalCnt > 0, s"Nothing has been added to this summarizer.")
Vectors.fromBreeze(currL1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,14 @@ trait MultivariateStatisticalSummary {
* Minimum value of each column.
*/
def min: Vector

/**
* Euclidean magnitude of each column
*/
def normL2: Vector

/**
* L1 norm of each column
*/
def normL1: Vector
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,40 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
}
}

test("similar columns") {
val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94))
val expected = BDM(
(0.0, 54.0, 72.0),
(0.0, 0.0, 78.0),
(0.0, 0.0, 0.0))

for (i <- 0 until n; j <- 0 until n) {
expected(i, j) /= (colMags(i) * colMags(j))
}

for (mat <- Seq(denseMat, sparseMat)) {
val G = mat.columnSimilarities(0.11).toBreeze()
for (i <- 0 until n; j <- 0 until n) {
if (expected(i, j) > 0) {
val actual = expected(i, j)
val estimate = G(i, j)
assert(math.abs(actual - estimate) / actual < 0.2,
s"Similarities not close enough: $actual vs $estimate")
}
}
}

for (mat <- Seq(denseMat, sparseMat)) {
val G = mat.columnSimilarities()
assert(closeToZero(G.toBreeze() - expected))
}

for (mat <- Seq(denseMat, sparseMat)) {
val G = mat.columnSimilaritiesDIMSUM(colMags.toArray, 150.0)
assert(closeToZero(G.toBreeze() - expected))
}
}

test("svd of a full-rank matrix") {
for (mat <- Seq(denseMat, sparseMat)) {
for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
Expand Down Expand Up @@ -190,6 +224,9 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
assert(summary.numNonzeros === Vectors.dense(3.0, 3.0, 4.0), "nnz mismatch")
assert(summary.max === Vectors.dense(9.0, 7.0, 8.0), "max mismatch")
assert(summary.min === Vectors.dense(0.0, 0.0, 1.0), "column mismatch.")
assert(summary.normL2 === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)),
"magnitude mismatch.")
assert(summary.normL1 === Vectors.dense(18.0, 12.0, 16.0), "L1 norm mismatch")
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,14 @@ object MimaExcludes {
MimaBuild.excludeSparkPackage("graphx")
) ++
MimaBuild.excludeSparkClass("mllib.linalg.Matrix") ++
MimaBuild.excludeSparkClass("mllib.linalg.Vector")
MimaBuild.excludeSparkClass("mllib.linalg.Vector") ++
Seq(
// Added normL1 and normL2 to trait MultivariateStatisticalSummary
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2")
)

case v if v.startsWith("1.1") =>
Seq(
Expand Down

0 comments on commit 587a0cd

Please sign in to comment.