From 4195e697d30c68be4a67a7da71d7c99c92acf14f Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Tue, 18 Mar 2014 18:36:29 -0700 Subject: [PATCH] private, accumulator --- .../org/apache/spark/mllib/linalg/PCA.scala | 31 ++++++++++--------- .../apache/spark/mllib/linalg/PCASuite.scala | 8 ++--- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala index 71c6bb979b4f7..c5fce3e6f8037 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala @@ -41,15 +41,16 @@ class PCA { } /** - * Compute PCA using the current set parameters - */ + * Compute PCA using the current set parameters + */ def compute(matrix: TallSkinnyDenseMatrix): Array[Array[Double]] = { computePCA(matrix, k) } /** - * Compute PCA using the current set parameters - */ + * Compute PCA using the parameters currently set + * See computePCA() for more details + */ def compute(matrix: RDD[Array[Double]]): Array[Array[Double]] = { computePCA(matrix, k) } @@ -68,19 +69,19 @@ class PCA { * @param k Recover k principal components * @return An nxk matrix with principal components in columns */ - def computePCA(matrix: TallSkinnyDenseMatrix, k: Int): Array[Array[Double]] = { + private def computePCA(matrix: TallSkinnyDenseMatrix, k: Int): Array[Array[Double]] = { val m = matrix.m val n = matrix.n val sc = matrix.rows.sparkContext if (m <= 0 || n <= 0) { - throw new IllegalArgumentException("Expecting a well-formed matrix") + throw new IllegalArgumentException( + "Expecting a well-formed matrix: m=" + m + " n=" + n) } computePCA(matrix.rows.map(_.data), k) } - /** * Principal Component Analysis. * Computes the top k principal component coefficients for the m-by-n data matrix X. @@ -95,20 +96,24 @@ class PCA { * @param k Recover k principal components * @return An nxk matrix of principal components */ - def computePCA(matrix: RDD[Array[Double]], k: Int): Array[Array[Double]] = { + private def computePCA(matrix: RDD[Array[Double]], k: Int): Array[Array[Double]] = { val n = matrix.first.size val sc = matrix.sparkContext - val m = matrix.count + val m = sc.accumulator(-1) // compute column sums and normalize matrix - val colSums = sc.broadcast(matrix.fold(Array.ofDim[Double](n)){ + val colSumsTemp = matrix.fold(Array.ofDim[Double](n)){ (a, b) => val am = new DoubleMatrix(a) val bm = new DoubleMatrix(b) am.addi(bm) + m += 1 a - }.map(x => x / m)).value - + } + + val normalizedColSums = colSumsTemp.map(x => x / m.value) + val colSums = sc.broadcast(normalizedColSums).value + val data = matrix.map{ x => val row = Array.ofDim[Double](n) @@ -123,7 +128,6 @@ class PCA { } } - /** * Top-level methods for calling Principal Component Analysis * NOTE: All matrices are TallSkinnyDenseMatrix format @@ -155,4 +159,3 @@ object PCA { } } - diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala index 12c0fe3ec9f26..3f0d03a329da0 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/PCASuite.scala @@ -52,7 +52,7 @@ class PCASuite extends FunSuite with BeforeAndAfterAll { val m = matrix.m val n = matrix.n val ret = DoubleMatrix.zeros(m, n) - matrix.data.toArray.map(x => ret.put(x.i, x.j, x.mval)) + matrix.data.collect.map(x => ret.put(x.i, x.j, x.mval)) ret } @@ -78,7 +78,7 @@ class PCASuite extends FunSuite with BeforeAndAfterAll { (2,0,0.9553), (2,1,-0.0649), (2,2,0.2886)) val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3))) - val coeffs = new DoubleMatrix(new PCA().computePCA(a, n)) + val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a)) assertMatrixEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs) } @@ -95,7 +95,7 @@ class PCASuite extends FunSuite with BeforeAndAfterAll { (2,0,0.9553), (2,1,-0.0649), (2,2,0.2886)) val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3))) - val coeffs = new DoubleMatrix(new PCA().computePCA(a, n)) + val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a)) assertMatrixEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs) } @@ -113,7 +113,7 @@ class PCASuite extends FunSuite with BeforeAndAfterAll { val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3))) val k = 2 - val coeffs = new DoubleMatrix(new PCA().computePCA(a, k)) + val coeffs = new DoubleMatrix(new PCA().setK(k).compute(a)) assertMatrixEquals(getDenseMatrix(SparseMatrix(realPCA,n,k)), coeffs) }