[MLlib] [SPARK-2885] DIMSUM: All-pairs similarity #1778

Status: Closed · wants to merge 33 commits

Changes from all commits (33 commits):
5b8cd7d
Initial files
rezazadeh Aug 4, 2014
6bebabb
remove changes to MatrixSuite
rezazadeh Aug 4, 2014
3726ca9
Remove MatrixAlgebra
rezazadeh Aug 4, 2014
654c4fb
default methods
rezazadeh Aug 4, 2014
502ce52
new interface
rezazadeh Aug 4, 2014
05e59b8
Add test
rezazadeh Aug 4, 2014
75edb25
All tests passing!
rezazadeh Aug 5, 2014
029aa9c
javadoc and new test
rezazadeh Aug 5, 2014
139c8e1
Syntax changes
rezazadeh Aug 5, 2014
41e8ece
style changes
rezazadeh Aug 5, 2014
dbc55ba
Make colMagnitudes a method in RowMatrix
rezazadeh Aug 6, 2014
f56a882
Remove changes to MultivariateOnlineSummarizer
rezazadeh Aug 6, 2014
eb1dc20
Use Double.PositiveInfinity instead of Double.Max
rezazadeh Aug 7, 2014
0f12ade
Style changes
rezazadeh Aug 31, 2014
75a0b51
Use Ints instead of Longs in the shuffle
rezazadeh Aug 31, 2014
613f261
Column magnitude summary
rezazadeh Sep 11, 2014
e9c6791
New interface and documentation
rezazadeh Sep 14, 2014
3764983
Documentation
rezazadeh Sep 14, 2014
fb296f6
renamed to normL1 and normL2
rezazadeh Sep 14, 2014
25e9d0d
Line length for style
rezazadeh Sep 14, 2014
251bb9c
Documentation
rezazadeh Sep 14, 2014
0e4eda4
Use partition index for RNG
rezazadeh Sep 20, 2014
3c4cf41
Merge branch 'master' into rezazadeh-dimsumv2
mengxr Sep 24, 2014
f2947e4
some optimization
mengxr Sep 24, 2014
254ca08
Merge remote-tracking branch 'upstream/master' into dimsumv2
rezazadeh Sep 25, 2014
2196ba5
Merge branch 'rezazadeh-dimsumv2' into dimsumv2
mengxr Sep 25, 2014
9fe17c0
organize imports
mengxr Sep 25, 2014
aea0247
Allow large thresholds to promote sparsity
rezazadeh Sep 26, 2014
3467cff
Merge remote-tracking branch 'upstream/master' into dimsumv2
rezazadeh Sep 26, 2014
976ddd4
Broadcast colMags. Avoid div by zero.
rezazadeh Sep 26, 2014
ee8bd65
Merge remote-tracking branch 'upstream/master' into dimsumv2
rezazadeh Sep 26, 2014
4eb71c6
Add excludes for normL1 and normL2
rezazadeh Sep 26, 2014
404c64c
Merge remote-tracking branch 'upstream/master' into dimsumv2
rezazadeh Sep 26, 2014
RowMatrix.scala

@@ -19,17 +19,21 @@ package org.apache.spark.mllib.linalg.distributed

import java.util.Arrays

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
import breeze.linalg.{svd => brzSvd, axpy => brzAxpy}
import scala.collection.mutable.ListBuffer
Contributor: separate scala imports from java ones

Author: Separated

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV, axpy => brzAxpy,
svd => brzSvd}
import breeze.numerics.{sqrt => brzSqrt}
import com.github.fommil.netlib.BLAS.{getInstance => blas}

import org.apache.spark.Logging
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.random.XORShiftRandom

Contributor: remove extra empty line

Author: Removed extra empty line

/**
@@ -411,6 +415,165 @@ class RowMatrix(
new RowMatrix(AB, nRows, B.numCols)
}

/**
* Compute all cosine similarities between columns of this matrix using the brute-force
* approach of computing normalized dot products.
*
* @return An n x n sparse upper-triangular matrix of cosine similarities between
* columns of this matrix.
*/
def columnSimilarities(): CoordinateMatrix = {
columnSimilarities(0.0)
}

/**
* Compute similarities between columns of this matrix using a sampling approach.
*
* The threshold parameter is a trade-off knob between estimate quality and computational cost.
*
* Setting a threshold of 0 guarantees deterministic correct results, but comes at exactly
* the same cost as the brute-force approach. Setting the threshold to positive values
* incurs strictly less computational cost than the brute-force approach; however, the
* similarities computed will be estimates.
*
* The sampling guarantees relative-error correctness for those pairs of columns that have
* similarity greater than the given similarity threshold.
*
* To describe the guarantee, we set some notation:
* Let A be the smallest in magnitude non-zero element of this matrix.
* Let B be the largest in magnitude non-zero element of this matrix.
* Let L be the maximum number of non-zeros per row.
*
* For example, for {0,1} matrices: A=B=1.
* Another example: for the Netflix matrix, A=1 and B=5.
*
* For those column pairs that are above the threshold,
* the computed similarity is correct to within 20% relative error with probability
* at least 1 - (0.981)^(10/B).
*
* The shuffle size is bounded by the *smaller* of the following two expressions:
*
* O(n log(n) L / (threshold * A))
* O(m L^2)
*
* The latter is the cost of the brute-force approach, so for non-zero thresholds
* the cost is always cheaper than brute force.
*
* @param threshold Set to 0 for deterministic guaranteed correctness.
* Similarities above this threshold are estimated
* with the cost vs estimate quality trade-off described above.
* @return An n x n sparse upper-triangular matrix of cosine similarities
* between columns of this matrix.
*/
def columnSimilarities(threshold: Double): CoordinateMatrix = {
require(threshold >= 0, s"Threshold cannot be negative: $threshold")

if (threshold > 1) {
logWarning(s"Threshold is greater than 1: $threshold " +
"Computation will be more efficient with promoted sparsity, " +
" however there is no correctness guarantee.")
}

val gamma = if (threshold < 1e-6) {
Double.PositiveInfinity
} else {
10 * math.log(numCols()) / threshold
}

columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma)
}
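A quick usage sketch of the new API (not part of this diff; assumes an existing SparkContext named sc and a hypothetical 3 x 2 input matrix):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}

// Hypothetical 3 x 2 matrix; sc is an existing SparkContext.
val mat = new RowMatrix(sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0),
  Vectors.dense(3.0, 4.0),
  Vectors.dense(5.0, 6.0))))

// Exact, brute-force similarities.
val exact: CoordinateMatrix = mat.columnSimilarities()

// Approximate similarities: pairs with cosine similarity above 0.9
// carry the relative-error guarantee described in the scaladoc above.
val approx = mat.columnSimilarities(0.9)

approx.entries.collect().foreach { case MatrixEntry(i, j, s) =>
  println(s"cos($i, $j) = $s")
}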

/**
* Find all similar columns using the DIMSUM sampling algorithm, described in two papers
*
* http://arxiv.org/abs/1206.2082
* http://arxiv.org/abs/1304.1467
*
* @param colMags A vector of column magnitudes
* @param gamma The oversampling parameter. For provable results, set to 10 * log(n) / s,
* where s is the smallest similarity score to be estimated,
* and n is the number of columns
* @return An n x n sparse upper-triangular matrix of cosine similarities
* between columns of this matrix.
*/
private[mllib] def columnSimilaritiesDIMSUM(
colMags: Array[Double],
gamma: Double): CoordinateMatrix = {
require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma")
require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension")
val sg = math.sqrt(gamma) // sqrt(gamma) used many times

// Don't divide by zero for those columns with zero magnitude
val colMagsCorrected = colMags.map(x => if (x == 0) 1.0 else x)

val sc = rows.context
val pBV = sc.broadcast(colMagsCorrected.map(c => sg / c))
val qBV = sc.broadcast(colMagsCorrected.map(c => math.min(sg, c)))

val sims = rows.mapPartitionsWithIndex { (indx, iter) =>
val p = pBV.value
val q = qBV.value

val rand = new XORShiftRandom(indx)
val scaled = new Array[Double](p.size)
iter.flatMap { row =>
val buf = new ListBuffer[((Int, Int), Double)]()
row match {
case sv: SparseVector =>
val nnz = sv.indices.size
var k = 0
while (k < nnz) {
scaled(k) = sv.values(k) / q(sv.indices(k))
k += 1
}
k = 0
while (k < nnz) {
val i = sv.indices(k)
val iVal = scaled(k)
if (iVal != 0 && rand.nextDouble() < p(i)) {
var l = k + 1
while (l < nnz) {
val j = sv.indices(l)
val jVal = scaled(l)
if (jVal != 0 && rand.nextDouble() < p(j)) {
buf += (((i, j), iVal * jVal))
}
l += 1
}
}
k += 1
}
case dv: DenseVector =>
val n = dv.values.size
var i = 0
while (i < n) {
scaled(i) = dv.values(i) / q(i)
i += 1
}
i = 0
while (i < n) {
val iVal = scaled(i)
if (iVal != 0 && rand.nextDouble() < p(i)) {
var j = i + 1
while (j < n) {
val jVal = scaled(j)
if (jVal != 0 && rand.nextDouble() < p(j)) {
buf += (((i, j), iVal * jVal))
}
j += 1
}
}
i += 1
}
}
buf
}
}.reduceByKey(_ + _).map { case ((i, j), sim) =>
MatrixEntry(i.toLong, j.toLong, sim)
}
new CoordinateMatrix(sims, numCols(), numCols())
}
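Why the sampled sum is unbiased, in the notation of the code above (c_i is the magnitude of column i, p(i) = sqrt(gamma)/c_i is the sampling probability, q(i) = min(sqrt(gamma), c_i) is the scaling denominator); this is a sketch of the expectation argument, not quoted from the papers:

$$
\mathbb{E}[B_{ij}]
= \sum_r a_{ri}\,a_{rj}\,
\frac{\min(1,\sqrt{\gamma}/c_i)}{q_i}\cdot
\frac{\min(1,\sqrt{\gamma}/c_j)}{q_j}
= \frac{\sum_r a_{ri}\,a_{rj}}{c_i\,c_j}
= \cos(i,j)
$$

The middle equality holds because min(1, sqrt(gamma)/c_i)/q_i = 1/c_i in both cases: if c_i >= sqrt(gamma), the entry is sampled with probability sqrt(gamma)/c_i and scaled by 1/sqrt(gamma); if c_i < sqrt(gamma), it is kept with probability 1 and scaled by 1/c_i. Columns with magnitude below sqrt(gamma) are therefore handled exactly, which is why gamma = 10 log(n) / threshold only oversamples the heavy columns. For example, with n = 10^6 columns and threshold = 0.1, gamma = 10 ln(10^6) / 0.1 ≈ 1382.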

private[mllib] override def toBreeze(): BDM[Double] = {
val m = numRows().toInt
val n = numCols().toInt
MultivariateOnlineSummarizer.scala

@@ -42,6 +42,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
private var n = 0
private var currMean: BDV[Double] = _
private var currM2n: BDV[Double] = _
private var currM2: BDV[Double] = _
private var currL1: BDV[Double] = _
private var totalCnt: Long = 0
private var nnz: BDV[Double] = _
private var currMax: BDV[Double] = _
@@ -60,6 +62,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S

currMean = BDV.zeros[Double](n)
currM2n = BDV.zeros[Double](n)
currM2 = BDV.zeros[Double](n)
currL1 = BDV.zeros[Double](n)
nnz = BDV.zeros[Double](n)
currMax = BDV.fill(n)(Double.MinValue)
currMin = BDV.fill(n)(Double.MaxValue)
@@ -81,6 +85,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
val tmpPrevMean = currMean(i)
currMean(i) = (currMean(i) * nnz(i) + value) / (nnz(i) + 1.0)
currM2n(i) += (value - currMean(i)) * (value - tmpPrevMean)
currM2(i) += value * value
currL1(i) += math.abs(value)

nnz(i) += 1.0
}
@@ -97,7 +103,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
* @return This MultivariateOnlineSummarizer object.
*/
def merge(other: MultivariateOnlineSummarizer): this.type = {
if (this.totalCnt != 0 && other.totalCnt != 0) {
require(n == other.n, s"Dimensions mismatch when merging with another summarizer. " +
s"Expecting $n but got ${other.n}.")
totalCnt += other.totalCnt
@@ -114,6 +120,15 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
currM2n(i) += other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) /
(nnz(i) + other.nnz(i))
}
// merge m2 together
if (nnz(i) + other.nnz(i) != 0.0) {
currM2(i) += other.currM2(i)
}
// merge l1 together
if (nnz(i) + other.nnz(i) != 0.0) {
currL1(i) += other.currL1(i)
}

if (currMax(i) < other.currMax(i)) {
currMax(i) = other.currMax(i)
}
@@ -127,6 +142,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
this.n = other.n
this.currMean = other.currMean.copy
this.currM2n = other.currM2n.copy
this.currM2 = other.currM2.copy
this.currL1 = other.currL1.copy
this.totalCnt = other.totalCnt
this.nnz = other.nnz.copy
this.currMax = other.currMax.copy
@@ -198,4 +215,23 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
}
Vectors.fromBreeze(currMin)
}

override def normL2: Vector = {
require(totalCnt > 0, s"Nothing has been added to this summarizer.")

val realMagnitude = BDV.zeros[Double](n)

var i = 0
while (i < currM2.size) {
realMagnitude(i) = math.sqrt(currM2(i))
i += 1
}

Vectors.fromBreeze(realMagnitude)
}

override def normL1: Vector = {
require(totalCnt > 0, s"Nothing has been added to this summarizer.")
Vectors.fromBreeze(currL1)
}
}
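A minimal sketch of the new accumulators in isolation (assumes only the summarizer's existing public add/merge API; the expected values follow directly from the definitions above):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

// Two partial summarizers, merged the way a treeAggregate over
// row partitions would merge them.
val left = new MultivariateOnlineSummarizer()
left.add(Vectors.dense(1.0, 0.0))
left.add(Vectors.dense(3.0, 4.0))

val right = new MultivariateOnlineSummarizer()
right.add(Vectors.dense(0.0, 2.0))

val merged = left.merge(right)
println(merged.normL1) // [4.0, 6.0], i.e. the sum of |value| per column
println(merged.normL2) // [sqrt(10), sqrt(20)], i.e. sqrt of the sum of squares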
MultivariateStatisticalSummary.scala

@@ -53,4 +53,14 @@ trait MultivariateStatisticalSummary {
* Minimum value of each column.
*/
def min: Vector

/**
* Euclidean magnitude of each column
*/
def normL2: Vector

/**
* L1 norm of each column
*/
def normL1: Vector
Contributor: mean, stddev, and variance are summary statistics of the random variable. The 1-norm of a random variable should be E[|X|] instead of \sum_i |x_i|. See http://www.math.uah.edu/stat/expect/Spaces.html. I suggest changing the method names to norm1 and norm2 and outputting the average instead.

Author: For general vectors and matrices, L1 and L2 norms are widely accepted as summaries of a vector and are standard linear algebra: http://en.wikipedia.org/wiki/Norm_(mathematics)#p-norm

RowMatrix is a general matrix, not necessarily holding random variable samples in its columns. The names normL1 and normL2 make it clear what we're talking about - there is no mistaking what L1 means. If I named them norm1 and applied some averaging, the names would be ambiguous and would require the user to know we assume the matrix holds random variable samples in its columns. There is no need to push an assumption we don't need onto the user.

}
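Through the public entry point, these surface on the summary returned by Statistics.colStats; a sketch, assuming this change is merged:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

// sc is an existing SparkContext.
val data = sc.parallelize(Seq(
  Vectors.dense(1.0, 10.0),
  Vectors.dense(2.0, 20.0)))

val summary = Statistics.colStats(data)
println(summary.normL1) // [3.0, 30.0]
println(summary.normL2) // [sqrt(5.0), sqrt(500.0)] ≈ [2.236, 22.36]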
RowMatrixSuite.scala

@@ -95,6 +95,40 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
}
}

test("similar columns") {
val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94))
val expected = BDM(
(0.0, 54.0, 72.0),
(0.0, 0.0, 78.0),
(0.0, 0.0, 0.0))

for (i <- 0 until n; j <- 0 until n) {
expected(i, j) /= (colMags(i) * colMags(j))
}

for (mat <- Seq(denseMat, sparseMat)) {
val G = mat.columnSimilarities(0.11).toBreeze()
for (i <- 0 until n; j <- 0 until n) {
if (expected(i, j) > 0) {
val actual = expected(i, j)
val estimate = G(i, j)
assert(math.abs(actual - estimate) / actual < 0.2,
s"Similarities not close enough: $actual vs $estimate")
}
}
}

for (mat <- Seq(denseMat, sparseMat)) {
val G = mat.columnSimilarities()
assert(closeToZero(G.toBreeze() - expected))
}

for (mat <- Seq(denseMat, sparseMat)) {
val G = mat.columnSimilaritiesDIMSUM(colMags.toArray, 150.0)
assert(closeToZero(G.toBreeze() - expected))
}
}

Contributor: There is no test for obtaining partial similar pairs. Do you mind adding one?

Author: Added a test for partial similar pairs.

Contributor: Does the test output only a subset of column pairs?

Author: The test estimates some column pairs, i.e. I looked at the output and checked that some pairs don't have their similarity computed exactly.

test("svd of a full-rank matrix") {
for (mat <- Seq(denseMat, sparseMat)) {
for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
@@ -190,6 +224,9 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
assert(summary.numNonzeros === Vectors.dense(3.0, 3.0, 4.0), "nnz mismatch")
assert(summary.max === Vectors.dense(9.0, 7.0, 8.0), "max mismatch")
assert(summary.min === Vectors.dense(0.0, 0.0, 1.0), "min mismatch.")
assert(summary.normL2 === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)),
"magnitude mismatch.")
assert(summary.normL1 === Vectors.dense(18.0, 12.0, 16.0), "L1 norm mismatch")
}
}
}
project/MimaExcludes.scala (8 additions, 1 deletion)

@@ -39,7 +39,14 @@ object MimaExcludes {
MimaBuild.excludeSparkPackage("graphx")
) ++
MimaBuild.excludeSparkClass("mllib.linalg.Matrix") ++
MimaBuild.excludeSparkClass("mllib.linalg.Vector")
MimaBuild.excludeSparkClass("mllib.linalg.Vector") ++
Seq(
// Added normL1 and normL2 to trait MultivariateStatisticalSummary
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2")
)

case v if v.startsWith("1.1") =>
Seq(