Skip to content

Commit

Permalink
almost finished addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
brkyvz committed Jan 27, 2015
1 parent f9d664b commit 1694c9e
Showing 1 changed file with 42 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,25 @@ import org.apache.spark.storage.StorageLevel
*
* @param numRowBlocks Number of blocks that form the rows of the matrix.
* @param numColBlocks Number of blocks that form the columns of the matrix.
* @param suggestedNumPartitions Number of partitions to partition the rdd into. The final number
* of partitions will be set to `min(suggestedNumPartitions,
* numRowBlocks * numColBlocks)`, because setting the number of
* partitions greater than the number of sub matrices is not useful.
*/
private[mllib] class GridPartitioner(
val numRowBlocks: Int,
val numColBlocks: Int,
val numParts: Int) extends Partitioner {
suggestedNumPartitions: Int) extends Partitioner {
// Having the number of partitions greater than the number of sub matrices does not help
override val numPartitions = math.min(numParts, numRowBlocks * numColBlocks)
// Having more partitions than sub-matrices does not help, so cap at the number of blocks.
// The block count is multiplied as Long so that a very large grid cannot overflow Int and
// produce a bogus (possibly negative) partition count; the minimum is bounded above by
// suggestedNumPartitions, so converting back to Int is safe.
override val numPartitions =
  math.min(suggestedNumPartitions.toLong, numRowBlocks.toLong * numColBlocks).toInt

// Total number of blocks in the grid, computed as a Long product of the two block counts.
val totalBlocks = numRowBlocks.toLong * numColBlocks
// Gives the number of blocks that need to be in each partition
val targetNumBlocksPerPartition = math.ceil(totalBlocks * 1.0 / numPartitions).toInt
// NOTE(review): for a 4 x 4 grid with 4 partitions, targetNumBlocksPerPartition is 4 and both
// per-partition counts below evaluate to ceil(4 / 4) = 1, so the partition id computed as
// j * blocksPerRow + i can reach 3 * 4 + 3 = 15 -- verify that ids stay below numPartitions.
// Number of neighboring blocks to take in each row
val numRowBlocksPerPartition = math.ceil(numRowBlocks * 1.0 / targetNumBlocksPerPartition).toInt
// Number of neighboring blocks to take in each column
val numColBlocksPerPartition = math.ceil(numColBlocks * 1.0 / targetNumBlocksPerPartition).toInt

/**
* Returns the index of the partition the SubMatrix belongs to. Tries to achieve block wise
Expand All @@ -51,27 +63,20 @@ private[mllib] class GridPartitioner(
override def getPartition(key: Any): Int = {
// Accepted key shapes are a (blockRowIndex, blockColIndex) pair or a
// (blockRowIndex, innerIndex, blockColIndex) triple; any other key is rejected below.
// NOTE(review): this listing is a diff view -- the getBlockId calls look like the pre-change
// form of the getPartitionId calls that follow them; confirm against the committed file.
key match {
case (blockRowIndex: Int, blockColIndex: Int) =>
getBlockId(blockRowIndex, blockColIndex)
getPartitionId(blockRowIndex, blockColIndex)
// innerIndex is matched but not used when choosing the partition.
case (blockRowIndex: Int, innerIndex: Int, blockColIndex: Int) =>
getBlockId(blockRowIndex, blockColIndex)
getPartitionId(blockRowIndex, blockColIndex)
case _ =>
throw new IllegalArgumentException(s"Unrecognized key. key: $key")
}
}

/**
* Partitions sub-matrices as blocks with neighboring sub-matrices.
*
* Maps the row and column index of a block to a partition id by dividing each index by the
* per-partition block count and flattening the resulting (i, j) coordinates.
*/
// NOTE(review): this listing is a diff view -- the getBlockId header and the locals computed
// right after it look like the pre-change form of getPartitionId; confirm against the
// committed file.
private def getBlockId(blockRowIndex: Int, blockColIndex: Int): Int = {
val totalBlocks = numRowBlocks * numColBlocks
// Gives the number of blocks that need to be in each partition
val partitionRatio = math.ceil(totalBlocks * 1.0 / numPartitions).toInt
// Number of neighboring blocks to take in each row
val subBlocksPerRow = math.ceil(numRowBlocks * 1.0 / partitionRatio).toInt
// Number of neighboring blocks to take in each column
val subBlocksPerCol = math.ceil(numColBlocks * 1.0 / partitionRatio).toInt
private def getPartitionId(blockRowIndex: Int, blockColIndex: Int): Int = {
// Coordinates of the block
val i = blockRowIndex / subBlocksPerRow
val j = blockColIndex / subBlocksPerCol
val blocksPerRow = math.ceil(numRowBlocks * 1.0 / subBlocksPerRow).toInt
val i = blockRowIndex / numRowBlocksPerPartition
val j = blockColIndex / numColBlocksPerPartition
// Number of distinct values i can take: ceil(numRowBlocks / numRowBlocksPerPartition).
val blocksPerRow = math.ceil(numRowBlocks * 1.0 / numRowBlocksPerPartition).toInt
// Flatten (i, j) into a single id, with i varying fastest.
// NOTE(review): nothing here clamps the result to numPartitions -- confirm it cannot exceed it.
j * blocksPerRow + i
}

Expand All @@ -91,10 +96,10 @@ private[mllib] class GridPartitioner(
* Represents a distributed matrix in blocks of local matrices.
*
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
* @param nRows Number of rows of this matrix
* @param nCols Number of columns of this matrix
* @param numRowBlocks Number of blocks that form the rows of this matrix
* @param numColBlocks Number of blocks that form the columns of this matrix
* @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero,
* the number of rows will be calculated when `numRows` is invoked.
* @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
* zero, the number of columns will be calculated when `numCols` is invoked.
* @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
* rows are not required to have the given number of rows
* @param colsPerBlock Number of columns that make up each block. The blocks forming the final
Expand All @@ -104,8 +109,6 @@ class BlockMatrix(
val rdd: RDD[((Int, Int), Matrix)],
private var nRows: Long,
private var nCols: Long,
val numRowBlocks: Int,
val numColBlocks: Int,
val rowsPerBlock: Int,
val colsPerBlock: Int) extends DistributedMatrix with Logging {

Expand All @@ -115,25 +118,18 @@ class BlockMatrix(
* Alternate constructor for BlockMatrix that does not take the number of rows and columns as
* arguments.
*
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
* @param numRowBlocks Number of blocks that form the rows of this matrix
* @param numColBlocks Number of blocks that form the columns of this matrix
* @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
* rows are not required to have the given number of rows
* @param colsPerBlock Number of columns that make up each block. The blocks forming the final
* columns are not required to have the given number of columns
*/
def this(
rdd: RDD[((Int, Int), Matrix)],
numRowBlocks: Int,
numColBlocks: Int,
rowsPerBlock: Int,
colsPerBlock: Int) = {
// Delegates to the primary constructor with 0L for both nRows and nCols; per the primary
// constructor's scaladoc, a value less than or equal to zero means the dimension is
// calculated when numRows / numCols is invoked.
// NOTE(review): this listing is a diff view -- the first call looks like the pre-change form
// of the second; confirm against the committed file.
this(rdd, 0L, 0L, numRowBlocks, numColBlocks, rowsPerBlock, colsPerBlock)
this(rdd, 0L, 0L, rowsPerBlock, colsPerBlock)
}

private[mllib] var partitioner: GridPartitioner =
new GridPartitioner(numRowBlocks, numColBlocks, rdd.partitions.length)

private lazy val dims: (Long, Long) = getDim

override def numRows(): Long = {
Expand All @@ -146,48 +142,21 @@ class BlockMatrix(
nCols
}

// Number of block rows and block columns, obtained by dividing the matrix dimensions by the
// block sizes and rounding up.
// NOTE(review): these initializers call numRows() / numCols() while the instance is being
// constructed; the class scaladoc says unspecified dimensions are calculated when those
// methods are invoked -- confirm that computing them eagerly here is intended.
val numRowBlocks = math.ceil(numRows() * 1.0 / rowsPerBlock).toInt
val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt

// Grid partitioner built from the block counts and the RDD's current number of partitions.
private[mllib] var partitioner: GridPartitioner =
new GridPartitioner(numRowBlocks, numColBlocks, rdd.partitions.length)



/**
* Returns the dimensions of the matrix.
*
* Aggregates over the blocks to find the largest block row index and block column index
* together with the size of the block found there, then derives the (rows, columns) pair.
*/
private def getDim: (Long, Long) = {
// Mutable accumulator: the largest block indices seen so far and the row/column counts of
// the blocks that carried them.
case class MatrixMetaData(var rowIndex: Int, var colIndex: Int,
var numRows: Int, var numCols: Int)
// picks the sizes of the matrix with the maximum indices
// Updates `base` in place and returns it; `example` is only read.
def pickSizeByGreaterIndex(example: MatrixMetaData, base: MatrixMetaData): MatrixMetaData = {
if (example.rowIndex > base.rowIndex) {
base.rowIndex = example.rowIndex
base.numRows = example.numRows
}
if (example.colIndex > base.colIndex) {
base.colIndex = example.colIndex
base.numCols = example.numCols
}
base
}

// Aggregate will return an error if the rdd is empty
val lastRowCol = rdd.treeAggregate(new MatrixMetaData(0, 0, 0, 0))(
// Fold one block into the running accumulator.
seqOp = (c, v) => (c, v) match { case (base, ((blockXInd, blockYInd), mat)) =>
pickSizeByGreaterIndex(
new MatrixMetaData(blockXInd, blockYInd, mat.numRows, mat.numCols), base)
},
// Merge two accumulators; the second one is updated and returned.
combOp = (c1, c2) => (c1, c2) match {
case (res1, res2) =>
pickSizeByGreaterIndex(res1, res2)
})
// We add the size of the edge matrices, because they can be less than the specified
// rowsPerBlock or colsPerBlock.
// The index is widened to Long before multiplying by the block size.
(lastRowCol.rowIndex.toLong * rowsPerBlock + lastRowCol.numRows,
lastRowCol.colIndex.toLong * colsPerBlock + lastRowCol.numCols)
}
// For every block, compute the row and column extent it reaches (its offset plus its own
// size) and keep the maximum over all blocks.
// The block index is widened to Long before multiplying: an Int product of
// blockRowIndex * rowsPerBlock (or the column equivalent) can overflow for large matrices
// and would silently corrupt the computed dimensions.
// NOTE(review): RDD.reduce throws on an empty RDD -- confirm callers never pass one.
val (rows, cols) = rdd.map { case ((blockRowIndex, blockColIndex), mat) =>
  (blockRowIndex.toLong * rowsPerBlock + mat.numRows,
    blockColIndex.toLong * colsPerBlock + mat.numCols)
}.reduce((x0, x1) => (math.max(x0._1, x1._1), math.max(x0._2, x1._2)))

/**
 * Computes the Frobenius norm of the matrix: the square root of the sum of the squared
 * stored values of every block.
 */
def normFro(): Double = {
  // Per-block sum of squares; only sparse and dense local matrices are handled.
  val squaredSumPerBlock = rdd.map {
    case (_, sparse: SparseMatrix) =>
      sparse.values.map(v => math.pow(v, 2)).sum
    case (_, dense: DenseMatrix) =>
      dense.values.map(v => math.pow(v, 2)).sum
  }
  math.sqrt(squaredSumPerBlock.reduce(_ + _))
}
(math.max(rows, nRows), math.max(cols, nCols))
}

/** Cache the underlying RDD. */
Expand All @@ -210,14 +179,14 @@ class BlockMatrix(
s"Int.MaxValue. Currently numCols: ${numCols()}")
val nRows = numRows().toInt
val nCols = numCols().toInt
val mem = nRows * nCols * 8 / 1000000
val mem = nRows.toLong * nCols / 125000
if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!")

val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
val parts = rdd.collect()
val values = new Array[Double](nRows * nCols)
parts.foreach { case ((rowIndex, colIndex), block) =>
val rowOffset = rowIndex * rowsPerBlock
val colOffset = colIndex * colsPerBlock
parts.foreach { case ((blockRowIndex, blockColIndex), block) =>
val rowOffset = blockRowIndex * rowsPerBlock
val colOffset = blockColIndex * colsPerBlock
var j = 0
val mat = block.toArray
while (j < block.numCols) {
Expand Down

0 comments on commit 1694c9e

Please sign in to comment.