forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARK-3974][MLlib] Distributed Block Matrix Abstractions
This pull request includes the abstractions for the distributed BlockMatrix representation. `BlockMatrix` will allow users to store very large matrices in small blocks of local matrices. Specific partitioners, such as `RowBasedPartitioner` and `ColumnBasedPartitioner`, are implemented in order to optimize addition and multiplication operations that will be added in a following PR. This work is based on the ml-matrix repo developed at the AMPLab at UC Berkeley, CA. https://github.com/amplab/ml-matrix Additional thanks to rezazadeh, shivaram, and mengxr for guidance on the design. Author: Burak Yavuz <[email protected]> Author: Xiangrui Meng <[email protected]> Author: Burak Yavuz <[email protected]> Author: Burak Yavuz <[email protected]> Author: Burak Yavuz <[email protected]> Closes apache#3200 from brkyvz/SPARK-3974 and squashes the following commits: a8eace2 [Burak Yavuz] Merge pull request #2 from mengxr/brkyvz-SPARK-3974 feb32a7 [Xiangrui Meng] update tests e1d3ee8 [Xiangrui Meng] minor updates 24ec7b8 [Xiangrui Meng] update grid partitioner 5eecd48 [Burak Yavuz] fixed gridPartitioner and added tests 140f20e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into SPARK-3974 1694c9e [Burak Yavuz] almost finished addressing comments f9d664b [Burak Yavuz] updated API and modified partitioning scheme eebbdf7 [Burak Yavuz] preliminary changes addressing code review 1a63b20 [Burak Yavuz] [SPARK-3974] Remove setPartition method. 
Isn't required 1e8bb2a [Burak Yavuz] [SPARK-3974] Change return type of cache and persist 239ab4b [Burak Yavuz] [SPARK-3974] Addressed @jkbradley's comments ba414d2 [Burak Yavuz] [SPARK-3974] fixed frobenius norm ab6cde0 [Burak Yavuz] [SPARK-3974] Modifications cleaning code up, making size calculation more robust 9ae85aa [Burak Yavuz] [SPARK-3974] Made partitioner a variable inside BlockMatrix instead of a constructor variable d033861 [Burak Yavuz] [SPARK-3974] Removed SubMatrixInfo and added constructor without partitioner 49b9586 [Burak Yavuz] [SPARK-3974] Updated testing utils from master 645afbe [Burak Yavuz] [SPARK-3974] Pull latest master b05aabb [Burak Yavuz] [SPARK-3974] Updated tests to reflect changes 19c17e8 [Burak Yavuz] [SPARK-3974] Changed blockIdRow and blockIdCol 589fbb6 [Burak Yavuz] [SPARK-3974] Code review feedback addressed aa8f086 [Burak Yavuz] [SPARK-3974] Additional comments added f378e16 [Burak Yavuz] [SPARK-3974] Block Matrix Abstractions ready b693209 [Burak Yavuz] Ready for Pull request
- Loading branch information
Showing
2 changed files
with
351 additions
and
0 deletions.
There are no files selected for viewing
216 changes: 216 additions & 0 deletions
216
mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,216 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.mllib.linalg.distributed | ||
|
||
import breeze.linalg.{DenseMatrix => BDM} | ||
|
||
import org.apache.spark.{Logging, Partitioner} | ||
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix} | ||
import org.apache.spark.rdd.RDD | ||
import org.apache.spark.storage.StorageLevel | ||
|
||
/**
 * A grid partitioner, which uses a regular grid to partition coordinates.
 *
 * Partition ids are laid out column by column over the grid of partitions: the coordinate
 * (i, j) is assigned to partition `i / rowsPerPart + (j / colsPerPart) * rowPartitions`.
 *
 * @param rows Number of rows.
 * @param cols Number of columns.
 * @param rowsPerPart Number of rows per partition, which may be less at the bottom edge.
 * @param colsPerPart Number of columns per partition, which may be less at the right edge.
 */
