Skip to content

Commit

Permalink
fixed gridPartitioner and added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
brkyvz committed Jan 27, 2015
1 parent 140f20e commit 5eecd48
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.util.Utils

import org.apache.spark.{Logging, Partitioner}
import org.apache.spark.mllib.linalg._
Expand All @@ -39,16 +40,17 @@ private[mllib] class GridPartitioner(
val numRowBlocks: Int,
val numColBlocks: Int,
suggestedNumPartitions: Int) extends Partitioner {
private val totalBlocks = numRowBlocks.toLong * numColBlocks
// Having the number of partitions greater than the number of sub matrices does not help
override val numPartitions = math.min(suggestedNumPartitions, numRowBlocks * numColBlocks)
override val numPartitions = math.min(suggestedNumPartitions, totalBlocks).toInt

val totalBlocks = numRowBlocks.toLong * numColBlocks
// Gives the number of blocks that need to be in each partition
val targetNumBlocksPerPartition = math.ceil(totalBlocks * 1.0 / numPartitions).toInt
private val blockLengthsPerPartition = findOptimalBlockLengths
// Number of neighboring blocks to take in each row
val numRowBlocksPerPartition = math.ceil(numRowBlocks * 1.0 / targetNumBlocksPerPartition).toInt
private val numRowBlocksPerPartition = blockLengthsPerPartition._1
// Number of neighboring blocks to take in each column
val numColBlocksPerPartition = math.ceil(numColBlocks * 1.0 / targetNumBlocksPerPartition).toInt
private val numColBlocksPerPartition = blockLengthsPerPartition._2
// Number of rows of partitions
private val blocksPerRow = math.ceil(numRowBlocks * 1.0 / numRowBlocksPerPartition).toInt

/**
* Returns the index of the partition the SubMatrix belongs to. Tries to achieve block wise
Expand All @@ -73,11 +75,54 @@ private[mllib] class GridPartitioner(

/**
 * Partitions sub-matrices as blocks with neighboring sub-matrices.
 *
 * @param blockRowIndex row index of the sub-matrix; must satisfy
 *                      0 <= blockRowIndex < numRowBlocks
 * @param blockColIndex column index of the sub-matrix; must satisfy
 *                      0 <= blockColIndex < numColBlocks
 * @return the id of the partition that holds the sub-matrix
 * @throws IllegalArgumentException if either index is outside its valid range
 */
private def getPartitionId(blockRowIndex: Int, blockColIndex: Int): Int = {
  require(0 <= blockRowIndex && blockRowIndex < numRowBlocks, "The blockRowIndex in the key " +
    s"must be in the range 0 <= blockRowIndex < numRowBlocks. blockRowIndex: $blockRowIndex, " +
    s"numRowBlocks: $numRowBlocks")
  // The lower bound must be checked on the column index itself; checking the row index here
  // would let a negative blockColIndex slip through and be mapped to a valid-looking partition.
  require(0 <= blockColIndex && blockColIndex < numColBlocks, "The blockColIndex in the key " +
    s"must be in the range 0 <= blockColIndex < numColBlocks. blockColIndex: $blockColIndex, " +
    s"numColBlocks: $numColBlocks")
  // Coordinates of the partition-sized tile that contains the block
  val i = blockRowIndex / numRowBlocksPerPartition
  val j = blockColIndex / numColBlocksPerPartition
  // The mod shouldn't be required but is added as a guarantee for possible corner cases
  Utils.nonNegativeMod(j * blocksPerRow + i, numPartitions)
}

/**
 * Tries to calculate the optimal number of blocks that should be in each partition.
 *
 * Starts from a near-square tile of m x n blocks that holds about
 * ceil(totalBlocks / numPartitions) blocks, then grows the tile along the dimension with
 * fewer blocks until the grid of tiles needs no more than numPartitions partitions. If no
 * such tile exists, the shorter side is set to 1 and the other side to the target number of
 * blocks per partition.
 *
 * @return (m, n), the number of neighboring row blocks and column blocks per partition
 */
private def findOptimalBlockLengths: (Int, Int) = {
  // Gives the optimal number of blocks that need to be in each partition
  val targetNumBlocksPerPartition = math.ceil(totalBlocks * 1.0 / numPartitions).toInt
  // The tile is always grown along the dimension that has fewer (or equally many) blocks
  val growRows = numRowBlocks <= numColBlocks
  // Once the growing side has reached both the target and the number of blocks along its own
  // dimension, the other side is pinned at 1 and the number of tiles stops changing. Growing
  // further can never satisfy the loop condition; without this bound the loop would only end
  // after the counter overflows Int (about 2^31 iterations) and then take the fallback below.
  val growthLimit =
    math.max(targetNumBlocksPerPartition, if (growRows) numRowBlocks else numColBlocks)

  // Number of neighboring blocks to take in each row
  var m = math.ceil(math.sqrt(targetNumBlocksPerPartition)).toInt
  // Number of neighboring blocks to take in each column
  var n = math.ceil(targetNumBlocksPerPartition * 1.0 / m).toInt

  // Number of m x n tiles needed to cover the whole grid of blocks
  def numTiles: Double =
    math.ceil(numRowBlocks * 1.0 / m) * math.ceil(numColBlocks * 1.0 / n)

  // Try to make m and n close to each other while making sure that we don't exceed the number
  // of partitions
  var exhausted = false
  while (!exhausted && numTiles > numPartitions && m * n != 0) {
    val growingSide = if (growRows) m else n
    if (growingSide >= growthLimit) {
      exhausted = true
    } else if (growRows) {
      m += 1
      n = math.ceil(targetNumBlocksPerPartition * 1.0 / m).toInt
    } else {
      n += 1
      m = math.ceil(targetNumBlocksPerPartition * 1.0 / n).toInt
    }
  }

  // If a good partitioning scheme couldn't be found, set the side with the smaller dimension to
  // 1 and the other to the number of targetNumBlocksPerPartition
  if (exhausted || m * n == 0) {
    if (growRows) {
      m = 1
      n = targetNumBlocksPerPartition
    } else {
      n = 1
      m = targetNumBlocksPerPartition
    }
  }
  (m, n)
}

/** Checks whether the partitioners have the same characteristics */
Expand Down Expand Up @@ -148,8 +193,6 @@ class BlockMatrix(
private[mllib] var partitioner: GridPartitioner =
new GridPartitioner(numRowBlocks, numColBlocks, rdd.partitions.length)



/** Returns the dimensions of the matrix. */
private def getDim: (Long, Long) = {
val (rows, cols) = rdd.map { case ((blockRowIndex, blockColIndex), mat) =>
Expand Down Expand Up @@ -177,27 +220,21 @@ class BlockMatrix(
s"Int.MaxValue. Currently numRows: ${numRows()}")
require(numCols() < Int.MaxValue, "The number of columns of this matrix should be less than " +
s"Int.MaxValue. Currently numCols: ${numCols()}")
require(numRows() * numCols() < Int.MaxValue, "The length of the values array must be " +
s"less than Int.MaxValue. Currently numRows * numCols: ${numRows() * numCols()}")
val nRows = numRows().toInt
val nCols = numCols().toInt
val mem = nRows.toLong * nCols / 125000
val mem = nRows * nCols / 125000
if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!")

val parts = rdd.collect()
val values = new Array[Double](nRows * nCols)
parts.foreach { case ((blockRowIndex, blockColIndex), block) =>
val rowOffset = blockRowIndex * rowsPerBlock
val colOffset = blockColIndex * colsPerBlock
var j = 0
val mat = block.toArray
while (j < block.numCols) {
var i = 0
val indStart = (j + colOffset) * nRows + rowOffset
val matStart = j * block.numRows
while (i < block.numRows) {
values(indStart + i) = mat(matStart + i)
i += 1
}
j += 1
block.foreachActive { (i, j, v) =>
val indexOffset = (j + colOffset) * nRows + rowOffset + i
values(indexOffset) = v
}
}
new DenseMatrix(nRows, nCols, values)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,13 @@ import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
import org.apache.spark.mllib.util.MLlibTestSparkContext

// Input values for the tests
private object BlockMatrixSuite {
class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {

val m = 5
val n = 4
val rowPerPart = 2
val colPerPart = 2
val numRowBlocks = 3
val numColBlocks = 2
}

class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {

val m = BlockMatrixSuite.m
val n = BlockMatrixSuite.n
val rowPerPart = BlockMatrixSuite.rowPerPart
val colPerPart = BlockMatrixSuite.colPerPart
val numRowBlocks = BlockMatrixSuite.numRowBlocks
val numColBlocks = BlockMatrixSuite.numColBlocks
val numPartitions = 3
var gridBasedMat: BlockMatrix = _
type SubMatrix = ((Int, Int), Matrix)

Expand All @@ -54,14 +43,58 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
new SubMatrix((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
new SubMatrix((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))

gridBasedMat = new BlockMatrix(sc.parallelize(entries, 2), numRowBlocks, numColBlocks,
rowPerPart, colPerPart)
gridBasedMat = new BlockMatrix(sc.parallelize(entries, numPartitions), rowPerPart, colPerPart)
}

test("size and frobenius norm") {
test("size") {
assert(gridBasedMat.numRows() === m)
assert(gridBasedMat.numCols() === n)
assert(gridBasedMat.normFro() === 7.0)
}

test("grid partitioner partitioning") {
val partitioner = gridBasedMat.partitioner
assert(partitioner.getPartition((0, 0)) === 0)
assert(partitioner.getPartition((0, 1)) === 0)
assert(partitioner.getPartition((1, 0)) === 1)
assert(partitioner.getPartition((1, 1)) === 1)
assert(partitioner.getPartition((2, 0)) === 2)
assert(partitioner.getPartition((2, 1)) === 2)
assert(partitioner.getPartition((1, 0, 1)) === 1)
assert(partitioner.getPartition((2, 0, 0)) === 2)

val part2 = new GridPartitioner(10, 20, 10)
assert(part2.getPartition((0, 0)) === 0)
assert(part2.getPartition((0, 1)) === 0)
assert(part2.getPartition((0, 6)) === 2)
assert(part2.getPartition((3, 7)) === 2)
assert(part2.getPartition((3, 8)) === 4)
assert(part2.getPartition((3, 13)) === 6)
assert(part2.getPartition((9, 14)) === 7)
assert(part2.getPartition((9, 15)) === 7)
assert(part2.getPartition((9, 19)) === 9)

intercept[IllegalArgumentException] {
part2.getPartition((-1, 0))
}

intercept[IllegalArgumentException] {
part2.getPartition((10, 0))
}

intercept[IllegalArgumentException] {
part2.getPartition((9, 20))
}

val part3 = new GridPartitioner(20, 10, 10)
assert(part3.getPartition((0, 0)) === 0)
assert(part3.getPartition((1, 0)) === 0)
assert(part3.getPartition((6, 0)) === 1)
assert(part3.getPartition((7, 3)) === 1)
assert(part3.getPartition((8, 3)) === 2)
assert(part3.getPartition((13, 3)) === 3)
assert(part3.getPartition((14, 9)) === 8)
assert(part3.getPartition((15, 9)) === 8)
assert(part3.getPartition((19, 9)) === 9)
}

test("toBreeze and toLocalMatrix") {
Expand Down

0 comments on commit 5eecd48

Please sign in to comment.