Merge remote-tracking branch 'apache/master' into config-cleanup
Conflicts:
	docs/configuration.md
pwendell committed May 26, 2014
2 parents 3289ea4 + b6d22af commit fdff7fc
Showing 20 changed files with 228 additions and 183 deletions.
@@ -41,6 +41,12 @@ import org.apache.spark.rdd.RDD
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
*/
class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
/**
* Create a JavaSparkContext that loads settings from system properties (for instance, when
* launching with ./bin/spark-submit).
*/
def this() = this(new SparkContext())

/**
* @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters
*/
43 changes: 30 additions & 13 deletions docs/README.md
@@ -1,23 +1,31 @@
Welcome to the Spark documentation!

This readme will walk you through navigating and building the Spark documentation, which is included here with the Spark source code. You can also find documentation specific to release versions of Spark at http://spark.apache.org/documentation.html.
This readme will walk you through navigating and building the Spark documentation, which is included
here with the Spark source code. You can also find documentation specific to release versions of
Spark at http://spark.apache.org/documentation.html.

Read on to learn more about viewing documentation in plain text (i.e., markdown) or building the documentation yourself. Why build it yourself? So that you have the docs that corresponds to whichever version of Spark you currently have checked out of revision control.
Read on to learn more about viewing documentation in plain text (i.e., markdown) or building the
documentation yourself. Why build it yourself? So that you have the docs that corresponds to
whichever version of Spark you currently have checked out of revision control.

## Generating the Documentation HTML

We include the Spark documentation as part of the source (as opposed to using a hosted wiki, such as the github wiki, as the definitive documentation) to enable the documentation to evolve along with the source code and be captured by revision control (currently git). This way the code automatically includes the version of the documentation that is relevant regardless of which version or release you have checked out or downloaded.
We include the Spark documentation as part of the source (as opposed to using a hosted wiki, such as
the github wiki, as the definitive documentation) to enable the documentation to evolve along with
the source code and be captured by revision control (currently git). This way the code automatically
includes the version of the documentation that is relevant regardless of which version or release
you have checked out or downloaded.

In this directory you will find textfiles formatted using Markdown, with an ".md" suffix. You can read those text files directly if you want. Start with index.md.
In this directory you will find textfiles formatted using Markdown, with an ".md" suffix. You can
read those text files directly if you want. Start with index.md.

The markdown code can be compiled to HTML using the
[Jekyll tool](http://jekyllrb.com).
The markdown code can be compiled to HTML using the [Jekyll tool](http://jekyllrb.com).
To use the `jekyll` command, you will need to have Jekyll installed.
The easiest way to do this is via a Ruby Gem, see the
[jekyll installation instructions](http://jekyllrb.com/docs/installation).
If not already installed, you need to install `kramdown` with `sudo gem install kramdown`.
Execute `jekyll` from the `docs/` directory. Compiling the site with Jekyll will create a directory called
`_site` containing index.html as well as the rest of the compiled files.
Execute `jekyll` from the `docs/` directory. Compiling the site with Jekyll will create a directory
called `_site` containing index.html as well as the rest of the compiled files.

You can modify the default Jekyll build as follows:

@@ -30,9 +38,11 @@ You can modify the default Jekyll build as follows:

## Pygments

We also use pygments (http://pygments.org) for syntax highlighting in documentation markdown pages, so you will also need to install that (it requires Python) by running `sudo easy_install Pygments`.
We also use pygments (http://pygments.org) for syntax highlighting in documentation markdown pages,
so you will also need to install that (it requires Python) by running `sudo easy_install Pygments`.

To mark a block of code in your markdown to be syntax highlighted by jekyll during the compile phase, use the following sytax:
To mark a block of code in your markdown to be syntax highlighted by jekyll during the compile
phase, use the following sytax:

{% highlight scala %}
// Your scala code goes here, you can replace scala with many other
@@ -43,8 +53,15 @@

You can build just the Spark scaladoc by running `sbt/sbt doc` from the SPARK_PROJECT_ROOT directory.

Similarly, you can build just the PySpark epydoc by running `epydoc --config epydoc.conf` from the SPARK_PROJECT_ROOT/pyspark directory. Documentation is only generated for classes that are listed as public in `__init__.py`.
Similarly, you can build just the PySpark epydoc by running `epydoc --config epydoc.conf` from the
SPARK_PROJECT_ROOT/pyspark directory. Documentation is only generated for classes that are listed as
public in `__init__.py`.

When you run `jekyll` in the `docs` directory, it will also copy over the scaladoc for the various Spark subprojects into the `docs` directory (and then also into the `_site` directory). We use a jekyll plugin to run `sbt/sbt doc` before building the site so if you haven't run it (recently) it may take some time as it generates all of the scaladoc. The jekyll plugin also generates the PySpark docs using [epydoc](http://epydoc.sourceforge.net/).
When you run `jekyll` in the `docs` directory, it will also copy over the scaladoc for the various
Spark subprojects into the `docs` directory (and then also into the `_site` directory). We use a
jekyll plugin to run `sbt/sbt doc` before building the site so if you haven't run it (recently) it
may take some time as it generates all of the scaladoc. The jekyll plugin also generates the
PySpark docs using [epydoc](http://epydoc.sourceforge.net/).

NOTE: To skip the step of building and copying over the Scala and Python API docs, run `SKIP_API=1 jekyll`.
NOTE: To skip the step of building and copying over the Scala and Python API docs, run `SKIP_API=1
jekyll`.
2 changes: 2 additions & 0 deletions docs/configuration.md
@@ -6,6 +6,8 @@ title: Spark Configuration
* This will become a table of contents (this text will be scraped).
{:toc}

Spark provides several locations to configure the system:

# Spark Properties

Spark properties control most application settings and are configured separately for each
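
Spark properties of this kind are usually set per application on a SparkConf object that is passed to the SparkContext. A minimal PySpark sketch of that pattern (the master URL, app name, and `spark.executor.memory` value below are illustrative choices, not recommendations):

    from pyspark import SparkConf, SparkContext

    # Per-application configuration; these properties apply only to this app.
    conf = (SparkConf()
            .setMaster("local[2]")                 # run locally with two worker threads
            .setAppName("ConfigExample")           # name shown in the UI and logs
            .set("spark.executor.memory", "1g"))   # an example property key/value

    sc = SparkContext(conf=conf)
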
20 changes: 12 additions & 8 deletions examples/src/main/python/als.py
@@ -29,22 +29,25 @@
LAMBDA = 0.01 # regularization
np.random.seed(42)


def rmse(R, ms, us):
diff = R - ms * us.T
return np.sqrt(np.sum(np.power(diff, 2)) / M * U)


def update(i, vec, mat, ratings):
uu = mat.shape[0]
ff = mat.shape[1]

XtX = mat.T * mat
Xty = mat.T * ratings[i, :].T

for j in range(ff):
XtX[j,j] += LAMBDA * uu
XtX[j, j] += LAMBDA * uu

return np.linalg.solve(XtX, Xty)


if __name__ == "__main__":
"""
Usage: als [M] [U] [F] [iterations] [slices]"
@@ -57,10 +60,10 @@ def update(i, vec, mat, ratings):
slices = int(sys.argv[5]) if len(sys.argv) > 5 else 2

print "Running ALS with M=%d, U=%d, F=%d, iters=%d, slices=%d\n" % \
(M, U, F, ITERATIONS, slices)
(M, U, F, ITERATIONS, slices)

R = matrix(rand(M, F)) * matrix(rand(U, F).T)
ms = matrix(rand(M ,F))
ms = matrix(rand(M, F))
us = matrix(rand(U, F))

Rb = sc.broadcast(R)
@@ -71,8 +74,9 @@ def update(i, vec, mat, ratings):
ms = sc.parallelize(range(M), slices) \
.map(lambda x: update(x, msb.value[x, :], usb.value, Rb.value)) \
.collect()
ms = matrix(np.array(ms)[:, :, 0]) # collect() returns a list, so array ends up being
# a 3-d array, we take the first 2 dims for the matrix
# collect() returns a list, so array ends up being
# a 3-d array, we take the first 2 dims for the matrix
ms = matrix(np.array(ms)[:, :, 0])
msb = sc.broadcast(ms)

us = sc.parallelize(range(U), slices) \
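
The `update` step in this example is a ridge-regression solve: it forms the normal equations XtX w = Xty and adds LAMBDA * uu to the diagonal before solving. A standalone NumPy sketch of the same computation, outside Spark and with made-up sizes:

    import numpy as np

    LAMBDA = 0.01                              # regularization, as in the example
    U, F = 50, 10                              # hypothetical counts of users and features

    mat = np.matrix(np.random.rand(U, F))      # the fixed factor matrix (e.g. user factors)
    ratings = np.matrix(np.random.rand(1, U))  # one row of the ratings matrix R

    XtX = mat.T * mat                          # F x F normal matrix
    Xty = mat.T * ratings.T                    # F x 1 right-hand side
    XtX += LAMBDA * U * np.eye(F)              # same effect as XtX[j, j] += LAMBDA * uu
    w = np.linalg.solve(XtX, Xty)              # updated F x 1 factor vector for this row
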
2 changes: 1 addition & 1 deletion examples/src/main/python/kmeans.py
@@ -59,7 +59,7 @@ def closestPoint(p, centers):

while tempDist > convergeDist:
closest = data.map(
lambda p : (closestPoint(p, kPoints), (p, 1)))
lambda p: (closestPoint(p, kPoints), (p, 1)))
pointStats = closest.reduceByKey(
lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
newPoints = pointStats.map(
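
For context, `closestPoint` in this example returns the index of the nearest current center by squared Euclidean distance, and the job above keys each point by that index before averaging per key. A plain NumPy sketch of such a helper, written here as a guess at its shape that is consistent with how it is called:

    import numpy as np

    def closestPoint(p, centers):
        # Return the index of the center nearest to point p (squared Euclidean distance).
        bestIndex = 0
        closest = float("+inf")
        for i in range(len(centers)):
            dist = np.sum((p - centers[i]) ** 2)
            if dist < closest:
                closest = dist
                bestIndex = i
        return bestIndex
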
4 changes: 2 additions & 2 deletions examples/src/main/python/logistic_regression.py
@@ -60,8 +60,8 @@ def readPointBatch(iterator):

# Compute logistic regression gradient for a matrix of data points
def gradient(matrix, w):
Y = matrix[:,0] # point labels (first column of input file)
X = matrix[:,1:] # point coordinates
Y = matrix[:, 0] # point labels (first column of input file)
X = matrix[:, 1:] # point coordinates
# For each point (x, y), compute gradient function, then sum these up
return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)

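
The `gradient` function shown here evaluates, for each labeled point (x, y), the logistic-loss term (1 / (1 + exp(-y * x.w)) - 1) * y * x and sums it over the batch. A small self-contained NumPy check of that formula on a hypothetical two-point batch:

    import numpy as np

    D = 3                        # number of features (hypothetical)
    w = np.zeros(D)              # current weight vector

    # A batch as a matrix: first column is the label, the rest are coordinates.
    matrix = np.array([[ 1.0, 0.5, -0.2,  1.1],
                       [-1.0, 0.3,  0.8, -0.4]])

    Y = matrix[:, 0]             # point labels (first column)
    X = matrix[:, 1:]            # point coordinates

    # Per-point gradient terms, summed across the batch into a length-D vector.
    grad = ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)
    print(grad)
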
12 changes: 6 additions & 6 deletions examples/src/main/python/pagerank.py
@@ -15,9 +15,8 @@
# limitations under the License.
#

#!/usr/bin/env python

import re, sys
import re
import sys
from operator import add

from pyspark import SparkContext
@@ -26,7 +25,8 @@
def computeContribs(urls, rank):
"""Calculates URL contributions to the rank of other URLs."""
num_urls = len(urls)
for url in urls: yield (url, rank / num_urls)
for url in urls:
yield (url, rank / num_urls)


def parseNeighbors(urls):
@@ -59,8 +59,8 @@ def parseNeighbors(urls):
# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in xrange(int(sys.argv[2])):
# Calculates URL contributions to the rank of other URLs.
contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)):
computeContribs(urls, rank))
contribs = links.join(ranks).flatMap(
lambda (url, (urls, rank)): computeContribs(urls, rank))

# Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
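
Stripped of the RDD plumbing, one iteration here spreads each page's rank evenly over its out-links and then applies the 0.85/0.15 damping. A plain-Python sketch of that step on a tiny hypothetical link graph:

    # Hypothetical link graph: page -> list of pages it links to.
    links = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
    ranks = {url: 1.0 for url in links}

    def computeContribs(urls, rank):
        # Yield each neighbor's share of this page's rank.
        num_urls = len(urls)
        for url in urls:
            yield (url, rank / num_urls)

    # One iteration: gather contributions, then re-weight with the damping factor.
    contribs = {}
    for url, rank in ranks.items():
        for dest, c in computeContribs(links[url], rank):
            contribs[dest] = contribs.get(dest, 0.0) + c

    ranks = {url: 0.85 * c + 0.15 for url, c in contribs.items()}
    print(ranks)
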
2 changes: 2 additions & 0 deletions examples/src/main/python/pi.py
@@ -29,9 +29,11 @@
sc = SparkContext(appName="PythonPi")
slices = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * slices

def f(_):
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
print "Pi is roughly %f" % (4.0 * count / n)
4 changes: 2 additions & 2 deletions examples/src/main/python/sort.py
@@ -27,8 +27,8 @@
sc = SparkContext(appName="PythonSort")
lines = sc.textFile(sys.argv[1], 1)
sortedCount = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (int(x), 1)) \
.sortByKey(lambda x: x)
.map(lambda x: (int(x), 1)) \
.sortByKey(lambda x: x)
# This is just a demo on how to bring all the sorted data back to a single node.
# In reality, we wouldn't want to collect all the data to the driver node.
output = sortedCount.collect()
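
Outside Spark, the same pipeline is a split, a keying step, and a sort by key. A plain-Python sketch over an in-memory list of lines (the input lines here are made up, standing in for the text file):

    lines = ["30 1 25", "7", "12 3"]   # hypothetical input lines

    # flatMap: split every line into tokens.
    tokens = [tok for line in lines for tok in line.split(' ')]

    # map: pair each token's integer value with a count of 1.
    pairs = [(int(tok), 1) for tok in tokens]

    # sortByKey: order the pairs by their numeric key.
    sortedCount = sorted(pairs, key=lambda kv: kv[0])

    for num, unitcount in sortedCount:
        print(num)
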
42 changes: 21 additions & 21 deletions python/pyspark/mllib/_common.py
@@ -56,7 +56,8 @@
#
# Sparse double vector format:
#
# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] [nonzeros*8 bytes of values]
# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] \
# [nonzeros*8 bytes of values]
#
# Double matrix format:
#
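
As a rough illustration of the sparse double vector layout described above (assuming the little-endian int32/float64 byte layout that the surrounding NumPy-based helpers produce on typical hardware, and using the magic byte 2 for sparse vectors), a vector of size 5 with nonzeros at indices 1 and 3 could be packed like this:

    import struct

    SPARSE_VECTOR_MAGIC = 2                              # 1-byte type tag for sparse vectors
    size = 5                                             # logical length of the vector
    indices = [1, 3]                                     # positions of the nonzero entries
    values = [0.5, -2.0]                                 # the nonzero values themselves

    ba = bytearray()
    ba += struct.pack("<B", SPARSE_VECTOR_MAGIC)         # [1-byte 2]
    ba += struct.pack("<i", size)                        # [4-byte length]
    ba += struct.pack("<i", len(indices))                # [4-byte nonzeros]
    ba += struct.pack("<%di" % len(indices), *indices)   # [nonzeros * 4 bytes of indices]
    ba += struct.pack("<%dd" % len(values), *values)     # [nonzeros * 8 bytes of values]

    assert len(ba) == 9 + 4 * len(indices) + 8 * len(values)
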
@@ -110,18 +111,18 @@ def _serialize_double_vector(v):
return _serialize_sparse_vector(v)
else:
raise TypeError("_serialize_double_vector called on a %s; "
"wanted ndarray or SparseVector" % type(v))
"wanted ndarray or SparseVector" % type(v))


def _serialize_dense_vector(v):
"""Serialize a dense vector given as a NumPy array."""
if v.ndim != 1:
raise TypeError("_serialize_double_vector called on a %ddarray; "
"wanted a 1darray" % v.ndim)
"wanted a 1darray" % v.ndim)
if v.dtype != float64:
if numpy.issubdtype(v.dtype, numpy.complex):
raise TypeError("_serialize_double_vector called on an ndarray of %s; "
"wanted ndarray of float64" % v.dtype)
"wanted ndarray of float64" % v.dtype)
v = v.astype(float64)
length = v.shape[0]
ba = bytearray(5 + 8 * length)
@@ -158,10 +159,10 @@ def _deserialize_double_vector(ba):
"""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_vector called on a %s; "
"wanted bytearray" % type(ba))
"wanted bytearray" % type(ba))
if len(ba) < 5:
raise TypeError("_deserialize_double_vector called on a %d-byte array, "
"which is too short" % len(ba))
"which is too short" % len(ba))
if ba[0] == DENSE_VECTOR_MAGIC:
return _deserialize_dense_vector(ba)
elif ba[0] == SPARSE_VECTOR_MAGIC:
@@ -175,7 +176,7 @@ def _deserialize_dense_vector(ba):
"""Deserialize a dense vector into a numpy array."""
if len(ba) < 5:
raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
"which is too short" % len(ba))
"which is too short" % len(ba))
length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0]
if len(ba) != 8 * length + 5:
raise TypeError("_deserialize_dense_vector called on bytearray "
@@ -187,7 +188,7 @@ def _deserialize_sparse_vector(ba):
"""Deserialize a sparse vector into a MLlib SparseVector object."""
if len(ba) < 9:
raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
"which is too short" % len(ba))
"which is too short" % len(ba))
header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
size = header[0]
nonzeros = header[1]
@@ -205,7 +206,7 @@ def _serialize_double_matrix(m):
if m.dtype != float64:
if numpy.issubdtype(m.dtype, numpy.complex):
raise TypeError("_serialize_double_matrix called on an ndarray of %s; "
"wanted ndarray of float64" % m.dtype)
"wanted ndarray of float64" % m.dtype)
m = m.astype(float64)
rows = m.shape[0]
cols = m.shape[1]
@@ -225,10 +226,10 @@ def _deserialize_double_matrix(ba):
"""Deserialize a double matrix from a mutually understood format."""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_matrix called on a %s; "
"wanted bytearray" % type(ba))
"wanted bytearray" % type(ba))
if len(ba) < 9:
raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
"which is too short" % len(ba))
"which is too short" % len(ba))
if ba[0] != DENSE_MATRIX_MAGIC:
raise TypeError("_deserialize_double_matrix called on bytearray "
"with wrong magic")
@@ -267,7 +268,7 @@ def _copyto(array, buffer, offset, shape, dtype):
def _get_unmangled_rdd(data, serializer):
dataBytes = data.map(serializer)
dataBytes._bypass_serializer = True
dataBytes.cache() # TODO: users should unpersist() this later!
dataBytes.cache() # TODO: users should unpersist() this later!
return dataBytes


@@ -293,14 +294,14 @@ def _linear_predictor_typecheck(x, coeffs):
if type(x) == ndarray:
if x.ndim == 1:
if x.shape != coeffs.shape:
raise RuntimeError("Got array of %d elements; wanted %d"
% (numpy.shape(x)[0], coeffs.shape[0]))
raise RuntimeError("Got array of %d elements; wanted %d" % (
numpy.shape(x)[0], coeffs.shape[0]))
else:
raise RuntimeError("Bulk predict not yet supported.")
elif type(x) == SparseVector:
if x.size != coeffs.shape[0]:
raise RuntimeError("Got sparse vector of size %d; wanted %d"
% (x.size, coeffs.shape[0]))
raise RuntimeError("Got sparse vector of size %d; wanted %d" % (
x.size, coeffs.shape[0]))
elif (type(x) == RDD):
raise RuntimeError("Bulk predict not yet supported.")
else:
@@ -315,7 +316,7 @@ def _get_initial_weights(initial_weights, data):
if type(initial_weights) == ndarray:
if initial_weights.ndim != 1:
raise TypeError("At least one data element has "
+ initial_weights.ndim + " dimensions, which is not 1")
+ initial_weights.ndim + " dimensions, which is not 1")
initial_weights = numpy.zeros([initial_weights.shape[0]])
elif type(initial_weights) == SparseVector:
initial_weights = numpy.zeros([initial_weights.size])
@@ -333,10 +334,10 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
raise RuntimeError("JVM call result had unexpected length")
elif type(ans[0]) != bytearray:
raise RuntimeError("JVM call result had first element of type "
+ type(ans[0]).__name__ + " which is not bytearray")
+ type(ans[0]).__name__ + " which is not bytearray")
elif type(ans[1]) != float:
raise RuntimeError("JVM call result had second element of type "
+ type(ans[0]).__name__ + " which is not float")
+ type(ans[0]).__name__ + " which is not float")
return klass(_deserialize_double_vector(ans[0]), ans[1])


@@ -450,8 +451,7 @@ def _test():
import doctest
globs = globals().copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs,
optionflags=doctest.ELLIPSIS)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)