private[mllib] class GridPartitioner(
    val rows: Int,
    val cols: Int,
    val rowsPerPart: Int,
    val colsPerPart: Int) extends Partitioner {

  require(rows > 0)
  require(cols > 0)
  require(rowsPerPart > 0)
  require(colsPerPart > 0)

  // Divide in floating point so that a partial partition at the bottom/right edge is counted.
  // Integer division would truncate before `ceil` is applied, under-counting the partitions
  // and allowing `getPartition` to return ids that are >= `numPartitions`.
  private val rowPartitions = math.ceil(rows * 1.0 / rowsPerPart).toInt
  private val colPartitions = math.ceil(cols * 1.0 / colsPerPart).toInt

  /** Total number of partitions in the grid (row partitions times column partitions). */
  override val numPartitions: Int = rowPartitions * colPartitions

  /**
   * Returns the index of the partition the input coordinate belongs to.
   *
   * @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in
   *            multiplication. k is ignored in computing partitions.
   * @return The index of the partition, which the coordinate belongs to.
   * @throws IllegalArgumentException if the key is not an (Int, Int) or (Int, Int, Int) tuple,
   *                                  or if i or j is out of range.
   */
  override def getPartition(key: Any): Int = {
    key match {
      case (i: Int, j: Int) =>
        getPartitionId(i, j)
      case (i: Int, j: Int, _: Int) =>
        getPartitionId(i, j)
      case _ =>
        throw new IllegalArgumentException(s"Unrecognized key: $key.")
    }
  }

  /** Partitions sub-matrices as blocks with neighboring sub-matrices. */
  private def getPartitionId(i: Int, j: Int): Int = {
    require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).")
    require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).")
    i / rowsPerPart + j / colsPerPart * rowPartitions
  }

  /** Two grid partitioners are equal when all four constructor parameters match. */
  override def equals(obj: Any): Boolean = {
    obj match {
      case r: GridPartitioner =>
        (this.rows == r.rows) && (this.cols == r.cols) &&
          (this.rowsPerPart == r.rowsPerPart) && (this.colsPerPart == r.colsPerPart)
      case _ =>
        false
    }
  }

  /** Hash code derived from the same four fields compared by `equals`. */
  override def hashCode(): Int = {
    java.util.Arrays.hashCode(Array(rows, cols, rowsPerPart, colsPerPart))
  }
}
|
||
private[mllib] object GridPartitioner {

  /** Builds a [[GridPartitioner]] from explicit per-partition dimensions. */
  def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner =
    new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)

  /**
   * Builds a [[GridPartitioner]] whose per-partition dimensions are derived from a suggested
   * number of partitions: each dimension is scaled by `1 / sqrt(suggestedNumPartitions)`,
   * raised to at least 1, and then rounded.
   */
  def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = {
    require(suggestedNumPartitions > 0)
    val shrink = 1.0 / math.sqrt(suggestedNumPartitions)
    // Size of one partition along a dimension of the given extent; never below 1.
    def partSize(extent: Int): Int = math.round(math.max(shrink * extent, 1.0)).toInt
    new GridPartitioner(rows, cols, partSize(rows), partSize(cols))
  }
}
|
||
/**
 * A distributed matrix stored as an RDD of local sub-matrices, each keyed by its position
 * (blockRowIndex, blockColIndex) in the grid of blocks.
 *
 * Constructing an instance evaluates `numRows()` and `numCols()` in order to size the block
 * grid, so dimensions that were not supplied are estimated from `blocks` at that point.
 *
 * @param blocks The RDD of sub-matrix blocks ((blockRowIndex, blockColIndex), sub-matrix) that
 *               form this distributed matrix.
 * @param rowsPerBlock Number of rows in each block. Blocks in the last block-row may have
 *                     fewer rows.
 * @param colsPerBlock Number of columns in each block. Blocks in the last block-column may
 *                     have fewer columns.
 * @param nRows Number of rows of this matrix. A value less than or equal to zero means
 *              "unknown"; it is then computed from the blocks when `numRows` is invoked.
 * @param nCols Number of columns of this matrix. A value less than or equal to zero means
 *              "unknown"; it is then computed from the blocks when `numCols` is invoked.
 */
