Commit 2eaff06

Merge branch 'master' into SPARK-7658

zsxwing committed May 15, 2015
2 parents 108d56c + 9476148

Showing 30 changed files with 430 additions and 147 deletions.
@@ -324,9 +324,6 @@ private[worker] class Worker(
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)

- case Heartbeat =>
-   logInfo(s"Received heartbeat from driver ${sender.path}")
-
case RegisterWorkerFailed(message) =>
if (!registered) {
logError("Worker registration failed: " + message)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -94,7 +94,7 @@ private[spark] abstract class WebUI(
}

/** Detach a handler from this UI. */
- protected def detachHandler(handler: ServletContextHandler) {
+ def detachHandler(handler: ServletContextHandler) {
handlers -= handler
serverInfo.foreach { info =>
info.rootHandler.removeHandler(handler)
@@ -35,6 +35,7 @@
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+ import org.xerial.snappy.buffer.CachedBufferAllocator;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
@@ -96,6 +97,13 @@ public OutputStream apply(OutputStream stream) {
@After
public void tearDown() {
Utils.deleteRecursively(tempDir);
+ // This call is a workaround for SPARK-7660, a snappy-java bug which is exposed by this test
+ // suite. Clearing the cached buffer allocator's pool of reusable buffers masks this bug,
+ // preventing a test failure in JavaAPISuite that would otherwise occur. The underlying bug
+ // needs to be fixed, but in the meantime this workaround avoids spurious Jenkins failures.
+ synchronized (CachedBufferAllocator.class) {
+   CachedBufferAllocator.queueTable.clear();
+ }
final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
if (leakedMemory != 0) {
fail("Test leaked " + leakedMemory + " bytes of managed memory");
@@ -345,28 +345,40 @@ private[python] class PythonMLLibAPI extends Serializable {
* Returns a list containing weights, mean and covariance of each mixture component.
*/
def trainGaussianMixture(
-     data: JavaRDD[Vector],
-     k: Int,
-     convergenceTol: Double,
+     data: JavaRDD[Vector],
+     k: Int,
+     convergenceTol: Double,
      maxIterations: Int,
-     seed: java.lang.Long): JList[Object] = {
+     seed: java.lang.Long,
+     initialModelWeights: java.util.ArrayList[Double],
+     initialModelMu: java.util.ArrayList[Vector],
+     initialModelSigma: java.util.ArrayList[Matrix]): JList[Object] = {
val gmmAlg = new GaussianMixture()
.setK(k)
.setConvergenceTol(convergenceTol)
.setMaxIterations(maxIterations)

+ if (initialModelWeights != null && initialModelMu != null && initialModelSigma != null) {
+   val gaussians = initialModelMu.asScala.toSeq.zip(initialModelSigma.asScala.toSeq).map {
+     case (x, y) => new MultivariateGaussian(x.asInstanceOf[Vector], y.asInstanceOf[Matrix])
+   }
+   val initialModel = new GaussianMixtureModel(
+     initialModelWeights.asScala.toArray, gaussians.toArray)
+   gmmAlg.setInitialModel(initialModel)
+ }
+
if (seed != null) gmmAlg.setSeed(seed)

try {
val model = gmmAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK))
var wt = ArrayBuffer.empty[Double]
- var mu = ArrayBuffer.empty[Vector]
+ var mu = ArrayBuffer.empty[Vector]
var sigma = ArrayBuffer.empty[Matrix]
for (i <- 0 until model.k) {
wt += model.weights(i)
mu += model.gaussians(i).mu
sigma += model.gaussians(i).sigma
- }
+ }
List(Vectors.dense(wt.toArray), mu.toArray, sigma.toArray).map(_.asInstanceOf[Object]).asJava
} finally {
data.rdd.unpersist(blocking = false)
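For context on what this change enables: the new initialModelWeights / initialModelMu / initialModelSigma parameters let the Python API seed GaussianMixture training from an existing model via setInitialModel, instead of starting from a random initialization. A minimal sketch of the intended Python-side usage, assuming the PySpark wrapper forwards an initialModel argument into this entry point (the keyword name is an assumption):

# Hypothetical PySpark usage; assumes GaussianMixture.train accepts an
# initialModel whose weights/mu/sigma are unpacked into the three lists
# consumed by trainGaussianMixture above.
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
from pyspark.mllib.linalg import Vectors, Matrices
from pyspark.mllib.stat.distribution import MultivariateGaussian

data = sc.parallelize([Vectors.dense([x]) for x in [-5.0, -4.9, 5.0, 5.1]])
g1 = MultivariateGaussian(Vectors.dense([-5.0]), Matrices.dense(1, 1, [1.0]))
g2 = MultivariateGaussian(Vectors.dense([5.0]), Matrices.dense(1, 1, [1.0]))
warm_start = GaussianMixtureModel(weights=[0.5, 0.5], gaussians=[g1, g2])

model = GaussianMixture.train(data, k=2, convergenceTol=1e-3,
                              maxIterations=100, initialModel=warm_start)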
@@ -38,11 +38,10 @@ import org.apache.spark.sql.{SQLContext, Row}
* are drawn from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are
* the respective mean and covariance for each Gaussian distribution i=1..k.
*
- * @param weight Weights for each Gaussian distribution in the mixture, where weight(i) is
- * the weight for Gaussian i, and weight.sum == 1
- * @param mu Means for each Gaussian in the mixture, where mu(i) is the mean for Gaussian i
- * @param sigma Covariance maxtrix for each Gaussian in the mixture, where sigma(i) is the
- * covariance matrix for Gaussian i
+ * @param weights Weights for each Gaussian distribution in the mixture, where weights(i) is
+ * the weight for Gaussian i, and weights.sum == 1
+ * @param gaussians Array of MultivariateGaussian where gaussians(i) represents
+ * the Multivariate Gaussian (Normal) Distribution for Gaussian i
*/
@Experimental
class GaussianMixtureModel(
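The rewritten scaladoc reflects the model's refactored shape: a weights array plus an array of MultivariateGaussian components, replacing the earlier parallel mu/sigma arrays. A quick sketch of reading the fitted components, assuming the Python wrapper mirrors the Scala weights/gaussians accessors (the loop in trainGaussianMixture above does the same on the Scala side):

# Sketch only: assumes model is a fitted GaussianMixtureModel whose Python
# wrapper exposes k, weights, and gaussians like the Scala class.
for i in range(model.k):
    print("component %d: weight=%s, mu=%s, sigma=%s"
          % (i, model.weights[i], model.gaussians[i].mu, model.gaussians[i].sigma))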
14 changes: 7 additions & 7 deletions python/docs/pyspark.ml.rst
@@ -1,16 +1,16 @@
pyspark.ml package
- =====================
+ ==================

ML Pipeline APIs
- --------------
+ ----------------

.. automodule:: pyspark.ml
:members:
:undoc-members:
:inherited-members:

pyspark.ml.param module
- -------------------------
+ -----------------------

.. automodule:: pyspark.ml.param
:members:
@@ -34,31 +34,31 @@ pyspark.ml.classification module
:inherited-members:

pyspark.ml.recommendation module
- -------------------------
+ --------------------------------

.. automodule:: pyspark.ml.recommendation
:members:
:undoc-members:
:inherited-members:

pyspark.ml.regression module
- -------------------------
+ ----------------------------

.. automodule:: pyspark.ml.regression
:members:
:undoc-members:
:inherited-members:

pyspark.ml.tuning module
- --------------------------------
+ ------------------------

.. automodule:: pyspark.ml.tuning
:members:
:undoc-members:
:inherited-members:

pyspark.ml.evaluation module
- --------------------------------
+ ----------------------------

.. automodule:: pyspark.ml.evaluation
:members:
57 changes: 37 additions & 20 deletions python/pyspark/ml/classification.py
@@ -43,6 +43,10 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
>>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF()
>>> model.transform(test0).head().prediction
0.0
+ >>> model.weights
+ DenseVector([5.5...])
+ >>> model.intercept
+ -2.68...
>>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF()
>>> model.transform(test1).head().prediction
1.0
@@ -67,7 +71,7 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred
threshold=0.5, probabilityCol="probability"):
"""
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
- maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
+ maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
threshold=0.5, probabilityCol="probability")
"""
super(LogisticRegression, self).__init__()
@@ -92,8 +96,8 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre
maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
threshold=0.5, probabilityCol="probability"):
"""
- setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
- maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
+ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+ maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
threshold=0.5, probabilityCol="probability")
Sets params for logistic regression.
"""
@@ -148,6 +152,20 @@ class LogisticRegressionModel(JavaModel):
Model fitted by LogisticRegression.
"""

+ @property
+ def weights(self):
+     """
+     Model weights.
+     """
+     return self._call_java("weights")
+
+ @property
+ def intercept(self):
+     """
+     Model intercept.
+     """
+     return self._call_java("intercept")
+

class TreeClassifierParams(object):
"""
@@ -202,7 +220,7 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred
maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini"):
"""
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
- maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
+ maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \
maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini")
"""
super(DecisionTreeClassifier, self).__init__()
@@ -224,9 +242,8 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre
impurity="gini"):
"""
setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
- maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
- maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10,
- impurity="gini")
+ maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \
+ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini")
Sets params for the DecisionTreeClassifier.
"""
kwargs = self.setParams._input_kwargs
@@ -302,9 +319,9 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred
maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini",
numTrees=20, featureSubsetStrategy="auto", seed=42):
"""
- __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
- maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
- maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini",
+ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+ maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \
+ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", \
numTrees=20, featureSubsetStrategy="auto", seed=42)
"""
super(RandomForestClassifier, self).__init__()
@@ -337,9 +354,9 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre
maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=42,
impurity="gini", numTrees=20, featureSubsetStrategy="auto"):
"""
- setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
- maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
- maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=42,
+ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+ maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \
+ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=42, \
impurity="gini", numTrees=20, featureSubsetStrategy="auto")
Sets params for linear classification.
"""
@@ -453,10 +470,10 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred
maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="logistic",
maxIter=20, stepSize=0.1):
"""
- __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
- maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
- maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="logistic",
- maxIter=20, stepSize=0.1)
+ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+ maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \
+ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, \
+ lossType="logistic", maxIter=20, stepSize=0.1)
"""
super(GBTClassifier, self).__init__()
#: param for Loss function which GBT tries to minimize (case-insensitive).
@@ -484,9 +501,9 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre
maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10,
lossType="logistic", maxIter=20, stepSize=0.1):
"""
- setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
- maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
- maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10,
+ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+ maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \
+ maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, \
lossType="logistic", maxIter=20, stepSize=0.1)
Sets params for Gradient Boosted Tree Classification.
"""
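The recurring fix throughout this file is the trailing backslash inside the signature docstrings. In a non-raw triple-quoted string, a backslash at the end of a line escapes the newline, so the signature lines join into one logical line in __doc__; without it, help() and the Sphinx docs show the signature broken mid-parameter-list. A self-contained illustration (Example and its params are hypothetical stand-ins for the estimators patched above):

# Minimal illustration of the docstring-continuation fix.
class Example(object):
    def setParams(self, maxIter=100, regParam=0.1):
        """
        setParams(self, maxIter=100, \
                  regParam=0.1)
        Sets params for this example.
        """

# The backslash-newline pair is consumed inside the string literal, so the
# rendered signature stays on a single line (the next line's indentation
# survives as spaces):
print(Example.setParams.__doc__)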
8 changes: 4 additions & 4 deletions python/pyspark/ml/feature.py
@@ -481,7 +481,7 @@ class RegexTokenizer(JavaTransformer, HasInputCol, HasOutputCol):
def __init__(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+",
inputCol=None, outputCol=None):
"""
- __init__(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+",
+ __init__(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+", \
inputCol=None, outputCol=None)
"""
super(RegexTokenizer, self).__init__()
@@ -496,7 +496,7 @@ def __init__(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+"
def setParams(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+",
inputCol=None, outputCol=None):
"""
- setParams(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+",
+ setParams(self, minTokenLength=1, gaps=False, pattern="\\p{L}+|[^\\p{L}\\s]+", \
inputCol="input", outputCol="output")
Sets params for this RegexTokenizer.
"""
@@ -869,7 +869,7 @@ class Word2Vec(JavaEstimator, HasStepSize, HasMaxIter, HasSeed, HasInputCol, Has
def __init__(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1,
seed=42, inputCol=None, outputCol=None):
"""
- __init__(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1,
+ __init__(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, \
seed=42, inputCol=None, outputCol=None)
"""
super(Word2Vec, self).__init__()
@@ -889,7 +889,7 @@ def __init__(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025,
def setParams(self, vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1,
seed=42, inputCol=None, outputCol=None):
"""
- setParams(self, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, seed=42,
+ setParams(self, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, seed=42, \
inputCol=None, outputCol=None)
Sets params for this Word2Vec.
"""
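As a usage note for the RegexTokenizer signature shown above, a minimal sketch (the DataFrame and column names are illustrative):

# Minimal sketch; assumes an active SQLContext bound to sqlContext.
from pyspark.ml.feature import RegexTokenizer

df = sqlContext.createDataFrame([("a b  c",)], ["text"])
tokenizer = RegexTokenizer(minTokenLength=1, gaps=False,
                           pattern="\\p{L}+|[^\\p{L}\\s]+",
                           inputCol="text", outputCol="words")
tokenizer.transform(df).head().words
# expected: ['a', 'b', 'c']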
8 changes: 4 additions & 4 deletions python/pyspark/ml/recommendation.py
@@ -92,8 +92,8 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemB
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
ratingCol="rating", nonnegative=False, checkpointInterval=10):
"""
- __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
- implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=0,
+ __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
+ implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=0, \
ratingCol="rating", nonnegative=false, checkpointInterval=10)
"""
super(ALS, self).__init__()
@@ -118,8 +118,8 @@ def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItem
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
ratingCol="rating", nonnegative=False, checkpointInterval=10):
"""
- setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
- implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0,
+ setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
+ implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0, \
ratingCol="rating", nonnegative=False, checkpointInterval=10)
Sets params for ALS.
"""
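For completeness, a short sketch of fitting ALS with the parameters documented above (the ratings data is illustrative):

# Minimal sketch; assumes an active SQLContext bound to sqlContext.
from pyspark.ml.recommendation import ALS

ratings = sqlContext.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0)],
    ["user", "item", "rating"])
als = ALS(rank=10, maxIter=10, regParam=0.1,
          userCol="user", itemCol="item", ratingCol="rating")
model = als.fit(ratings)
predictions = model.transform(ratings)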