Merge branch 'master' into broadcast
davies committed Aug 13, 2014
2 parents c7baa8c + fe47359 commit 631a827
Showing 22 changed files with 751 additions and 500 deletions.
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -46,17 +46,24 @@ trait CompressionCodec {


 private[spark] object CompressionCodec {
 
+  private val shortCompressionCodecNames = Map(
+    "lz4" -> classOf[LZ4CompressionCodec].getName,
+    "lzf" -> classOf[LZFCompressionCodec].getName,
+    "snappy" -> classOf[SnappyCompressionCodec].getName)
+
   def createCodec(conf: SparkConf): CompressionCodec = {
     createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC))
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
-    val ctor = Class.forName(codecName, true, Utils.getContextOrSparkClassLoader)
+    val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
+    val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
       .getConstructor(classOf[SparkConf])
     ctor.newInstance(conf).asInstanceOf[CompressionCodec]
   }
 
-  val DEFAULT_COMPRESSION_CODEC = classOf[SnappyCompressionCodec].getName
+  val DEFAULT_COMPRESSION_CODEC = "snappy"
 }
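The new `shortCompressionCodecNames` lookup means the config value can take either form. A minimal sketch of the behavior this hunk introduces (editor's illustration, not part of the diff):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec}

// Short names resolve through shortCompressionCodecNames;
// fully qualified class names fall through getOrElse unchanged.
val conf = new SparkConf()
val byShortName = CompressionCodec.createCodec(conf, "lz4")
val byClassName = CompressionCodec.createCodec(conf, classOf[LZ4CompressionCodec].getName)
assert(byShortName.getClass == byClassName.getClass)
```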


@@ -238,7 +238,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       // If our vector's size has exceeded the threshold, request more memory
       val currentSize = vector.estimateSize()
       if (currentSize >= memoryThreshold) {
-        val amountToRequest = (currentSize * (memoryGrowthFactor - 1)).toLong
+        val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
         // Hold the accounting lock, in case another thread concurrently puts a block that
         // takes up the unrolling space we just ensured here
         accountingLock.synchronized {
@@ -254,7 +254,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           }
         }
         // New threshold is currentSize * memoryGrowthFactor
-        memoryThreshold = currentSize + amountToRequest
+        memoryThreshold += amountToRequest
       }
     }
     elementsUnrolled += 1
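Worked numbers help show what the fix changes (illustrative values, not from this diff; the actual initial threshold and growth factor are defined elsewhere in `MemoryStore`):

```scala
// Suppose memoryThreshold = 10 MB, memoryGrowthFactor = 1.5,
// and the vector has just grown to currentSize = 12 MB.
//
// Old request: 12 * (1.5 - 1) = 6 MB.
//   This ignores the 2 MB by which currentSize already overshot the old
//   threshold, so total reserved memory (10 + 6 = 16 MB) falls short of
//   the new 18 MB threshold.
//
// New request: 12 * 1.5 - 10 = 8 MB.
//   memoryThreshold += 8 gives 18 MB = currentSize * memoryGrowthFactor,
//   so reserved unroll memory and the threshold stay in sync.
```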
29 changes: 22 additions & 7 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -284,17 +284,32 @@ private[spark] object Utils extends Logging {
   /** Copy all data from an InputStream to an OutputStream */
   def copyStream(in: InputStream,
                  out: OutputStream,
-                 closeStreams: Boolean = false)
+                 closeStreams: Boolean = false): Long =
   {
+    var count = 0L
     try {
-      val buf = new Array[Byte](8192)
-      var n = 0
-      while (n != -1) {
-        n = in.read(buf)
-        if (n != -1) {
-          out.write(buf, 0, n)
+      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
+        // When both streams are file streams, use transferTo to improve copy performance.
+        val inChannel = in.asInstanceOf[FileInputStream].getChannel()
+        val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
+        val size = inChannel.size()
+
+        // In case transferTo transfers less data than requested, loop until done.
+        while (count < size) {
+          count += inChannel.transferTo(count, size - count, outChannel)
+        }
+      } else {
+        val buf = new Array[Byte](8192)
+        var n = 0
+        while (n != -1) {
+          n = in.read(buf)
+          if (n != -1) {
+            out.write(buf, 0, n)
+            count += n
+          }
         }
       }
+      count
     } finally {
       if (closeStreams) {
         try {
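For readers who want to try the fast path in isolation, here is a self-contained sketch of the same channel-to-channel pattern (hypothetical file paths; the retry loop mirrors the one above, since `transferTo` may copy fewer bytes than requested):

```scala
import java.io.{FileInputStream, FileOutputStream}

val in = new FileInputStream("/tmp/src.bin")    // hypothetical path
val out = new FileOutputStream("/tmp/dst.bin")  // hypothetical path
try {
  val inChannel = in.getChannel
  val outChannel = out.getChannel
  val size = inChannel.size()
  var copied = 0L
  // transferTo can stop early, so loop until all bytes have moved.
  while (copied < size) {
    copied += inChannel.transferTo(copied, size - copied, outChannel)
  }
} finally {
  out.close()
  in.close()
}
```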
@@ -745,12 +745,11 @@ private[spark] class ExternalSorter[K, V, C](
     try {
       out = new FileOutputStream(outputFile)
       for (i <- 0 until numPartitions) {
-        val file = partitionWriters(i).fileSegment().file
-        in = new FileInputStream(file)
-        org.apache.spark.util.Utils.copyStream(in, out)
+        in = new FileInputStream(partitionWriters(i).fileSegment().file)
+        val size = org.apache.spark.util.Utils.copyStream(in, out, false)
         in.close()
         in = null
-        lengths(i) = file.length()
+        lengths(i) = size
         offsets(i + 1) = offsets(i) + lengths(i)
       }
     } finally {
@@ -56,15 +56,33 @@ class CompressionCodecSuite extends FunSuite {
     testCodec(codec)
   }
 
+  test("lz4 compression codec short form") {
+    val codec = CompressionCodec.createCodec(conf, "lz4")
+    assert(codec.getClass === classOf[LZ4CompressionCodec])
+    testCodec(codec)
+  }
+
   test("lzf compression codec") {
     val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)
     assert(codec.getClass === classOf[LZFCompressionCodec])
     testCodec(codec)
   }
 
+  test("lzf compression codec short form") {
+    val codec = CompressionCodec.createCodec(conf, "lzf")
+    assert(codec.getClass === classOf[LZFCompressionCodec])
+    testCodec(codec)
+  }
+
   test("snappy compression codec") {
     val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName)
     assert(codec.getClass === classOf[SnappyCompressionCodec])
     testCodec(codec)
   }
 
+  test("snappy compression codec short form") {
+    val codec = CompressionCodec.createCodec(conf, "snappy")
+    assert(codec.getClass === classOf[SnappyCompressionCodec])
+    testCodec(codec)
+  }
 }
8 changes: 5 additions & 3 deletions docs/configuration.md
@@ -373,10 +373,12 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.io.compression.codec</code></td>
-  <td>org.apache.spark.io.<br />SnappyCompressionCodec</td>
+  <td>snappy</td>
   <td>
-    The codec used to compress internal data such as RDD partitions and shuffle outputs.
-    By default, Spark provides three codecs: <code>org.apache.spark.io.LZ4CompressionCodec</code>,
+    The codec used to compress internal data such as RDD partitions and shuffle outputs. By default,
+    Spark provides three codecs: <code>lz4</code>, <code>lzf</code>, and <code>snappy</code>. You
+    can also use fully qualified class names to specify the codec, e.g.
+    <code>org.apache.spark.io.LZ4CompressionCodec</code>,
     <code>org.apache.spark.io.LZFCompressionCodec</code>,
     and <code>org.apache.spark.io.SnappyCompressionCodec</code>.
   </td>
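After this change, both of the following settings should resolve to the same codec (editor's sketch):

```scala
import org.apache.spark.SparkConf

// Short form introduced by this commit:
val conf1 = new SparkConf().set("spark.io.compression.codec", "lzf")
// Fully qualified class name, still supported:
val conf2 = new SparkConf().set(
  "spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")
```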
117 changes: 29 additions & 88 deletions docs/mllib-basics.md
@@ -9,17 +9,17 @@ displayTitle: <a href="mllib-guide.html">MLlib</a> - Basics

 MLlib supports local vectors and matrices stored on a single machine,
 as well as distributed matrices backed by one or more RDDs.
-In the current implementation, local vectors and matrices are simple data models
-to serve public interfaces. The underlying linear algebra operations are provided by
+Local vectors and local matrices are simple data models
+that serve as public interfaces. The underlying linear algebra operations are provided by
 [Breeze](http://www.scalanlp.org/) and [jblas](http://jblas.org/).
-A training example used in supervised learning is called "labeled point" in MLlib.
+A training example used in supervised learning is called a "labeled point" in MLlib.
 
 ## Local vector
 
 A local vector has integer-typed and 0-based indices and double-typed values, stored on a single
 machine. MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by
 a double array representing its entry values, while a sparse vector is backed by two parallel
-arrays: indices and values. For example, a vector $(1.0, 0.0, 3.0)$ can be represented in dense
+arrays: indices and values. For example, a vector `(1.0, 0.0, 3.0)` can be represented in dense
 format as `[1.0, 0.0, 3.0]` or in sparse format as `(3, [0, 2], [1.0, 3.0])`, where `3` is the size
 of the vector.

@@ -44,8 +44,7 @@ val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
 val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
 {% endhighlight %}
 
-***Note***
-
+***Note:***
 Scala imports `scala.collection.immutable.Vector` by default, so you have to import
 `org.apache.spark.mllib.linalg.Vector` explicitly to use MLlib's `Vector`.

@@ -110,8 +109,8 @@ sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])),
 A labeled point is a local vector, either dense or sparse, associated with a label/response.
 In MLlib, labeled points are used in supervised learning algorithms.
 We use a double to store a label, so we can use labeled points in both regression and classification.
-For binary classification, label should be either $0$ (negative) or $1$ (positive).
-For multiclass classification, labels should be class indices staring from zero: $0, 1, 2, \ldots$.
+For binary classification, a label should be either `0` (negative) or `1` (positive).
+For multiclass classification, labels should be class indices starting from zero: `0, 1, 2, ...`.
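For reference, the equivalent Scala construction (a minimal sketch mirroring the Python `LabeledPoint` lines visible in this hunk):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// A positive example with a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
// A negative example with a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
```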

<div class="codetabs">

@@ -172,7 +171,7 @@ neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
 It is very common in practice to have sparse training data. MLlib supports reading training
 examples stored in `LIBSVM` format, which is the default format used by
 [`LIBSVM`](http://www.csie.ntu.edu.tw/~cjlin/libsvm/) and
-[`LIBLINEAR`](http://www.csie.ntu.edu.tw/~cjlin/liblinear/). It is a text format. Each line
+[`LIBLINEAR`](http://www.csie.ntu.edu.tw/~cjlin/liblinear/). It is a text format in which each line
 represents a labeled sparse feature vector using the following format:
 
 ~~~
 label index1:value1 index2:value2 ...
 ~~~
@@ -226,23 +225,22 @@ examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
 ## Local matrix
 
 A local matrix has integer-typed row and column indices and double-typed values, stored on a single
-machine. MLlib supports dense matrix, whose entry values are stored in a single double array in
+machine. MLlib supports dense matrices, whose entry values are stored in a single double array in
 column major. For example, the following matrix `\[ \begin{pmatrix}
 1.0 & 2.0 \\
 3.0 & 4.0 \\
 5.0 & 6.0
 \end{pmatrix}
 \]`
 is stored in a one-dimensional array `[1.0, 3.0, 5.0, 2.0, 4.0, 6.0]` with the matrix size `(3, 2)`.
-We are going to add sparse matrix in the next release.
 
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
 
 The base class of local matrices is
 [`Matrix`](api/scala/index.html#org.apache.spark.mllib.linalg.Matrix), and we provide one
 implementation: [`DenseMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.DenseMatrix).
-Sparse matrix will be added in the next release. We recommend using the factory methods implemented
+We recommend using the factory methods implemented
 in [`Matrices`](api/scala/index.html#org.apache.spark.mllib.linalg.Matrices) to create local
 matrices.

@@ -259,7 +257,7 @@ val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
 The base class of local matrices is
 [`Matrix`](api/java/org/apache/spark/mllib/linalg/Matrix.html), and we provide one
 implementation: [`DenseMatrix`](api/java/org/apache/spark/mllib/linalg/DenseMatrix.html).
-Sparse matrix will be added in the next release. We recommend using the factory methods implemented
+We recommend using the factory methods implemented
 in [`Matrices`](api/java/org/apache/spark/mllib/linalg/Matrices.html) to create local
 matrices.

@@ -279,28 +277,30 @@ Matrix dm = Matrices.dense(3, 2, new double[] {1.0, 3.0, 5.0, 2.0, 4.0, 6.0});
 A distributed matrix has long-typed row and column indices and double-typed values, stored
 distributively in one or more RDDs. It is very important to choose the right format to store large
 and distributed matrices. Converting a distributed matrix to a different format may require a
-global shuffle, which is quite expensive. We implemented three types of distributed matrices in
-this release and will add more types in the future.
+global shuffle, which is quite expensive. Three types of distributed matrices have been implemented
+so far.
 
 The basic type is called `RowMatrix`. A `RowMatrix` is a row-oriented distributed
 matrix without meaningful row indices, e.g., a collection of feature vectors.
 It is backed by an RDD of its rows, where each row is a local vector.
-We assume that the number of columns is not huge for a `RowMatrix`.
+We assume that the number of columns is not huge for a `RowMatrix` so that a single
+local vector can be reasonably communicated to the driver and can also be stored /
+operated on using a single node.
 An `IndexedRowMatrix` is similar to a `RowMatrix` but with row indices,
-which can be used for identifying rows and joins.
-A `CoordinateMatrix` is a distributed matrix stored in [coordinate list (COO)](https://en.wikipedia.org/wiki/Sparse_matrix) format,
+which can be used for identifying rows and executing joins.
+A `CoordinateMatrix` is a distributed matrix stored in [coordinate list (COO)](https://en.wikipedia.org/wiki/Sparse_matrix#Coordinate_list_.28COO.29) format,
 backed by an RDD of its entries.
 
 ***Note***
 
 The underlying RDDs of a distributed matrix must be deterministic, because we cache the matrix size.
-It is always error-prone to have non-deterministic RDDs.
+In general the use of non-deterministic RDDs can lead to errors.
 
 ### RowMatrix
 
 A `RowMatrix` is a row-oriented distributed matrix without meaningful row indices, backed by an RDD
-of its rows, where each row is a local vector. This is similar to `data matrix` in the context of
-multivariate statistics. Since each row is represented by a local vector, the number of columns is
+of its rows, where each row is a local vector.
+Since each row is represented by a local vector, the number of columns is
 limited by the integer range but it should be much smaller in practice.
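Since the full examples in this section are collapsed in the diff view, here is a brief sketch of constructing a `RowMatrix` (assumes an active `SparkContext` named `sc`):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Each local vector becomes one row of the distributed matrix.
val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0)))
val mat = new RowMatrix(rows)
val m = mat.numRows() // 2
val n = mat.numCols() // 3
```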

<div class="codetabs">
@@ -344,70 +344,10 @@ long n = mat.numCols();
</div>
</div>

-#### Multivariate summary statistics
-
-We provide column summary statistics for `RowMatrix`.
-If the number of columns is not large, say, smaller than 3000, you can also compute
-the covariance matrix as a local matrix, which requires $\mathcal{O}(n^2)$ storage where $n$ is the
-number of columns. The total CPU time is $\mathcal{O}(m n^2)$, where $m$ is the number of rows,
-which could be faster if the rows are sparse.
-
-<div class="codetabs">
-<div data-lang="scala" markdown="1">
-
-[`RowMatrix#computeColumnSummaryStatistics`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.RowMatrix) returns an instance of
-[`MultivariateStatisticalSummary`](api/scala/index.html#org.apache.spark.mllib.stat.MultivariateStatisticalSummary),
-which contains the column-wise max, min, mean, variance, and number of nonzeros, as well as the
-total count.
-
-{% highlight scala %}
-import org.apache.spark.mllib.linalg.Matrix
-import org.apache.spark.mllib.linalg.distributed.RowMatrix
-import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
-
-val mat: RowMatrix = ... // a RowMatrix
-
-// Compute column summary statistics.
-val summary: MultivariateStatisticalSummary = mat.computeColumnSummaryStatistics()
-println(summary.mean) // a dense vector containing the mean value for each column
-println(summary.variance) // column-wise variance
-println(summary.numNonzeros) // number of nonzeros in each column
-
-// Compute the covariance matrix.
-val cov: Matrix = mat.computeCovariance()
-{% endhighlight %}
-</div>
-
-<div data-lang="java" markdown="1">
-
-[`RowMatrix#computeColumnSummaryStatistics`](api/java/org/apache/spark/mllib/linalg/distributed/RowMatrix.html#computeColumnSummaryStatistics()) returns an instance of
-[`MultivariateStatisticalSummary`](api/java/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.html),
-which contains the column-wise max, min, mean, variance, and number of nonzeros, as well as the
-total count.
-
-{% highlight java %}
-import org.apache.spark.mllib.linalg.Matrix;
-import org.apache.spark.mllib.linalg.distributed.RowMatrix;
-import org.apache.spark.mllib.stat.MultivariateStatisticalSummary;
-
-RowMatrix mat = ... // a RowMatrix
-
-// Compute column summary statistics.
-MultivariateStatisticalSummary summary = mat.computeColumnSummaryStatistics();
-System.out.println(summary.mean()); // a dense vector containing the mean value for each column
-System.out.println(summary.variance()); // column-wise variance
-System.out.println(summary.numNonzeros()); // number of nonzeros in each column
-
-// Compute the covariance matrix.
-Matrix cov = mat.computeCovariance();
-{% endhighlight %}
-</div>
-</div>
-
 ### IndexedRowMatrix
 
 An `IndexedRowMatrix` is similar to a `RowMatrix` but with meaningful row indices. It is backed by
-an RDD of indexed rows, which each row is represented by its index (long-typed) and a local vector.
+an RDD of indexed rows, so that each row is represented by its index (long-typed) and a local vector.
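A brief construction sketch (again assuming a `SparkContext` named `sc`):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

// Explicit long-typed row indices allow rows to be identified and joined on.
val indexedRows = sc.parallelize(Seq(
  IndexedRow(0L, Vectors.dense(1.0, 2.0)),
  IndexedRow(3L, Vectors.dense(4.0, 5.0))))
val mat = new IndexedRowMatrix(indexedRows)
```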

<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -467,7 +407,7 @@ RowMatrix rowMat = mat.toRowMatrix();

 A `CoordinateMatrix` is a distributed matrix backed by an RDD of its entries. Each entry is a tuple
 of `(i: Long, j: Long, value: Double)`, where `i` is the row index, `j` is the column index, and
-`value` is the entry value. A `CoordinateMatrix` should be used only in the case when both
+`value` is the entry value. A `CoordinateMatrix` should be used only when both
 dimensions of the matrix are huge and the matrix is very sparse.

<div class="codetabs">
@@ -477,9 +417,9 @@ A
 [`CoordinateMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.CoordinateMatrix)
 can be created from an `RDD[MatrixEntry]` instance, where
 [`MatrixEntry`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.MatrixEntry) is a
-wrapper over `(Long, Long, Double)`. A `CoordinateMatrix` can be converted to a `IndexedRowMatrix`
-with sparse rows by calling `toIndexedRowMatrix`. In this release, we do not provide other
-computation for `CoordinateMatrix`.
+wrapper over `(Long, Long, Double)`. A `CoordinateMatrix` can be converted to an `IndexedRowMatrix`
+with sparse rows by calling `toIndexedRowMatrix`. Other computations for
+`CoordinateMatrix` are not currently supported.

{% highlight scala %}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
@@ -503,8 +443,9 @@ A
 [`CoordinateMatrix`](api/java/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.html)
 can be created from a `JavaRDD<MatrixEntry>` instance, where
 [`MatrixEntry`](api/java/org/apache/spark/mllib/linalg/distributed/MatrixEntry.html) is a
-wrapper over `(long, long, double)`. A `CoordinateMatrix` can be converted to a `IndexedRowMatrix`
-with sparse rows by calling `toIndexedRowMatrix`.
+wrapper over `(long, long, double)`. A `CoordinateMatrix` can be converted to an `IndexedRowMatrix`
+with sparse rows by calling `toIndexedRowMatrix`. Other computations for
+`CoordinateMatrix` are not currently supported.

{% highlight java %}
import org.apache.spark.api.java.JavaRDD;