diff --git a/python/docs/pyspark.rst b/python/docs/pyspark.rst index e81be3b6cb796..0df12c49ad033 100644 --- a/python/docs/pyspark.rst +++ b/python/docs/pyspark.rst @@ -9,6 +9,7 @@ Subpackages pyspark.sql pyspark.streaming + pyspark.ml pyspark.mllib Contents diff --git a/python/pyspark/ml/__init__.py b/python/pyspark/ml/__init__.py index 5b484f7392521..0a065b3155e1a 100644 --- a/python/pyspark/ml/__init__.py +++ b/python/pyspark/ml/__init__.py @@ -15,310 +15,7 @@ # limitations under the License. # -from abc import ABCMeta, abstractmethod, abstractproperty +from pyspark.ml.param import * +from pyspark.ml.pipeline import * -from pyspark import SparkContext -from pyspark.sql import SchemaRDD, inherit_doc # TODO: move inherit_doc to Spark Core -from pyspark.ml.param import Param, Params -from pyspark.ml.util import Identifiable - -__all__ = ["Pipeline", "Transformer", "Estimator", "param", "feature", "classification"] - - -def _jvm(): - """ - Returns the JVM view associated with SparkContext. Must be called - after SparkContext is initialized. - """ - jvm = SparkContext._jvm - if jvm: - return jvm - else: - raise AttributeError("Cannot load _jvm from SparkContext. Is SparkContext initialized?") - - -@inherit_doc -class PipelineStage(Params): - """ - A stage in a pipeline, either an :py:class:`Estimator` or a - :py:class:`Transformer`. - """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(PipelineStage, self).__init__() - - -@inherit_doc -class Estimator(PipelineStage): - """ - Abstract class for estimators that fit models to data. - """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(Estimator, self).__init__() - - @abstractmethod - def fit(self, dataset, params={}): - """ - Fits a model to the input dataset with optional parameters. 
- - :param dataset: input dataset, which is an instance of - :py:class:`pyspark.sql.SchemaRDD` - :param params: an optional param map that overwrites embedded - params - :returns: fitted model - """ - raise NotImplementedError() - - -@inherit_doc -class Transformer(PipelineStage): - """ - Abstract class for transformers that transform one dataset into - another. - """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(Transformer, self).__init__() - - @abstractmethod - def transform(self, dataset, params={}): - """ - Transforms the input dataset with optional parameters. - - :param dataset: input dataset, which is an instance of - :py:class:`pyspark.sql.SchemaRDD` - :param params: an optional param map that overwrites embedded - params - :returns: transformed dataset - """ - raise NotImplementedError() - - -@inherit_doc -class Model(Transformer): - """ - Abstract class for models fitted by :py:class:`Estimator`s. - """ - - ___metaclass__ = ABCMeta - - def __init__(self): - super(Model, self).__init__() - - -@inherit_doc -class Pipeline(Estimator): - """ - A simple pipeline, which acts as an estimator. A Pipeline consists - of a sequence of stages, each of which is either an - :py:class:`Estimator` or a :py:class:`Transformer`. When - :py:meth:`Pipeline.fit` is called, the stages are executed in - order. If a stage is an :py:class:`Estimator`, its - :py:meth:`Estimator.fit` method will be called on the input - dataset to fit a model. Then the model, which is a transformer, - will be used to transform the dataset as the input to the next - stage. If a stage is a :py:class:`Transformer`, its - :py:meth:`Transformer.transform` method will be called to produce - the dataset for the next stage. The fitted model from a - :py:class:`Pipeline` is an :py:class:`PipelineModel`, which - consists of fitted models and transformers, corresponding to the - pipeline stages. If there are no stages, the pipeline acts as an - identity transformer. 
- """ - - def __init__(self): - super(Pipeline, self).__init__() - #: Param for pipeline stages. - self.stages = Param(self, "stages", "pipeline stages") - - def setStages(self, value): - """ - Set pipeline stages. - :param value: a list of transformers or estimators - :return: the pipeline instance - """ - self.paramMap[self.stages] = value - return self - - def getStages(self): - """ - Get pipeline stages. - """ - if self.stages in self.paramMap: - return self.paramMap[self.stages] - - def fit(self, dataset, params={}): - paramMap = self._merge_params(params) - stages = paramMap[self.stages] - for stage in stages: - if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)): - raise ValueError( - "Cannot recognize a pipeline stage of type %s." % type(stage).__name__) - indexOfLastEstimator = -1 - for i, stage in enumerate(stages): - if isinstance(stage, Estimator): - indexOfLastEstimator = i - transformers = [] - for i, stage in enumerate(stages): - if i <= indexOfLastEstimator: - if isinstance(stage, Transformer): - transformers.append(stage) - dataset = stage.transform(dataset, paramMap) - else: # must be an Estimator - model = stage.fit(dataset, paramMap) - transformers.append(model) - if i < indexOfLastEstimator: - dataset = model.transform(dataset, paramMap) - else: - transformers.append(stage) - return PipelineModel(transformers) - - -@inherit_doc -class PipelineModel(Model): - """ - Represents a compiled pipeline with transformers and fitted models. - """ - - def __init__(self, transformers): - super(PipelineModel, self).__init__() - self.transformers = transformers - - def transform(self, dataset, params={}): - paramMap = self._merge_params(params) - for t in self.transformers: - dataset = t.transform(dataset, paramMap) - return dataset - - -@inherit_doc -class JavaWrapper(Params): - """ - Utility class to help create wrapper classes from Java/Scala - implementations of pipeline components. 
- """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(JavaWrapper, self).__init__() - - @abstractproperty - def _java_class(self): - """ - Fully-qualified class name of the wrapped Java component. - """ - raise NotImplementedError - - def _java_obj(self): - """ - Returns or creates a Java object. - """ - java_obj = _jvm() - for name in self._java_class.split("."): - java_obj = getattr(java_obj, name) - return java_obj() - - def _transfer_params_to_java(self, params, java_obj): - """ - Transforms the embedded params and additional params to the - input Java object. - :param params: additional params (overwriting embedded values) - :param java_obj: Java object to receive the params - """ - paramMap = self._merge_params(params) - for param in self.params: - if param in paramMap: - java_obj.set(param.name, paramMap[param]) - - def _empty_java_param_map(self): - """ - Returns an empty Java ParamMap reference. - """ - return _jvm().org.apache.spark.ml.param.ParamMap() - - def _create_java_param_map(self, params, java_obj): - paramMap = self._empty_java_param_map() - for param, value in params.items(): - if param.parent is self: - paramMap.put(java_obj.getParam(param.name), value) - return paramMap - - -@inherit_doc -class JavaEstimator(Estimator, JavaWrapper): - """ - Base class for :py:class:`Estimator`s that wrap Java/Scala - implementations. - """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(JavaEstimator, self).__init__() - - @abstractmethod - def _create_model(self, java_model): - """ - Creates a model from the input Java model reference. - """ - raise NotImplementedError - - def _fit_java(self, dataset, params={}): - """ - Fits a Java model to the input dataset. 
- :param dataset: input dataset, which is an instance of - :py:class:`pyspark.sql.SchemaRDD` - :param params: additional params (overwriting embedded values) - :return: fitted Java model - """ - java_obj = self._java_obj() - self._transfer_params_to_java(params, java_obj) - return java_obj.fit(dataset._jschema_rdd, self._empty_java_param_map()) - - def fit(self, dataset, params={}): - java_model = self._fit_java(dataset, params) - return self._create_model(java_model) - - -@inherit_doc -class JavaTransformer(Transformer, JavaWrapper): - """ - Base class for :py:class:`Transformer`s that wrap Java/Scala - implementations. - """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(JavaTransformer, self).__init__() - - def transform(self, dataset, params={}): - java_obj = self._java_obj() - self._transfer_params_to_java({}, java_obj) - java_param_map = self._create_java_param_map(params, java_obj) - return SchemaRDD(java_obj.transform(dataset._jschema_rdd, java_param_map), - dataset.sql_ctx) - - -@inherit_doc -class JavaModel(JavaTransformer): - """ - Base class for :py:class:`Model`s that wrap Java/Scala - implementations. - """ - - __metaclass__ = ABCMeta - - def __init__(self): - super(JavaTransformer, self).__init__() - - def _java_obj(self): - return self._java_model +__all__ = ["Pipeline", "Transformer", "Estimator"] diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 0384c39b5d5ab..4b8f7fbe7d0ae 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -15,12 +15,14 @@ # limitations under the License. 
# -from pyspark.sql import inherit_doc -from pyspark.ml import JavaEstimator, JavaModel +from pyspark.ml.util import JavaEstimator, JavaModel, inherit_doc from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter,\ HasRegParam +__all__ = ['LogisticRegression', 'LogisticRegressionModel'] + + @inherit_doc class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter, HasRegParam): @@ -43,32 +45,17 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti >>> print model.transform(test1).first().prediction 1.0 """ - - def __init__(self): - super(LogisticRegression, self).__init__() - - @property - def _java_class(self): - return "org.apache.spark.ml.classification.LogisticRegression" + _java_class = "org.apache.spark.ml.classification.LogisticRegression" def _create_model(self, java_model): return LogisticRegressionModel(java_model) -@inherit_doc class LogisticRegressionModel(JavaModel): """ Model fitted by LogisticRegression. """ - def __init__(self, java_model): - super(LogisticRegressionModel, self).__init__() - self._java_model = java_model - - @property - def _java_class(self): - return "org.apache.spark.ml.classification.LogisticRegressionModel" - if __name__ == "__main__": import doctest diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 45191c8fcb3dc..c5ea41922e1b6 100644 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -15,11 +15,13 @@ # limitations under the License. 
# -from pyspark.sql import inherit_doc -from pyspark.ml import JavaTransformer +from pyspark.ml.util import JavaTransformer, inherit_doc from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasNumFeatures +__all__ = ['Tokenizer', 'HashingTF'] + + @inherit_doc class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol): """ @@ -36,12 +38,7 @@ class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol): Row(text=u'a b c', tokens=[u'a', u'b', u'c']) """ - def __init__(self): - super(Tokenizer, self).__init__() - - @property - def _java_class(self): - return "org.apache.spark.ml.feature.Tokenizer" + _java_class = "org.apache.spark.ml.feature.Tokenizer" @inherit_doc @@ -62,12 +59,7 @@ class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures): (5,[2,3,4],[1.0,1.0,1.0]) """ - def __init__(self): - super(HashingTF, self).__init__() - - @property - def _java_class(self): - return "org.apache.spark.ml.feature.HashingTF" + _java_class = "org.apache.spark.ml.feature.HashingTF" if __name__ == "__main__": diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py index 80c62c8cdb5d0..2396e20d23fa4 100644 --- a/python/pyspark/ml/param/__init__.py +++ b/python/pyspark/ml/param/__init__.py @@ -19,7 +19,7 @@ from pyspark.ml.util import Identifiable -__all__ = ["Param"] +__all__ = ['Param', 'Params'] class Param(object): diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py new file mode 100644 index 0000000000000..3ef5a201a04b8 --- /dev/null +++ b/python/pyspark/ml/pipeline.py @@ -0,0 +1,153 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from abc import ABCMeta, abstractmethod + +from pyspark.sql import inherit_doc # TODO: move inherit_doc to Spark Core +from pyspark.ml.param import Param, Params + +__all__ = ['Estimator', 'Transformer', 'Pipeline', 'PipelineModel'] + + +@inherit_doc +class Estimator(Params): + """ + Abstract class for estimators that fit models to data. + """ + + __metaclass__ = ABCMeta + + @abstractmethod + def fit(self, dataset, params={}): + """ + Fits a model to the input dataset with optional parameters. + + :param dataset: input dataset, which is an instance of + :py:class:`pyspark.sql.SchemaRDD` + :param params: an optional param map that overwrites embedded + params + :returns: fitted model + """ + raise NotImplementedError() + + +@inherit_doc +class Transformer(Params): + """ + Abstract class for transformers that transform one dataset into + another. + """ + + __metaclass__ = ABCMeta + + @abstractmethod + def transform(self, dataset, params={}): + """ + Transforms the input dataset with optional parameters. + + :param dataset: input dataset, which is an instance of + :py:class:`pyspark.sql.SchemaRDD` + :param params: an optional param map that overwrites embedded + params + :returns: transformed dataset + """ + raise NotImplementedError() + + +@inherit_doc +class Pipeline(Estimator): + """ + A simple pipeline, which acts as an estimator. A Pipeline consists + of a sequence of stages, each of which is either an + :py:class:`Estimator` or a :py:class:`Transformer`. When + :py:meth:`Pipeline.fit` is called, the stages are executed in + order. 
If a stage is an :py:class:`Estimator`, its + :py:meth:`Estimator.fit` method will be called on the input + dataset to fit a model. Then the model, which is a transformer, + will be used to transform the dataset as the input to the next + stage. If a stage is a :py:class:`Transformer`, its + :py:meth:`Transformer.transform` method will be called to produce + the dataset for the next stage. The fitted model from a + :py:class:`Pipeline` is a :py:class:`PipelineModel`, which + consists of fitted models and transformers, corresponding to the + pipeline stages. If there are no stages, the pipeline acts as an + identity transformer. + """ + + def __init__(self): + super(Pipeline, self).__init__() + #: Param for pipeline stages. + self.stages = Param(self, "stages", "pipeline stages") + + def setStages(self, value): + """ + Set pipeline stages. + :param value: a list of transformers or estimators + :return: the pipeline instance + """ + self.paramMap[self.stages] = value + return self + + def getStages(self): + """ + Get pipeline stages. + """ + if self.stages in self.paramMap: + return self.paramMap[self.stages] + + def fit(self, dataset, params={}): + paramMap = self._merge_params(params) + stages = paramMap[self.stages] + for stage in stages: + if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)): + raise ValueError( + "Cannot recognize a pipeline stage of type %s." 
% type(stage).__name__) + indexOfLastEstimator = -1 + for i, stage in enumerate(stages): + if isinstance(stage, Estimator): + indexOfLastEstimator = i + transformers = [] + for i, stage in enumerate(stages): + if i <= indexOfLastEstimator: + if isinstance(stage, Transformer): + transformers.append(stage) + dataset = stage.transform(dataset, paramMap) + else: # must be an Estimator + model = stage.fit(dataset, paramMap) + transformers.append(model) + if i < indexOfLastEstimator: + dataset = model.transform(dataset, paramMap) + else: + transformers.append(stage) + return PipelineModel(transformers) + + +@inherit_doc +class PipelineModel(Transformer): + """ + Represents a compiled pipeline with transformers and fitted models. + """ + + def __init__(self, transformers): + super(PipelineModel, self).__init__() + self.transformers = transformers + + def transform(self, dataset, params={}): + paramMap = self._merge_params(params) + for t in self.transformers: + dataset = t.transform(dataset, paramMap) + return dataset diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 2f5f2d18d5d01..a154b51c11843 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -32,7 +32,7 @@ from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase from pyspark.sql import SchemaRDD -from pyspark.ml import Transformer, Estimator, Model, Pipeline +from pyspark.ml import Transformer, Estimator, Pipeline from pyspark.ml.param import Param @@ -76,7 +76,7 @@ def fit(self, dataset, params={}): return model -class MockModel(MockTransformer, Model): +class MockModel(MockTransformer, Transformer): def __init__(self): super(MockModel, self).__init__() diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py index 5d74088b0b13e..82e1f9fa087e7 100644 --- a/python/pyspark/ml/util.py +++ b/python/pyspark/ml/util.py @@ -16,6 +16,12 @@ # import uuid +from abc import ABCMeta + +from pyspark import SparkContext +from pyspark.sql import DataFrame +from 
pyspark.ml.param import Params +from pyspark.ml.pipeline import Estimator, Transformer class Identifiable(object): @@ -28,8 +34,144 @@ def __init__(self): #: concatenates the class name, "-", and 8 random hex chars. self.uid = type(self).__name__ + "-" + uuid.uuid4().hex[:8] - def __str__(self): + def __repr__(self): return self.uid - def __repr__(self): - return str(self) + +def inherit_doc(cls): + for name, func in vars(cls).items(): + # only inherit docstring for public functions + if name.startswith("_"): + continue + if not func.__doc__: + for parent in cls.__bases__: + parent_func = getattr(parent, name, None) + if parent_func and getattr(parent_func, "__doc__", None): + func.__doc__ = parent_func.__doc__ + break + return cls + + +def _jvm(): + """ + Returns the JVM view associated with SparkContext. Must be called + after SparkContext is initialized. + """ + jvm = SparkContext._jvm + if jvm: + return jvm + else: + raise AttributeError("Cannot load _jvm from SparkContext. Is SparkContext initialized?") + + +@inherit_doc +class JavaWrapper(Params): + """ + Utility class to help create wrapper classes from Java/Scala + implementations of pipeline components. + """ + + __metaclass__ = ABCMeta + + #: Fully-qualified class name of the wrapped Java component. + _java_class = None + + def _java_obj(self): + """ + Returns or creates a Java object. + """ + java_obj = _jvm() + for name in self._java_class.split("."): + java_obj = getattr(java_obj, name) + return java_obj() + + def _transfer_params_to_java(self, params, java_obj): + """ + Transfers the embedded params and additional params to the + input Java object. + :param params: additional params (overwriting embedded values) + :param java_obj: Java object to receive the params + """ + paramMap = self._merge_params(params) + for param in self.params: + if param in paramMap: + java_obj.set(param.name, paramMap[param]) + + def _empty_java_param_map(self): + """ + Returns an empty Java ParamMap reference. 
+ """ + return _jvm().org.apache.spark.ml.param.ParamMap() + + def _create_java_param_map(self, params, java_obj): + paramMap = self._empty_java_param_map() + for param, value in params.items(): + if param.parent is self: + paramMap.put(java_obj.getParam(param.name), value) + return paramMap + + +@inherit_doc +class JavaEstimator(Estimator, JavaWrapper): + """ + Base class for :py:class:`Estimator`s that wrap Java/Scala + implementations. + """ + + __metaclass__ = ABCMeta + + def _create_model(self, java_model): + """ + Creates a model from the input Java model reference. + """ + return JavaModel(java_model) + + def _fit_java(self, dataset, params={}): + """ + Fits a Java model to the input dataset. + :param dataset: input dataset, which is an instance of + :py:class:`pyspark.sql.SchemaRDD` + :param params: additional params (overwriting embedded values) + :return: fitted Java model + """ + java_obj = self._java_obj() + self._transfer_params_to_java(params, java_obj) + return java_obj.fit(dataset._jschema_rdd, self._empty_java_param_map()) + + def fit(self, dataset, params={}): + java_model = self._fit_java(dataset, params) + return self._create_model(java_model) + + +@inherit_doc +class JavaTransformer(Transformer, JavaWrapper): + """ + Base class for :py:class:`Transformer`s that wrap Java/Scala + implementations. + """ + + __metaclass__ = ABCMeta + + def transform(self, dataset, params={}): + java_obj = self._java_obj() + self._transfer_params_to_java({}, java_obj) + java_param_map = self._create_java_param_map(params, java_obj) + return DataFrame(java_obj.transform(dataset._jschema_rdd, java_param_map), + dataset.sql_ctx) + + +@inherit_doc +class JavaModel(JavaTransformer): + """ + Base class for :py:class:`Model`s that wrap Java/Scala + implementations. 
+ """ + + __metaclass__ = ABCMeta + + def __init__(self, java_model): + super(JavaTransformer, self).__init__() + self._java_model = java_model + + def _java_obj(self): + return self._java_model diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 7d7550c854b2f..c3a6938f56864 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -1794,20 +1794,6 @@ def __repr__(self): return "" % ", ".join(self) -def inherit_doc(cls): - for name, func in vars(cls).items(): - # only inherit docstring for public functions - if name.startswith("_"): - continue - if not func.__doc__: - for parent in cls.__bases__: - parent_func = getattr(parent, name, None) - if parent_func and getattr(parent_func, "__doc__", None): - func.__doc__ = parent_func.__doc__ - break - return cls - - class DataFrame(object): """A collection of rows that have the same columns.