diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index e6688c1ccbb22..23b54e51f5b7a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -39,10 +39,14 @@ object EigenValueDecomposition { * * @param mul a function that multiplies the symmetric matrix with a DenseVector. * @param n dimension of the square matrix (maximum Int.MaxValue). - * @param k number of leading eigenvalues required. + * @param k number of leading eigenvalues required, 0 < k < n. * @param tol tolerance of the eigs computation. * @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors - * (columns of the matrix). The number of computed eigenvalues might be smaller than k. + * (columns of the matrix). + * @note The number of computed eigenvalues might be smaller than k when some Ritz values do not + * satisfy the convergence criterion specified by tol (see ARPACK Users Guide, Chapter 4.6 + * for more details). The maximum number of Arnoldi update iterations is set to 300 in this + * function. */ private[mllib] def symmetricEigs(mul: DenseVector => DenseVector, n: Int, k: Int, tol: Double) : (BDV[Double], BDM[Double]) = { @@ -55,10 +59,10 @@ object EigenValueDecomposition { val tolW = new doubleW(tol) // number of desired eigenvalues, 0 < nev < n val nev = new intW(k) - // nev Lanczos vectors are generated are generated in the first iteration - // ncv-nev Lanczos vectors are generated are generated in each subsequent iteration + // nev Lanczos vectors are generated in the first iteration + // ncv-nev Lanczos vectors are generated in each subsequent iteration // ncv must be smaller than n - val ncv = scala.math.min(2 * k, n) + val ncv = math.min(2 * k, n) // "I" for standard eigenvalue problem, "G" for generalized eigenvalue problem val bmat = "I" @@ -75,7 +79,7 @@ object EigenValueDecomposition { var ido = new intW(0) var info = new intW(0) - var resid:Array[Double] = new Array[Double](n) + var resid = new Array[Double](n) var v = new Array[Double](n * ncv) var workd = new Array[Double](n * 3) var workl = new Array[Double](ncv * (ncv + 8)) @@ -128,19 +132,20 @@ object EigenValueDecomposition { // number of computed eigenvalues, might be smaller than k val computed = iparam(4) - val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map{ + val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map { r => (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n)) } // sort the eigen-pairs in descending order - val sortedEigenPairs = eigenPairs.sortBy(-1 * _._1) + val sortedEigenPairs = eigenPairs.sortBy(- _._1) // copy eigenvectors in descending order of eigenvalues val sortedU = BDM.zeros[Double](n, computed) - sortedEigenPairs.zipWithIndex.map{ + sortedEigenPairs.zipWithIndex.foreach { r => { + val b = r._2 * n for (i <- 0 until n) { - sortedU.data(r._2 * n + i) = r._1._2(i) + sortedU.data(b + i) = r._1._2(i) } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index ecd25c1273cb4..fbe822c53e1ce 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -207,13 +207,14 @@ class RowMatrix( * @param v a local DenseVector whose length must match the number of columns of this matrix. * @return a local DenseVector representing the product. */ - private[mllib] def multiplyGramianMatrix(v: DenseVector): DenseVector = { + private[mllib] def multiplyGramianMatrixBy(v: DenseVector): DenseVector = { val n = numCols().toInt + val vbr = rows.context.broadcast(v.toBreeze) val bv = rows.aggregate(BDV.zeros[Double](n))( seqOp = (U, r) => { val rBrz = r.toBreeze - val a = rBrz.dot(v.toBreeze) + val a = rBrz.dot(vbr.value) rBrz match { case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U) case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U) @@ -223,7 +224,7 @@ class RowMatrix( combOp = (U1, U2) => U1 += U2 ) - new DenseVector(bv.data) + Vectors.fromBreeze(bv).asInstanceOf[DenseVector] } /** @@ -259,7 +260,11 @@ class RowMatrix( k: Int, computeU: Boolean = false, rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { - computeSVD(k, computeU, rCond, 1e-9) + if (numCols() < 100) { + computeSVD(k, computeU, rCond, 1e-9, true) + } else { + computeSVD(k, computeU, rCond, 1e-9, false) + } } /** @@ -274,7 +279,7 @@ class RowMatrix( * The decomposition is computed by providing a function that multiples a vector with A'A to * ARPACK, and iteratively invoking ARPACK-dsaupd on master node, from which we recover S and V. * Then we compute U via easy matrix multiplication as U = A * (V * S^{-1}). - * Note that this approach requires `O(nnz(A))` time. + * Note that this approach requires approximately `O(k * nnz(A))` time. * * ARPACK requires k to be strictly less than n. Thus when the requested eigenvalues k = n, a * non-sparse implementation will be used, which requires `n^2` doubles to fit in memory and @@ -294,21 +299,27 @@ class RowMatrix( * are treated as zero, where sigma(0) is the largest singular value. * @param tol the numerical tolerance of svd computation. Larger tolerance means fewer iterations, * but less accurate result. + * @param isDenseSVD invoke dense SVD implementation when isDenseSVD = true. This requires + * `O(n^2)` memory and `O(n^3)` time. For a skinny matrix (m >> n) with small n, + * dense implementation might be faster. * @return SingularValueDecomposition(U, s, V) */ def computeSVD( k: Int, computeU: Boolean, rCond: Double, - tol: Double): SingularValueDecomposition[RowMatrix, Matrix] = { + tol: Double, + isDenseSVD: Boolean): SingularValueDecomposition[RowMatrix, Matrix] = { val n = numCols().toInt require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") - val (sigmaSquares: BDV[Double], u: BDM[Double]) = if (k < n) { - EigenValueDecomposition.symmetricEigs(multiplyGramianMatrix, n, k, tol) + val (sigmaSquares: BDV[Double], u: BDM[Double]) = if (!isDenseSVD && k < n) { + EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol) } else { - logWarning(s"Request full SVD (k = n = $k), while ARPACK requires k strictly less than n. " + - s"Using non-sparse implementation.") + if (!isDenseSVD && k == n) { + logWarning(s"Request full SVD (k = n = $k), while ARPACK requires k strictly less than " + + s"n. Using non-sparse implementation.") + } val G = computeGramianMatrix() val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = brzSvd(G.toBreeze.asInstanceOf[BDM[Double]]) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 8014428b02d59..e2ff423ca7f79 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -95,38 +95,42 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { } test("svd of a full-rank matrix") { - for (mat <- Seq(denseMat, sparseMat)) { - val localMat = mat.toBreeze() - val (localU, localSigma, localVt) = brzSvd(localMat) - val localV: BDM[Double] = localVt.t.toDenseMatrix - for (k <- 1 to n) { - val svd = mat.computeSVD(k, computeU = true) - val U = svd.U - val s = svd.s - val V = svd.V - assert(U.numRows() === m) - assert(U.numCols() === k) - assert(s.size === k) - assert(V.numRows === n) - assert(V.numCols === k) - assertColumnEqualUpToSign(U.toBreeze(), localU, k) - assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) - assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) + for (denseSVD <- Seq(true, false)) { + for (mat <- Seq(denseMat, sparseMat)) { + val localMat = mat.toBreeze() + val (localU, localSigma, localVt) = brzSvd(localMat) + val localV: BDM[Double] = localVt.t.toDenseMatrix + for (k <- 1 to n) { + val svd = mat.computeSVD(k, true, 1e-9, 1e-9, denseSVD) + val U = svd.U + val s = svd.s + val V = svd.V + assert(U.numRows() === m) + assert(U.numCols() === k) + assert(s.size === k) + assert(V.numRows === n) + assert(V.numCols === k) + assertColumnEqualUpToSign(U.toBreeze(), localU, k) + assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) + assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) + } + val svdWithoutU = mat.computeSVD(n - 1, false, 1e-9, 1e-9, denseSVD) + assert(svdWithoutU.U === null) } - val svdWithoutU = mat.computeSVD(n - 1) - assert(svdWithoutU.U === null) } } test("svd of a low-rank matrix") { - val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) - val mat = new RowMatrix(rows, 4, 3) - val svd = mat.computeSVD(2, computeU = true) - assert(svd.s.size === 1, "should not return zero singular values") - assert(svd.U.numRows() === 4) - assert(svd.U.numCols() === 1) - assert(svd.V.numRows === 3) - assert(svd.V.numCols === 1) + for (denseSVD <- Seq(true, false)) { + val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) + val mat = new RowMatrix(rows, 4, 3) + val svd = mat.computeSVD(2, true, 1e-9, 1e-9, denseSVD) + assert(svd.s.size === 1, "should not return zero singular values") + assert(svd.U.numRows() === 4) + assert(svd.U.numCols() === 1) + assert(svd.V.numRows === 3) + assert(svd.V.numCols === 1) + } } def closeToZero(G: BDM[Double]): Boolean = {