class BlockMatrix(
    val blocks: RDD[((Int, Int), Matrix)],
    val rowsPerBlock: Int,
    val colsPerBlock: Int,
    private var nRows: Long,
    private var nCols: Long) extends DistributedMatrix with Logging {

  private type MatrixBlock = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), sub-matrix)

  /**
   * Builds a BlockMatrix whose overall dimensions are not given up front; they are derived
   * from the blocks themselves.
   *
   * @param rdd The RDD of sub-matrices (local matrices) that form this matrix.
   * @param rowsPerBlock Number of rows in each block. Blocks in the last block-row may have
   *                     fewer rows.
   * @param colsPerBlock Number of columns in each block. Blocks in the last block-column may
   *                     have fewer columns.
   */
  def this(
      rdd: RDD[((Int, Int), Matrix)],
      rowsPerBlock: Int,
      colsPerBlock: Int) = {
    this(rdd, rowsPerBlock, colsPerBlock, 0L, 0L)
  }

  /** Number of rows, estimated from the blocks first if it is not yet known. */
  override def numRows(): Long = {
    if (nRows <= 0L) {
      estimateDim()
    }
    nRows
  }

  /** Number of columns, estimated from the blocks first if it is not yet known. */
  override def numCols(): Long = {
    if (nCols <= 0L) {
      estimateDim()
    }
    nCols
  }

  /** Number of block-rows in the grid (the last one may be only partially filled). */
  val numRowBlocks: Int = math.ceil(numRows().toDouble / rowsPerBlock).toInt

  /** Number of block-columns in the grid (the last one may be only partially filled). */
  val numColBlocks: Int = math.ceil(numCols().toDouble / colsPerBlock).toInt

  // Must be initialized after numRowBlocks / numColBlocks, which it reads.
  private[mllib] var partitioner: GridPartitioner =
    GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.length)

  /**
   * Computes the matrix extent implied by the blocks and fills in whichever of `nRows` /
   * `nCols` is still unknown. Asserts that a previously supplied dimension is not smaller
   * than what the blocks actually cover.
   */
  private def estimateDim(): Unit = {
    // For every block, the (row, column) extent reached by its bottom-right corner.
    val extents = blocks.map { case ((blockRow, blockCol), block) =>
      (blockRow.toLong * rowsPerBlock + block.numRows,
        blockCol.toLong * colsPerBlock + block.numCols)
    }
    val (maxRows, maxCols) = extents.reduce { (a, b) =>
      (math.max(a._1, b._1), math.max(a._2, b._2))
    }
    if (nRows <= 0L) {
      nRows = maxRows
    }
    assert(maxRows <= nRows, s"The number of rows $maxRows is more than claimed $nRows.")
    if (nCols <= 0L) {
      nCols = maxCols
    }
    assert(maxCols <= nCols, s"The number of columns $maxCols is more than claimed $nCols.")
  }

  /** Marks the underlying RDD of blocks as cached and returns this matrix. */
  def cache(): this.type = {
    blocks.cache()
    this
  }

  /** Persists the underlying RDD of blocks at the given storage level and returns this matrix. */
  def persist(storageLevel: StorageLevel): this.type = {
    blocks.persist(storageLevel)
    this
  }

  /**
   * Collects all blocks and assembles them into a single local `DenseMatrix` with values in
   * column-major order.
   *
   * Requires the row count, the column count, and their product to each be less than
   * `Int.MaxValue`.
   */
  def toLocalMatrix(): Matrix = {
    require(numRows() < Int.MaxValue,
      "The number of rows of this matrix should be less than " +
        s"Int.MaxValue. Currently numRows: ${numRows()}")
    require(numCols() < Int.MaxValue,
      "The number of columns of this matrix should be less than " +
        s"Int.MaxValue. Currently numCols: ${numCols()}")
    require(numRows() * numCols() < Int.MaxValue,
      "The length of the values array must be " +
        s"less than Int.MaxValue. Currently numRows * numCols: ${numRows() * numCols()}")
    val m = numRows().toInt
    val n = numCols().toInt
    // 8 bytes per Double, so (m * n) / 125000 is the size in MB.
    val estimatedMb = m * n / 125000
    if (estimatedMb > 500) {
      logWarning(s"Storing this matrix will require $estimatedMb MB of memory!")
    }

    val collected = blocks.collect()
    val values = new Array[Double](m * n)
    collected.foreach { case ((blockRow, blockCol), block) =>
      val rowStart = blockRow * rowsPerBlock
      val colStart = blockCol * colsPerBlock
      block.foreachActive { (i, j, v) =>
        // Column-major position of the entry within the full m x n matrix.
        values((colStart + j) * m + rowStart + i) = v
      }
    }
    new DenseMatrix(m, n, values)
  }

  /** Collects data and assembles a local dense breeze matrix (for test only). */
  private[mllib] def toBreeze(): BDM[Double] = {
    val local = toLocalMatrix()
    new BDM[Double](local.numRows, local.numCols, local.toArray)
  }
}
135 changes: 135 additions & 0 deletions
135
mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.mllib.linalg.distributed | ||
|
||
import scala.util.Random | ||
|
||
import breeze.linalg.{DenseMatrix => BDM} | ||
import org.scalatest.FunSuite | ||
|
||
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix} | ||
import org.apache.spark.mllib.util.MLlibTestSparkContext | ||
|
||
class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {

