diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index 6d7ead9f9b71a..de80367ba9211 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -18,125 +18,88 @@ package org.apache.spark.mllib.linalg.distributed import breeze.linalg.{DenseMatrix => BDM} -import org.apache.spark.util.Utils import org.apache.spark.{Logging, Partitioner} -import org.apache.spark.mllib.linalg._ -import org.apache.spark.mllib.rdd.RDDFunctions._ +import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel /** - * A grid partitioner, which stores every block in a separate partition. + * A grid partitioner, which uses a regular grid to partition coordinates. * - * @param numRowBlocks Number of blocks that form the rows of the matrix. - * @param numColBlocks Number of blocks that form the columns of the matrix. - * @param suggestedNumPartitions Number of partitions to partition the rdd into. The final number - * of partitions will be set to `min(suggestedNumPartitions, - * numRowBlocks * numColBlocks)`, because setting the number of - * partitions greater than the number of sub matrices is not useful. + * @param rows Number of rows. + * @param cols Number of columns. + * @param rowsPerPart Number of rows per partition, which may be less at the bottom edge. + * @param colsPerPart Number of columns per partition, which may be less at the right edge. */ private[mllib] class GridPartitioner( - val numRowBlocks: Int, - val numColBlocks: Int, - suggestedNumPartitions: Int) extends Partitioner { - private val totalBlocks = numRowBlocks.toLong * numColBlocks - // Having the number of partitions greater than the number of sub matrices does not help - override val numPartitions = math.min(suggestedNumPartitions, totalBlocks).toInt - - private val blockLengthsPerPartition = findOptimalBlockLengths - // Number of neighboring blocks to take in each row - private val numRowBlocksPerPartition = blockLengthsPerPartition._1 - // Number of neighboring blocks to take in each column - private val numColBlocksPerPartition = blockLengthsPerPartition._2 - // Number of rows of partitions - private val blocksPerRow = math.ceil(numRowBlocks * 1.0 / numRowBlocksPerPartition).toInt + val rows: Int, + val cols: Int, + val rowsPerPart: Int, + val colsPerPart: Int) extends Partitioner { + + require(rows > 0) + require(cols > 0) + require(rowsPerPart > 0) + require(colsPerPart > 0) + + private val rowPartitions = math.ceil(rows / rowsPerPart).toInt + private val colPartitions = math.ceil(cols / colsPerPart).toInt + + override val numPartitions = rowPartitions * colPartitions /** - * Returns the index of the partition the SubMatrix belongs to. Tries to achieve block wise - * partitioning. + * Returns the index of the partition the input coordinate belongs to. * - * @param key The key for the SubMatrix. Can be its position in the grid (its column major index) - * or a tuple of three integers that are the final row index after the multiplication, - * the index of the block to multiply with, and the final column index after the + * @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in * multiplication. - * @return The index of the partition, which the SubMatrix belongs to. + * @return The index of the partition, which the coordinate belongs to. */ override def getPartition(key: Any): Int = { key match { - case (blockRowIndex: Int, blockColIndex: Int) => - getPartitionId(blockRowIndex, blockColIndex) - case (blockRowIndex: Int, innerIndex: Int, blockColIndex: Int) => - getPartitionId(blockRowIndex, blockColIndex) + case (i: Int, j: Int) => + getPartitionId(i, j) + case (i: Int, j: Int, _) => + getPartitionId(i, j) case _ => - throw new IllegalArgumentException(s"Unrecognized key. key: $key") + throw new IllegalArgumentException(s"Unrecognized key: $key") } } /** Partitions sub-matrices as blocks with neighboring sub-matrices. */ - private def getPartitionId(blockRowIndex: Int, blockColIndex: Int): Int = { - require(0 <= blockRowIndex && blockRowIndex < numRowBlocks, "The blockRowIndex in the key " + - s"must be in the range 0 <= blockRowIndex < numRowBlocks. blockRowIndex: $blockRowIndex," + - s"numRowBlocks: $numRowBlocks") - require(0 <= blockRowIndex && blockColIndex < numColBlocks, "The blockColIndex in the key " + - s"must be in the range 0 <= blockRowIndex < numColBlocks. blockColIndex: $blockColIndex, " + - s"numColBlocks: $numColBlocks") - // Coordinates of the block - val i = blockRowIndex / numRowBlocksPerPartition - val j = blockColIndex / numColBlocksPerPartition - // The mod shouldn't be required but is added as a guarantee for possible corner cases - Utils.nonNegativeMod(j * blocksPerRow + i, numPartitions) - } - - /** Tries to calculate the optimal number of blocks that should be in each partition. */ - private def findOptimalBlockLengths: (Int, Int) = { - // Gives the optimal number of blocks that need to be in each partition - val targetNumBlocksPerPartition = math.ceil(totalBlocks * 1.0 / numPartitions).toInt - // Number of neighboring blocks to take in each row - var m = math.ceil(math.sqrt(targetNumBlocksPerPartition)).toInt - // Number of neighboring blocks to take in each column - var n = math.ceil(targetNumBlocksPerPartition * 1.0 / m).toInt - // Try to make m and n close to each other while making sure that we don't exceed the number - // of partitions - var numBlocksForRows = math.ceil(numRowBlocks * 1.0 / m) - var numBlocksForCols = math.ceil(numColBlocks * 1.0 / n) - while ((numBlocksForRows * numBlocksForCols > numPartitions) && (m * n != 0)) { - if (numRowBlocks <= numColBlocks) { - m += 1 - n = math.ceil(targetNumBlocksPerPartition * 1.0 / m).toInt - } else { - n += 1 - m = math.ceil(targetNumBlocksPerPartition * 1.0 / n).toInt - } - numBlocksForRows = math.ceil(numRowBlocks * 1.0 / m) - numBlocksForCols = math.ceil(numColBlocks * 1.0 / n) - } - // If a good partitioning scheme couldn't be found, set the side with the smaller dimension to - // 1 and the other to the number of targetNumBlocksPerPartition - if (m * n == 0) { - if (numRowBlocks <= numColBlocks) { - m = 1 - n = targetNumBlocksPerPartition - } else { - n = 1 - m = targetNumBlocksPerPartition - } - } - (m, n) + private def getPartitionId(i: Int, j: Int): Int = { + require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).") + require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).") + i / rowsPerPart + j / colsPerPart * rowPartitions } /** Checks whether the partitioners have the same characteristics */ override def equals(obj: Any): Boolean = { obj match { case r: GridPartitioner => - (this.numRowBlocks == r.numRowBlocks) && (this.numColBlocks == r.numColBlocks) && - (this.numPartitions == r.numPartitions) + (this.rows == r.rows) && (this.cols == r.cols) && + (this.rowsPerPart == r.rowsPerPart) && (this.colsPerPart == r.colsPerPart) case _ => false } } } +private[mllib] object GridPartitioner { + + def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = { + new GridPartitioner(rows, cols, rowsPerPart, colsPerPart) + } + + def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = { + require(suggestedNumPartitions > 0) + val scale = 1.0 / math.sqrt(suggestedNumPartitions) + val rowsPerPart = math.round(math.max(scale * rows, 1.0)).toInt + val colsPerPart = math.round(math.max(scale * cols, 1.0)).toInt + new GridPartitioner(rows, cols, rowsPerPart, colsPerPart) + } +} + /** * Represents a distributed matrix in blocks of local matrices. * @@ -191,7 +154,7 @@ class BlockMatrix( val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt private[mllib] var partitioner: GridPartitioner = - new GridPartitioner(numRowBlocks, numColBlocks, rdd.partitions.length) + GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = rdd.partitions.size) /** Returns the dimensions of the matrix. */ private def getDim: (Long, Long) = { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala index a5016731d1f20..b52057beb7703 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala @@ -17,8 +17,10 @@ package org.apache.spark.mllib.linalg.distributed -import org.scalatest.FunSuite +import scala.util.Random + import breeze.linalg.{DenseMatrix => BDM} +import org.scalatest.FunSuite import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix} import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -51,50 +53,72 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext { assert(gridBasedMat.numCols() === n) } - test("grid partitioner partitioning") { - val partitioner = gridBasedMat.partitioner - assert(partitioner.getPartition((0, 0)) === 0) - assert(partitioner.getPartition((0, 1)) === 0) - assert(partitioner.getPartition((1, 0)) === 1) - assert(partitioner.getPartition((1, 1)) === 1) - assert(partitioner.getPartition((2, 0)) === 2) - assert(partitioner.getPartition((2, 1)) === 2) - assert(partitioner.getPartition((1, 0, 1)) === 1) - assert(partitioner.getPartition((2, 0, 0)) === 2) - - val part2 = new GridPartitioner(10, 20, 10) - assert(part2.getPartition((0, 0)) === 0) - assert(part2.getPartition((0, 1)) === 0) - assert(part2.getPartition((0, 6)) === 2) - assert(part2.getPartition((3, 7)) === 2) - assert(part2.getPartition((3, 8)) === 4) - assert(part2.getPartition((3, 13)) === 6) - assert(part2.getPartition((9, 14)) === 7) - assert(part2.getPartition((9, 15)) === 7) - assert(part2.getPartition((9, 19)) === 9) + test("grid partitioner") { + val random = new Random() + // This should generate a 4x4 grid of 1x2 blocks. + val part0 = GridPartitioner(4, 7, suggestedNumPartitions = 12) + val expected0 = Array( + Array(0, 0, 4, 4, 8, 8, 12), + Array(1, 1, 5, 5, 9, 9, 13), + Array(2, 2, 6, 6, 10, 10, 14), + Array(3, 3, 7, 7, 11, 11, 15)) + for (i <- 0 until 4; j <- 0 until 7) { + assert(part0.getPartition((i, j)) === expected0(i)(j)) + assert(part0.getPartition((i, j, random.nextInt())) === expected0(i)(j)) + } + + intercept[IllegalArgumentException] { + part0.getPartition((-1, 0)) + } + + intercept[IllegalArgumentException] { + part0.getPartition((4, 0)) + } + + intercept[IllegalArgumentException] { + part0.getPartition((0, -1)) + } intercept[IllegalArgumentException] { - part2.getPartition((-1, 0)) + part0.getPartition((0, 7)) + } + + val part1 = GridPartitioner(2, 2, suggestedNumPartitions = 5) + val expected1 = Array( + Array(0, 2), + Array(1, 3)) + for (i <- 0 until 2; j <- 0 until 2) { + assert(part1.getPartition((i, j)) === expected1(i)(j)) + assert(part1.getPartition((i, j, random.nextInt())) === expected1(i)(j)) } + val part2 = GridPartitioner(2, 2, suggestedNumPartitions = 5) + assert(part0 !== part2) + assert(part1 === part2) + + val part3 = new GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2) + val expected3 = Array( + Array(0, 0, 2), + Array(1, 1, 3)) + for (i <- 0 until 2; j <- 0 until 3) { + assert(part3.getPartition((i, j)) === expected3(i)(j)) + assert(part3.getPartition((i, j, random.nextInt())) === expected3(i)(j)) + } + + val part4 = GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2) + assert(part3 === part4) + intercept[IllegalArgumentException] { - part2.getPartition((10, 0)) + new GridPartitioner(2, 2, rowsPerPart = 0, colsPerPart = 1) } intercept[IllegalArgumentException] { - part2.getPartition((9, 20)) + GridPartitioner(2, 2, rowsPerPart = 1, colsPerPart = 0) } - val part3 = new GridPartitioner(20, 10, 10) - assert(part3.getPartition((0, 0)) === 0) - assert(part3.getPartition((1, 0)) === 0) - assert(part3.getPartition((6, 0)) === 1) - assert(part3.getPartition((7, 3)) === 1) - assert(part3.getPartition((8, 3)) === 2) - assert(part3.getPartition((13, 3)) === 3) - assert(part3.getPartition((14, 9)) === 8) - assert(part3.getPartition((15, 9)) === 8) - assert(part3.getPartition((19, 9)) === 9) + intercept[IllegalArgumentException] { + GridPartitioner(2, 2, suggestedNumPartitions = 0) + } } test("toBreeze and toLocalMatrix") {