diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index a7cfee6d01711..1e0493c4855e0 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -41,6 +41,12 @@ import org.apache.spark.rdd.RDD * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones. */ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround { + /** + * Create a JavaSparkContext that loads settings from system properties (for instance, when + * launching with ./bin/spark-submit). + */ + def this() = this(new SparkContext()) + /** * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters */ diff --git a/docs/README.md b/docs/README.md index f1eb644f93406..fd7ba4e0d72ea 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,23 +1,31 @@ Welcome to the Spark documentation! -This readme will walk you through navigating and building the Spark documentation, which is included here with the Spark source code. You can also find documentation specific to release versions of Spark at http://spark.apache.org/documentation.html. +This readme will walk you through navigating and building the Spark documentation, which is included +here with the Spark source code. You can also find documentation specific to release versions of +Spark at http://spark.apache.org/documentation.html. -Read on to learn more about viewing documentation in plain text (i.e., markdown) or building the documentation yourself. Why build it yourself? So that you have the docs that corresponds to whichever version of Spark you currently have checked out of revision control. +Read on to learn more about viewing documentation in plain text (i.e., markdown) or building the +documentation yourself. Why build it yourself? So that you have the docs that correspond to +whichever version of Spark you currently have checked out of revision control. ## Generating the Documentation HTML -We include the Spark documentation as part of the source (as opposed to using a hosted wiki, such as the github wiki, as the definitive documentation) to enable the documentation to evolve along with the source code and be captured by revision control (currently git). This way the code automatically includes the version of the documentation that is relevant regardless of which version or release you have checked out or downloaded. +We include the Spark documentation as part of the source (as opposed to using a hosted wiki, such as +the github wiki, as the definitive documentation) to enable the documentation to evolve along with +the source code and be captured by revision control (currently git). This way the code automatically +includes the version of the documentation that is relevant regardless of which version or release +you have checked out or downloaded. -In this directory you will find textfiles formatted using Markdown, with an ".md" suffix. You can read those text files directly if you want. Start with index.md. +In this directory you will find text files formatted using Markdown, with an ".md" suffix. You can +read those text files directly if you want. Start with index.md. -The markdown code can be compiled to HTML using the -[Jekyll tool](http://jekyllrb.com). +The markdown code can be compiled to HTML using the [Jekyll tool](http://jekyllrb.com).
To use the `jekyll` command, you will need to have Jekyll installed. The easiest way to do this is via a Ruby Gem, see the [jekyll installation instructions](http://jekyllrb.com/docs/installation). If not already installed, you need to install `kramdown` with `sudo gem install kramdown`. -Execute `jekyll` from the `docs/` directory. Compiling the site with Jekyll will create a directory called -`_site` containing index.html as well as the rest of the compiled files. +Execute `jekyll` from the `docs/` directory. Compiling the site with Jekyll will create a directory +called `_site` containing index.html as well as the rest of the compiled files. You can modify the default Jekyll build as follows: @@ -30,9 +38,11 @@ You can modify the default Jekyll build as follows: ## Pygments -We also use pygments (http://pygments.org) for syntax highlighting in documentation markdown pages, so you will also need to install that (it requires Python) by running `sudo easy_install Pygments`. +We also use pygments (http://pygments.org) for syntax highlighting in documentation markdown pages, +so you will also need to install that (it requires Python) by running `sudo easy_install Pygments`. -To mark a block of code in your markdown to be syntax highlighted by jekyll during the compile phase, use the following sytax: +To mark a block of code in your markdown to be syntax highlighted by jekyll during the compile +phase, use the following syntax: {% highlight scala %} // Your scala code goes here, you can replace scala with many other @@ -43,8 +53,15 @@ To mark a block of code in your markdown to be syntax highlighted by jekyll duri You can build just the Spark scaladoc by running `sbt/sbt doc` from the SPARK_PROJECT_ROOT directory. -Similarly, you can build just the PySpark epydoc by running `epydoc --config epydoc.conf` from the SPARK_PROJECT_ROOT/pyspark directory. Documentation is only generated for classes that are listed as public in `__init__.py`. +Similarly, you can build just the PySpark epydoc by running `epydoc --config epydoc.conf` from the +SPARK_PROJECT_ROOT/pyspark directory. Documentation is only generated for classes that are listed as +public in `__init__.py`. -When you run `jekyll` in the `docs` directory, it will also copy over the scaladoc for the various Spark subprojects into the `docs` directory (and then also into the `_site` directory). We use a jekyll plugin to run `sbt/sbt doc` before building the site so if you haven't run it (recently) it may take some time as it generates all of the scaladoc. The jekyll plugin also generates the PySpark docs using [epydoc](http://epydoc.sourceforge.net/). +When you run `jekyll` in the `docs` directory, it will also copy over the scaladoc for the various +Spark subprojects into the `docs` directory (and then also into the `_site` directory). We use a +jekyll plugin to run `sbt/sbt doc` before building the site so if you haven't run it (recently) it +may take some time as it generates all of the scaladoc. The jekyll plugin also generates the +PySpark docs using [epydoc](http://epydoc.sourceforge.net/). -NOTE: To skip the step of building and copying over the Scala and Python API docs, run `SKIP_API=1 jekyll`. +NOTE: To skip the step of building and copying over the Scala and Python API docs, run `SKIP_API=1 +jekyll`.
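A side note on the JavaSparkContext change at the top of this diff: the new zero-argument constructor simply wraps a SparkContext built from system properties, which is how ./bin/spark-submit hands configuration to an application. The snippet below is an illustrative sketch, not part of the patch; it assumes spark.master and spark.app.name are supplied externally (for example by spark-submit):

    import org.apache.spark.api.java.JavaSparkContext

    object NoArgContextSketch {
      def main(args: Array[String]): Unit = {
        // Configuration such as spark.master and spark.app.name is picked up
        // from system properties (e.g. those set by ./bin/spark-submit) rather
        // than from a SparkConf built in code; per the patch this is equivalent
        // to new JavaSparkContext(new SparkContext()).
        val jsc = new JavaSparkContext()

        val rdd = jsc.parallelize(java.util.Arrays.asList(1, 2, 3, 4, 5))
        println("count = " + rdd.count())

        jsc.stop()
      }
    }

From Java the call is simply `new JavaSparkContext()`, giving Java users the same "configure from spark-submit" path that the existing no-argument SparkContext constructor provides on the Scala side.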
diff --git a/docs/configuration.md b/docs/configuration.md index f68f8d116c666..d8a2360a7b3f3 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -6,6 +6,8 @@ title: Spark Configuration * This will become a table of contents (this text will be scraped). {:toc} +Spark provides several locations to configure the system: + # Spark Properties Spark properties control most application settings and are configured separately for each diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py index f0b46cd28b7aa..1a7c4c51f48cd 100755 --- a/examples/src/main/python/als.py +++ b/examples/src/main/python/als.py @@ -29,22 +29,25 @@ LAMBDA = 0.01 # regularization np.random.seed(42) + def rmse(R, ms, us): diff = R - ms * us.T return np.sqrt(np.sum(np.power(diff, 2)) / M * U) + def update(i, vec, mat, ratings): uu = mat.shape[0] ff = mat.shape[1] - + XtX = mat.T * mat Xty = mat.T * ratings[i, :].T - + for j in range(ff): - XtX[j,j] += LAMBDA * uu - + XtX[j, j] += LAMBDA * uu + return np.linalg.solve(XtX, Xty) + if __name__ == "__main__": """ Usage: als [M] [U] [F] [iterations] [slices]" @@ -57,10 +60,10 @@ def update(i, vec, mat, ratings): slices = int(sys.argv[5]) if len(sys.argv) > 5 else 2 print "Running ALS with M=%d, U=%d, F=%d, iters=%d, slices=%d\n" % \ - (M, U, F, ITERATIONS, slices) + (M, U, F, ITERATIONS, slices) R = matrix(rand(M, F)) * matrix(rand(U, F).T) - ms = matrix(rand(M ,F)) + ms = matrix(rand(M, F)) us = matrix(rand(U, F)) Rb = sc.broadcast(R) @@ -71,8 +74,9 @@ def update(i, vec, mat, ratings): ms = sc.parallelize(range(M), slices) \ .map(lambda x: update(x, msb.value[x, :], usb.value, Rb.value)) \ .collect() - ms = matrix(np.array(ms)[:, :, 0]) # collect() returns a list, so array ends up being - # a 3-d array, we take the first 2 dims for the matrix + # collect() returns a list, so array ends up being + # a 3-d array, we take the first 2 dims for the matrix + ms = matrix(np.array(ms)[:, :, 0]) msb = sc.broadcast(ms) us = sc.parallelize(range(U), slices) \ diff --git a/examples/src/main/python/kmeans.py b/examples/src/main/python/kmeans.py index fc16586c28a46..988fc45baf3bc 100755 --- a/examples/src/main/python/kmeans.py +++ b/examples/src/main/python/kmeans.py @@ -59,7 +59,7 @@ def closestPoint(p, centers): while tempDist > convergeDist: closest = data.map( - lambda p : (closestPoint(p, kPoints), (p, 1))) + lambda p: (closestPoint(p, kPoints), (p, 1))) pointStats = closest.reduceByKey( lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2)) newPoints = pointStats.map( diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py index 0f22d0b32319e..6c33deabfd6ea 100755 --- a/examples/src/main/python/logistic_regression.py +++ b/examples/src/main/python/logistic_regression.py @@ -60,8 +60,8 @@ def readPointBatch(iterator): # Compute logistic regression gradient for a matrix of data points def gradient(matrix, w): - Y = matrix[:,0] # point labels (first column of input file) - X = matrix[:,1:] # point coordinates + Y = matrix[:, 0] # point labels (first column of input file) + X = matrix[:, 1:] # point coordinates # For each point (x, y), compute gradient function, then sum these up return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1) diff --git a/examples/src/main/python/pagerank.py b/examples/src/main/python/pagerank.py index d350fa46fa49a..0b96343158d44 100755 --- a/examples/src/main/python/pagerank.py +++ b/examples/src/main/python/pagerank.py @@ -15,9 +15,8 @@ # limitations under the 
License. # -#!/usr/bin/env python - -import re, sys +import re +import sys from operator import add from pyspark import SparkContext @@ -26,7 +25,8 @@ def computeContribs(urls, rank): """Calculates URL contributions to the rank of other URLs.""" num_urls = len(urls) - for url in urls: yield (url, rank / num_urls) + for url in urls: + yield (url, rank / num_urls) def parseNeighbors(urls): @@ -59,8 +59,8 @@ def parseNeighbors(urls): # Calculates and updates URL ranks continuously using PageRank algorithm. for iteration in xrange(int(sys.argv[2])): # Calculates URL contributions to the rank of other URLs. - contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)): - computeContribs(urls, rank)) + contribs = links.join(ranks).flatMap( + lambda (url, (urls, rank)): computeContribs(urls, rank)) # Re-calculates URL ranks based on neighbor contributions. ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15) diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py index 234720b55fa49..21d94a2cd4b64 100755 --- a/examples/src/main/python/pi.py +++ b/examples/src/main/python/pi.py @@ -29,9 +29,11 @@ sc = SparkContext(appName="PythonPi") slices = int(sys.argv[1]) if len(sys.argv) > 1 else 2 n = 100000 * slices + def f(_): x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 < 1 else 0 + count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add) print "Pi is roughly %f" % (4.0 * count / n) diff --git a/examples/src/main/python/sort.py b/examples/src/main/python/sort.py index 4913ee926aa03..41d00c1b79133 100755 --- a/examples/src/main/python/sort.py +++ b/examples/src/main/python/sort.py @@ -27,8 +27,8 @@ sc = SparkContext(appName="PythonSort") lines = sc.textFile(sys.argv[1], 1) sortedCount = lines.flatMap(lambda x: x.split(' ')) \ - .map(lambda x: (int(x), 1)) \ - .sortByKey(lambda x: x) + .map(lambda x: (int(x), 1)) \ + .sortByKey(lambda x: x) # This is just a demo on how to bring all the sorted data back to a single node. # In reality, we wouldn't want to collect all the data to the driver node. 
output = sortedCount.collect() diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index e6f0953810ed7..802a27a8da14d 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -56,7 +56,8 @@ # # Sparse double vector format: # -# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] [nonzeros*8 bytes of values] +# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] \ +# [nonzeros*8 bytes of values] # # Double matrix format: # @@ -110,18 +111,18 @@ def _serialize_double_vector(v): return _serialize_sparse_vector(v) else: raise TypeError("_serialize_double_vector called on a %s; " - "wanted ndarray or SparseVector" % type(v)) + "wanted ndarray or SparseVector" % type(v)) def _serialize_dense_vector(v): """Serialize a dense vector given as a NumPy array.""" if v.ndim != 1: raise TypeError("_serialize_double_vector called on a %ddarray; " - "wanted a 1darray" % v.ndim) + "wanted a 1darray" % v.ndim) if v.dtype != float64: if numpy.issubdtype(v.dtype, numpy.complex): raise TypeError("_serialize_double_vector called on an ndarray of %s; " - "wanted ndarray of float64" % v.dtype) + "wanted ndarray of float64" % v.dtype) v = v.astype(float64) length = v.shape[0] ba = bytearray(5 + 8 * length) @@ -158,10 +159,10 @@ def _deserialize_double_vector(ba): """ if type(ba) != bytearray: raise TypeError("_deserialize_double_vector called on a %s; " - "wanted bytearray" % type(ba)) + "wanted bytearray" % type(ba)) if len(ba) < 5: raise TypeError("_deserialize_double_vector called on a %d-byte array, " - "which is too short" % len(ba)) + "which is too short" % len(ba)) if ba[0] == DENSE_VECTOR_MAGIC: return _deserialize_dense_vector(ba) elif ba[0] == SPARSE_VECTOR_MAGIC: @@ -175,7 +176,7 @@ def _deserialize_dense_vector(ba): """Deserialize a dense vector into a numpy array.""" if len(ba) < 5: raise TypeError("_deserialize_dense_vector called on a %d-byte array, " - "which is too short" % len(ba)) + "which is too short" % len(ba)) length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0] if len(ba) != 8 * length + 5: raise TypeError("_deserialize_dense_vector called on bytearray " @@ -187,7 +188,7 @@ def _deserialize_sparse_vector(ba): """Deserialize a sparse vector into a MLlib SparseVector object.""" if len(ba) < 9: raise TypeError("_deserialize_sparse_vector called on a %d-byte array, " - "which is too short" % len(ba)) + "which is too short" % len(ba)) header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32) size = header[0] nonzeros = header[1] @@ -205,7 +206,7 @@ def _serialize_double_matrix(m): if m.dtype != float64: if numpy.issubdtype(m.dtype, numpy.complex): raise TypeError("_serialize_double_matrix called on an ndarray of %s; " - "wanted ndarray of float64" % m.dtype) + "wanted ndarray of float64" % m.dtype) m = m.astype(float64) rows = m.shape[0] cols = m.shape[1] @@ -225,10 +226,10 @@ def _deserialize_double_matrix(ba): """Deserialize a double matrix from a mutually understood format.""" if type(ba) != bytearray: raise TypeError("_deserialize_double_matrix called on a %s; " - "wanted bytearray" % type(ba)) + "wanted bytearray" % type(ba)) if len(ba) < 9: raise TypeError("_deserialize_double_matrix called on a %d-byte array, " - "which is too short" % len(ba)) + "which is too short" % len(ba)) if ba[0] != DENSE_MATRIX_MAGIC: raise TypeError("_deserialize_double_matrix called on bytearray " "with wrong magic") @@ -267,7 +268,7 @@ def _copyto(array, buffer, offset, shape, dtype): def 
_get_unmangled_rdd(data, serializer): dataBytes = data.map(serializer) dataBytes._bypass_serializer = True - dataBytes.cache() # TODO: users should unpersist() this later! + dataBytes.cache() # TODO: users should unpersist() this later! return dataBytes @@ -293,14 +294,14 @@ def _linear_predictor_typecheck(x, coeffs): if type(x) == ndarray: if x.ndim == 1: if x.shape != coeffs.shape: - raise RuntimeError("Got array of %d elements; wanted %d" - % (numpy.shape(x)[0], coeffs.shape[0])) + raise RuntimeError("Got array of %d elements; wanted %d" % ( + numpy.shape(x)[0], coeffs.shape[0])) else: raise RuntimeError("Bulk predict not yet supported.") elif type(x) == SparseVector: if x.size != coeffs.shape[0]: - raise RuntimeError("Got sparse vector of size %d; wanted %d" - % (x.size, coeffs.shape[0])) + raise RuntimeError("Got sparse vector of size %d; wanted %d" % ( + x.size, coeffs.shape[0])) elif (type(x) == RDD): raise RuntimeError("Bulk predict not yet supported.") else: @@ -315,7 +316,7 @@ def _get_initial_weights(initial_weights, data): if type(initial_weights) == ndarray: if initial_weights.ndim != 1: raise TypeError("At least one data element has " - + initial_weights.ndim + " dimensions, which is not 1") + + initial_weights.ndim + " dimensions, which is not 1") initial_weights = numpy.zeros([initial_weights.shape[0]]) elif type(initial_weights) == SparseVector: initial_weights = numpy.zeros([initial_weights.size]) @@ -333,10 +334,10 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights): raise RuntimeError("JVM call result had unexpected length") elif type(ans[0]) != bytearray: raise RuntimeError("JVM call result had first element of type " - + type(ans[0]).__name__ + " which is not bytearray") + + type(ans[0]).__name__ + " which is not bytearray") elif type(ans[1]) != float: raise RuntimeError("JVM call result had second element of type " - + type(ans[0]).__name__ + " which is not float") + + type(ans[0]).__name__ + " which is not float") return klass(_deserialize_double_vector(ans[0]), ans[1]) @@ -450,8 +451,7 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, - optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 6772e4337ef39..1c0c536c4fb3d 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -29,6 +29,7 @@ from pyspark.mllib.regression import LabeledPoint, LinearModel from math import exp, log + class LogisticRegressionModel(LinearModel): """A linear binary classification model derived from logistic regression. 
@@ -68,14 +69,14 @@ def predict(self, x): class LogisticRegressionWithSGD(object): @classmethod - def train(cls, data, iterations=100, step=1.0, - miniBatchFraction=1.0, initialWeights=None): + def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None): """Train a logistic regression model on the given data.""" sc = data.context - return _regression_train_wrapper(sc, lambda d, i: - sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd, - iterations, step, miniBatchFraction, i), - LogisticRegressionModel, data, initialWeights) + train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD( + d._jrdd, iterations, step, miniBatchFraction, i) + return _regression_train_wrapper(sc, train_func, LogisticRegressionModel, data, + initialWeights) + class SVMModel(LinearModel): """A support vector machine. @@ -106,16 +107,17 @@ def predict(self, x): margin = _dot(x, self._coeff) + self._intercept return 1 if margin >= 0 else 0 + class SVMWithSGD(object): @classmethod def train(cls, data, iterations=100, step=1.0, regParam=1.0, miniBatchFraction=1.0, initialWeights=None): """Train a support vector machine on the given data.""" sc = data.context - return _regression_train_wrapper(sc, lambda d, i: - sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd, - iterations, step, regParam, miniBatchFraction, i), - SVMModel, data, initialWeights) + train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD( + d._jrdd, iterations, step, regParam, miniBatchFraction, i) + return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights) + class NaiveBayesModel(object): """ @@ -156,6 +158,7 @@ def predict(self, x): """Return the most likely class for a data vector x""" return self.labels[numpy.argmax(self.pi + _dot(x, self.theta.transpose()))] + class NaiveBayes(object): @classmethod def train(cls, data, lambda_=1.0): @@ -186,8 +189,7 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, - optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index f65088c9170e0..b380e8f6c8725 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -30,7 +30,8 @@ class KMeansModel(object): """A clustering model derived from the k-means method. >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2) - >>> model = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random") + >>> model = KMeans.train( + ... 
sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random") >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0])) True >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0])) @@ -76,18 +77,17 @@ def predict(self, x): class KMeans(object): @classmethod - def train(cls, data, k, maxIterations=100, runs=1, - initializationMode="k-means||"): + def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"): """Train a k-means clustering model.""" sc = data.context dataBytes = _get_unmangled_double_vector_rdd(data) - ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd, - k, maxIterations, runs, initializationMode) + ans = sc._jvm.PythonMLLibAPI().trainKMeansModel( + dataBytes._jrdd, k, maxIterations, runs, initializationMode) if len(ans) != 1: raise RuntimeError("JVM call result had unexpected length") elif type(ans[0]) != bytearray: raise RuntimeError("JVM call result had first element of type " - + type(ans[0]) + " which is not bytearray") + + type(ans[0]) + " which is not bytearray") matrix = _deserialize_double_matrix(ans[0]) return KMeansModel([row for row in matrix]) @@ -96,8 +96,7 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, - optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 7511ca7573ddb..276684272068b 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -54,7 +54,7 @@ def __init__(self, size, *args): if len(args) == 1: pairs = args[0] if type(pairs) == dict: - pairs = pairs.items() + pairs = pairs.items() pairs = sorted(pairs) self.indices = array([p[0] for p in pairs], dtype=int32) self.values = array([p[1] for p in pairs], dtype=float64) @@ -88,7 +88,7 @@ def dot(self, other): result += self.values[i] * other[self.indices[i]] return result elif other.ndim == 2: - results = [self.dot(other[:,i]) for i in xrange(other.shape[1])] + results = [self.dot(other[:, i]) for i in xrange(other.shape[1])] return array(results) else: raise Exception("Cannot call dot with %d-dimensional array" % other.ndim) @@ -135,7 +135,7 @@ def squared_distance(self, other): return result else: raise Exception("Cannot call squared_distance with %d-dimensional array" % - other.ndim) + other.ndim) else: result = 0.0 i, j = 0, 0 @@ -184,15 +184,14 @@ def __eq__(self, other): """ return (isinstance(other, self.__class__) - and other.size == self.size - and array_equal(other.indices, self.indices) - and array_equal(other.values, self.values)) + and other.size == self.size + and array_equal(other.indices, self.indices) + and array_equal(other.values, self.values)) def __ne__(self, other): return not self.__eq__(other) - class Vectors(object): """ Factory methods for working with vectors. Note that dense vectors diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index f4a83f0209e27..6c385042ffa5f 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -24,6 +24,7 @@ _serialize_tuple, RatingDeserializer from pyspark.rdd import RDD + class MatrixFactorizationModel(object): """A matrix factorisation model trained by regularized alternating least-squares. 
@@ -55,32 +56,34 @@ def predictAll(self, usersProducts): return RDD(self._java_model.predict(usersProductsJRDD._jrdd), self._context, RatingDeserializer()) + class ALS(object): @classmethod def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): sc = ratings.context ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating) - mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd, - rank, iterations, lambda_, blocks) + mod = sc._jvm.PythonMLLibAPI().trainALSModel( + ratingBytes._jrdd, rank, iterations, lambda_, blocks) return MatrixFactorizationModel(sc, mod) @classmethod def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01): sc = ratings.context ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating) - mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd, - rank, iterations, lambda_, blocks, alpha) + mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel( + ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha) return MatrixFactorizationModel(sc, mod) + def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, - optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) + if __name__ == "__main__": _test() diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 266b31d3fab0e..bc7de6d2e8958 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -113,10 +113,9 @@ def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None): """Train a linear regression model on the given data.""" sc = data.context - return _regression_train_wrapper(sc, lambda d, i: - sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD( - d._jrdd, iterations, step, miniBatchFraction, i), - LinearRegressionModel, data, initialWeights) + train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD( + d._jrdd, iterations, step, miniBatchFraction, i) + return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights) class LassoModel(LinearRegressionModelBase): @@ -157,10 +156,9 @@ def train(cls, data, iterations=100, step=1.0, regParam=1.0, miniBatchFraction=1.0, initialWeights=None): """Train a Lasso regression model on the given data.""" sc = data.context - return _regression_train_wrapper(sc, lambda d, i: - sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(d._jrdd, - iterations, step, regParam, miniBatchFraction, i), - LassoModel, data, initialWeights) + train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD( + d._jrdd, iterations, step, regParam, miniBatchFraction, i) + return _regression_train_wrapper(sc, train_f, LassoModel, data, initialWeights) class RidgeRegressionModel(LinearRegressionModelBase): @@ -201,18 +199,16 @@ def train(cls, data, iterations=100, step=1.0, regParam=1.0, miniBatchFraction=1.0, initialWeights=None): """Train a ridge regression model on the given data.""" sc = data.context - return _regression_train_wrapper(sc, lambda d, i: - sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(d._jrdd, - iterations, step, regParam, miniBatchFraction, i), - RidgeRegressionModel, data, initialWeights) + train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD( + d._jrdd, iterations, step, regParam, miniBatchFraction, i) + 
return _regression_train_wrapper(sc, train_func, RidgeRegressionModel, data, initialWeights) def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, - optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 1ee96bb4af37b..37ccf1d590743 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -23,7 +23,7 @@ import unittest from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \ - _deserialize_double_vector, _dot, _squared_distance + _deserialize_double_vector, _dot, _squared_distance from pyspark.mllib.linalg import SparseVector from pyspark.mllib.regression import LabeledPoint from pyspark.tests import PySparkTestCase @@ -46,12 +46,9 @@ def test_serialize(self): self.assertTrue(sv is _convert_vector(sv)) self.assertTrue(dv is _convert_vector(dv)) self.assertTrue(array_equal(dv, _convert_vector(lst))) - self.assertEquals(sv, - _deserialize_double_vector(_serialize_double_vector(sv))) - self.assertTrue(array_equal(dv, - _deserialize_double_vector(_serialize_double_vector(dv)))) - self.assertTrue(array_equal(dv, - _deserialize_double_vector(_serialize_double_vector(lst)))) + self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(sv))) + self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(dv)))) + self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(lst)))) def test_dot(self): sv = SparseVector(4, {1: 1, 3: 2}) @@ -132,7 +129,7 @@ def test_classification(self): def test_regression(self): from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \ - RidgeRegressionWithSGD + RidgeRegressionWithSGD data = [ LabeledPoint(-1.0, [0, -1]), LabeledPoint(1.0, [0, 1]), @@ -179,14 +176,10 @@ def test_serialize(self): self.assertEquals(sv, _convert_vector(lil.tocoo())) self.assertEquals(sv, _convert_vector(lil.tocsr())) self.assertEquals(sv, _convert_vector(lil.todok())) - self.assertEquals(sv, - _deserialize_double_vector(_serialize_double_vector(lil))) - self.assertEquals(sv, - _deserialize_double_vector(_serialize_double_vector(lil.tocsc()))) - self.assertEquals(sv, - _deserialize_double_vector(_serialize_double_vector(lil.tocsr()))) - self.assertEquals(sv, - _deserialize_double_vector(_serialize_double_vector(lil.todok()))) + self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil))) + self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc()))) + self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr()))) + self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok()))) def test_dot(self): from scipy.sparse import lil_matrix @@ -265,7 +258,7 @@ def test_classification(self): def test_regression(self): from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \ - RidgeRegressionWithSGD + RidgeRegressionWithSGD data = [ LabeledPoint(-1.0, self.scipy_matrix(2, {1: -1.0})), LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})), diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index 50d0cdd087625..0e5f4520b9402 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -21,6 +21,7 @@ from 
pyspark.mllib.regression import LabeledPoint from pyspark.mllib._common import _convert_vector + class MLUtils: """ Helper methods to load, save and pre-process data used in MLlib. @@ -44,7 +45,6 @@ def _parse_libsvm_line(line, multiclass): values[i] = float(value) return label, indices, values - @staticmethod def _convert_labeled_point_to_libsvm(p): """Converts a LabeledPoint to a string in LIBSVM format.""" @@ -62,7 +62,6 @@ def _convert_labeled_point_to_libsvm(p): " but got " % type(v)) return " ".join(items) - @staticmethod def loadLibSVMFile(sc, path, multiclass=False, numFeatures=-1, minPartitions=None): """ @@ -135,7 +134,6 @@ def loadLibSVMFile(sc, path, multiclass=False, numFeatures=-1, minPartitions=Non numFeatures = parsed.map(lambda x: 0 if x[1].size == 0 else x[1][-1]).reduce(max) + 1 return parsed.map(lambda x: LabeledPoint(x[0], Vectors.sparse(numFeatures, x[1], x[2]))) - @staticmethod def saveAsLibSVMFile(data, dir): """ diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index fa4b9c7b688ea..b4e9618cc25b5 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -23,14 +23,14 @@ class SQLContext: - """ - Main entry point for SparkSQL functionality. A SQLContext can be used create L{SchemaRDD}s, - register L{SchemaRDD}s as tables, execute sql over tables, cache tables, and read parquet files. + """Main entry point for SparkSQL functionality. + + A SQLContext can be used to create L{SchemaRDD}s, register L{SchemaRDD}s as + tables, execute SQL over tables, cache tables, and read parquet files. """ def __init__(self, sparkContext, sqlContext = None): - """ - Create a new SQLContext. + """Create a new SQLContext. @param sparkContext: The SparkContext to wrap. @@ -63,18 +63,20 @@ def __init__(self, sparkContext, sqlContext = None): @property def _ssql_ctx(self): - """ - Accessor for the JVM SparkSQL context. Subclasses can override this property to provide - their own JVM Contexts. + """Accessor for the JVM SparkSQL context. + + Subclasses can override this property to provide their own + JVM Contexts. """ if not hasattr(self, '_scala_SQLContext'): self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc()) return self._scala_SQLContext def inferSchema(self, rdd): - """ - Infer and apply a schema to an RDD of L{dict}s. We peek at the first row of the RDD to - determine the fields names and types, and then use that to extract all the dictionaries. + """Infer and apply a schema to an RDD of L{dict}s. + + We peek at the first row of the RDD to determine the field names + and types, and then use that to extract all the dictionaries. >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, @@ -92,9 +94,10 @@ def inferSchema(self, rdd): return SchemaRDD(srdd, self) def registerRDDAsTable(self, rdd, tableName): - """ - Registers the given RDD as a temporary table in the catalog. Temporary tables exist only - during the lifetime of this instance of SQLContext. + """Registers the given RDD as a temporary table in the catalog. + + Temporary tables exist only during the lifetime of this instance of + SQLContext. >>> srdd = sqlCtx.inferSchema(rdd) >>> sqlCtx.registerRDDAsTable(srdd, "table1") @@ -106,8 +109,7 @@ def registerRDDAsTable(self, rdd, tableName): raise ValueError("Can only register SchemaRDD as table") def parquetFile(self, path): - """ - Loads a Parquet file, returning the result as a L{SchemaRDD}. + """Loads a Parquet file, returning the result as a L{SchemaRDD}.
>>> import tempfile, shutil >>> parquetFile = tempfile.mkdtemp() @@ -122,8 +124,7 @@ def parquetFile(self, path): return SchemaRDD(jschema_rdd, self) def sql(self, sqlQuery): - """ - Executes a SQL query using Spark, returning the result as a L{SchemaRDD}. + """Return a L{SchemaRDD} representing the result of the given query. >>> srdd = sqlCtx.inferSchema(rdd) >>> sqlCtx.registerRDDAsTable(srdd, "table1") @@ -135,8 +136,7 @@ def sql(self, sqlQuery): return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) def table(self, tableName): - """ - Returns the specified table as a L{SchemaRDD}. + """Returns the specified table as a L{SchemaRDD}. >>> srdd = sqlCtx.inferSchema(rdd) >>> sqlCtx.registerRDDAsTable(srdd, "table1") @@ -147,23 +147,19 @@ def table(self, tableName): return SchemaRDD(self._ssql_ctx.table(tableName), self) def cacheTable(self, tableName): - """ - Caches the specified table in-memory. - """ + """Caches the specified table in-memory.""" self._ssql_ctx.cacheTable(tableName) def uncacheTable(self, tableName): - """ - Removes the specified table from the in-memory cache. - """ + """Removes the specified table from the in-memory cache.""" self._ssql_ctx.uncacheTable(tableName) class HiveContext(SQLContext): - """ - An instance of the Spark SQL execution engine that integrates with data stored in Hive. - Configuration for Hive is read from hive-site.xml on the classpath. It supports running both SQL - and HiveQL commands. + """A variant of Spark SQL that integrates with data stored in Hive. + + Configuration for Hive is read from hive-site.xml on the classpath. + It supports running both SQL and HiveQL commands. """ @property @@ -193,9 +189,10 @@ def hql(self, hqlQuery): class LocalHiveContext(HiveContext): - """ - Starts up an instance of hive where metadata is stored locally. An in-process metadata data is - created with data stored in ./metadata. Warehouse data is stored in in ./warehouse. + """Starts up an instance of hive where metadata is stored locally. + + An in-process metadata store is created with data stored in ./metadata. + Warehouse data is stored in ./warehouse. >>> import os >>> hiveCtx = LocalHiveContext(sc) @@ -228,8 +225,10 @@ def _get_hive_ctx(self): # TODO: Investigate if it is more efficient to use a namedtuple. One problem is that named tuples # are custom classes that must be generated per Schema. class Row(dict): - """ - An extended L{dict} that takes a L{dict} in its constructor, and exposes those items as fields. + """A row in L{SchemaRDD}. + + An extended L{dict} that takes a L{dict} in its constructor, and + exposes those items as fields. >>> r = Row({"hello" : "world", "foo" : "bar"}) >>> r.hello @@ -245,13 +244,16 @@ def __init__(self, d): class SchemaRDD(RDD): - """ - An RDD of L{Row} objects that has an associated schema. The underlying JVM object is a SchemaRDD, - not a PythonRDD, so we can utilize the relational query api exposed by SparkSQL. + """An RDD of L{Row} objects that has an associated schema. - For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the L{SchemaRDD} is not operated on - directly, as it's underlying implementation is a RDD composed of Java objects. Instead it is - converted to a PythonRDD in the JVM, on which Python operations can be done. + The underlying JVM object is a SchemaRDD, not a PythonRDD, so we can + utilize the relational query api exposed by SparkSQL. + + For normal L{pyspark.rdd.RDD} operations (map, count, etc.)
the + L{SchemaRDD} is not operated on directly, as its underlying + implementation is an RDD composed of Java objects. Instead it is + converted to a PythonRDD in the JVM, on which Python operations can + be done. """ def __init__(self, jschema_rdd, sql_ctx): @@ -266,8 +268,9 @@ def __init__(self, jschema_rdd, sql_ctx): @property def _jrdd(self): - """ - Lazy evaluation of PythonRDD object. Only done when a user calls methods defined by the + """Lazy evaluation of PythonRDD object. + + Only done when a user calls methods defined by the L{pyspark.rdd.RDD} super class (map, filter, etc.). """ if not hasattr(self, '_lazy_jrdd'): @@ -279,10 +282,10 @@ def _id(self): return self._jrdd.id() def saveAsParquetFile(self, path): - """ - Saves the contents of this L{SchemaRDD} as a parquet file, preserving the schema. Files - that are written out using this method can be read back in as a SchemaRDD using the - L{SQLContext.parquetFile} method. + """Save the contents as a Parquet file, preserving the schema. + + Files that are written out using this method can be read back in as + a SchemaRDD using the L{SQLContext.parquetFile} method. >>> import tempfile, shutil >>> parquetFile = tempfile.mkdtemp() @@ -296,9 +299,10 @@ def saveAsParquetFile(self, path): self._jschema_rdd.saveAsParquetFile(path) def registerAsTable(self, name): - """ - Registers this RDD as a temporary table using the given name. The lifetime of this temporary - table is tied to the L{SQLContext} that was used to create this SchemaRDD. + """Registers this RDD as a temporary table using the given name. + + The lifetime of this temporary table is tied to the L{SQLContext} + that was used to create this SchemaRDD. >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.registerAsTable("test") @@ -309,24 +313,22 @@ def registerAsTable(self, name): self._jschema_rdd.registerAsTable(name) def insertInto(self, tableName, overwrite = False): - """ - Inserts the contents of this SchemaRDD into the specified table, - optionally overwriting any existing data. + """Inserts the contents of this SchemaRDD into the specified table. + + Optionally overwrites any existing data. """ self._jschema_rdd.insertInto(tableName, overwrite) def saveAsTable(self, tableName): - """ - Creates a new table with the contents of this SchemaRDD. - """ + """Creates a new table with the contents of this SchemaRDD.""" self._jschema_rdd.saveAsTable(tableName) def count(self): - """ - Return the number of elements in this RDD. Unlike the base RDD - implementation of count, this implementation leverages the query - optimizer to compute the count on the SchemaRDD, which supports - features such as filter pushdown. + """Return the number of elements in this RDD. + + Unlike the base RDD implementation of count, this implementation + leverages the query optimizer to compute the count on the SchemaRDD, + which supports features such as filter pushdown. >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.count() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 9883ebc0b3c62..e855f36256bc5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -59,7 +59,7 @@ import java.util.{Map => JMap} * // Importing the SQL context gives access to all the SQL functions and implicit conversions.
* import sqlContext._ * - * val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_\$i"))) + * val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))) * // Any RDD containing case classes can be registered as a table. The schema of the table is * // automatically inferred using scala reflection. * rdd.registerAsTable("records") @@ -204,6 +204,20 @@ class SchemaRDD( new SchemaRDD(sqlContext, Aggregate(groupingExprs, aliasedExprs, logicalPlan)) } + /** + * Performs an aggregation over all Rows in this RDD. + * This is equivalent to a groupBy with no grouping expressions. + * + * {{{ + * schemaRDD.aggregate(Sum('sales) as 'totalSales) + * }}} + * + * @group Query + */ + def aggregate(aggregateExprs: Expression*): SchemaRDD = { + groupBy()(aggregateExprs: _*) + } + /** * Applies a qualifier to the attributes of this relation. Can be used to disambiguate attributes * with the same name, for example, when performing self-joins. @@ -281,7 +295,7 @@ class SchemaRDD( * supports features such as filter pushdown. */ @Experimental - override def count(): Long = groupBy()(Count(Literal(1))).collect().head.getLong(0) + override def count(): Long = aggregate(Count(Literal(1))).collect().head.getLong(0) /** * :: Experimental :: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala index 94ba13b14b33d..692569a73ffcf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala @@ -39,6 +39,14 @@ class DslQuerySuite extends QueryTest { testData2.groupBy('a)('a, Sum('b)), Seq((1,3),(2,3),(3,3)) ) + checkAnswer( + testData2.groupBy('a)('a, Sum('b) as 'totB).aggregate(Sum('totB)), + 9 + ) + checkAnswer( + testData2.aggregate(Sum('b)), + 9 + ) } test("select *") {
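To make the SchemaRDD change above concrete: the new aggregate method is just groupBy with no grouping expressions, and count() is now routed through it. Below is a minimal Scala sketch of how it might be used from the DSL, not part of the patch; it reuses sc, sqlContext and the Record case class from the SchemaRDD scaladoc (assumed here to be Record(key: Int, value: String)), and the catalyst import path shown is an assumption for this version of the code:

    import org.apache.spark.sql.catalyst.expressions.{Count, Literal, Sum}
    import sqlContext._

    val records = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
    records.registerAsTable("records")

    // Aggregate over every row, with no grouping expressions.
    val keyTotal = table("records").aggregate(Sum('key) as 'total).collect().head

    // count() now goes through the same code path as an explicit
    // aggregate(Count(Literal(1))), as the override above shows.
    val rowCount = table("records").aggregate(Count(Literal(1))).collect().head.getLong(0)

The DslQuerySuite additions exercise the same idea: aggregate composed after a groupBy (Sum('totB) over the grouped sums) and on its own (Sum('b) over the whole table), both yielding 9 on testData2.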