  val m = 5
  val n = 4
  val rowPerPart = 2
  val colPerPart = 2
  val numPartitions = 3
  var gridBasedMat: BlockMatrix = _

  /** Builds the shared 5x4 fixture out of 2x2 blocks plus one 1x2 block at the bottom edge. */
  override def beforeAll(): Unit = {
    super.beforeAll()

    val fixtureBlocks: Seq[((Int, Int), Matrix)] = Seq(
      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
      ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
      ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
      ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))

    val blockRdd = sc.parallelize(fixtureBlocks, numPartitions)
    gridBasedMat = new BlockMatrix(blockRdd, rowPerPart, colPerPart)
  }

  test("size") {
    assert(gridBasedMat.numRows() === m)
    assert(gridBasedMat.numCols() === n)
  }

  test("grid partitioner") {
    val random = new Random()

    // Checks every coordinate of `expected` against the partitioner, both as an (i, j) key
    // and as an (i, j, k) key with an arbitrary k, which must not affect the result.
    def checkGrid(part: GridPartitioner, expected: Array[Array[Int]]): Unit = {
      for (i <- expected.indices; j <- expected(i).indices) {
        assert(part.getPartition((i, j)) === expected(i)(j))
        assert(part.getPartition((i, j, random.nextInt())) === expected(i)(j))
      }
    }

    // A 4x7 coordinate space with 12 suggested partitions gives 1x2 partitions, i.e. a
    // 4x4 grid of partitions.
    val part0 = GridPartitioner(4, 7, suggestedNumPartitions = 12)
    checkGrid(part0, Array(
      Array(0, 0, 4, 4, 8, 8, 12),
      Array(1, 1, 5, 5, 9, 9, 13),
      Array(2, 2, 6, 6, 10, 10, 14),
      Array(3, 3, 7, 7, 11, 11, 15)))

    // Coordinates outside [0, 4) x [0, 7) must be rejected.
    Seq((-1, 0), (4, 0), (0, -1), (0, 7)).foreach { outOfRange =>
      intercept[IllegalArgumentException] {
        part0.getPartition(outOfRange)
      }
    }

    val part1 = GridPartitioner(2, 2, suggestedNumPartitions = 5)
    checkGrid(part1, Array(
      Array(0, 2),
      Array(1, 3)))

    // Equality is determined by the partitioner's parameters.
    val part2 = GridPartitioner(2, 2, suggestedNumPartitions = 5)
    assert(part0 !== part2)
    assert(part1 === part2)

    val part3 = new GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2)
    checkGrid(part3, Array(
      Array(0, 0, 2),
      Array(1, 1, 3)))

    val part4 = GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2)
    assert(part3 === part4)

    // Non-positive partition sizes or partition counts are invalid.
    intercept[IllegalArgumentException] {
      new GridPartitioner(2, 2, rowsPerPart = 0, colsPerPart = 1)
    }

    intercept[IllegalArgumentException] {
      GridPartitioner(2, 2, rowsPerPart = 1, colsPerPart = 0)
    }

    intercept[IllegalArgumentException] {
      GridPartitioner(2, 2, suggestedNumPartitions = 0)
    }
  }

  test("toBreeze and toLocalMatrix") {
    val expected = BDM(
      (1.0, 0.0, 0.0, 0.0),
      (0.0, 2.0, 1.0, 0.0),
      (3.0, 1.0, 1.0, 0.0),
      (0.0, 1.0, 2.0, 1.0),
      (0.0, 0.0, 1.0, 5.0))

    val expectedLocal = Matrices.fromBreeze(expected).asInstanceOf[DenseMatrix]
    assert(gridBasedMat.toLocalMatrix() === expectedLocal)
    assert(gridBasedMat.toBreeze() === expected)
  }
}