[SPARK-30144][ML][PySpark] Make MultilayerPerceptronClassificationModel extend MultilayerPerceptronParams #26838

Status: Closed · wants to merge 14 commits
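In short: this PR makes MultilayerPerceptronClassificationModel extend MultilayerPerceptronParams, turning layers from a constructor field into an inherited Param (read via getLayers / $(layers)), and persists only the weights. A minimal Scala sketch of the user-facing difference (the path is a placeholder, not part of this PR):

    import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel

    // "/tmp/mlp-model" is an illustrative path.
    val model = MultilayerPerceptronClassificationModel.load("/tmp/mlp-model")

    // Before SPARK-30144: `layers` was a constructor val on the model.
    // val sizes: Array[Int] = model.layers

    // After: `layers` is a Param inherited from MultilayerPerceptronParams.
    val sizes: Array[Int] = model.getLayers // e.g. Array(4, 5, 2)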
mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -30,6 +30,7 @@ import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.util.VersionUtils.majorMinorVersion

/** Params for Multilayer Perceptron. */
private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
@@ -247,7 +248,7 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
}
trainer.setStackSize($(blockSize))
val mlpModel = trainer.train(data)
-    new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
+    new MultilayerPerceptronClassificationModel(uid, mlpModel.weights)
}
}
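Note on the constructor change above: the estimator already holds layers as a Param, and Spark's Predictor.fit copies estimator Params onto the model it returns, so the model no longer needs the sizes at construction time. A toy, self-contained illustration of that copy mechanism (not the Spark source; all names are stand-ins):

    object ParamCopyDemo {
      class Model {
        var layers: Array[Int] = Array.empty // stand-in for the layers Param
      }

      class Estimator {
        var layers: Array[Int] = Array.empty

        // Mirrors the copyValues(train(dataset).setParent(this)) pattern:
        // the model is constructed without layer sizes, then inherits them.
        def fit(): Model = {
          val model = new Model
          model.layers = layers // copyValues-style transfer
          model
        }
      }

      def main(args: Array[String]): Unit = {
        val est = new Estimator
        est.layers = Array(4, 5, 2)
        println(est.fit().layers.mkString(",")) // prints 4,5,2
      }
    }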

@@ -273,31 +274,22 @@ object MultilayerPerceptronClassifier
* Each layer has sigmoid activation function, output layer has softmax.
*
* @param uid uid
- * @param layers array of layer sizes including input and output layers
* @param weights the weights of layers
*/
@Since("1.5.0")
class MultilayerPerceptronClassificationModel private[ml] (
@Since("1.5.0") override val uid: String,
@Since("1.5.0") val layers: Array[Int],
[Review comment — Member] Shall we update the migration guide?

[Reply — Author] @srowen Sean, this question is for you.

@Since("2.0.0") val weights: Vector)
extends ProbabilisticClassificationModel[Vector, MultilayerPerceptronClassificationModel]
-  with Serializable with MLWritable {
+  with MultilayerPerceptronParams with Serializable with MLWritable {
[Review comment — Member] Not related to this change, but do we use MultilayerPerceptronClassificationModel in executors? Not every classification model extends Serializable.

[Reply — Author] I am not sure about this. It seems only the tree-related models extend Serializable.


@Since("1.6.0")
-  override val numFeatures: Int = layers.head
+  override lazy val numFeatures: Int = $(layers).head

-  private[ml] val mlpModel = FeedForwardTopology
-    .multiLayerPerceptron(layers, softmaxOnTop = true)
+  @transient private[ml] lazy val mlpModel = FeedForwardTopology
+    .multiLayerPerceptron($(layers), softmaxOnTop = true)
.model(weights)

-  /**
-   * Returns layers in a Java List.
-   */
-  private[ml] def javaLayers: java.util.List[Int] = {
-    layers.toList.asJava
-  }

/**
* Predict label for the given features.
* This internal method is used to implement `transform()` and output [[predictionCol]].
@@ -308,7 +300,8 @@ class MultilayerPerceptronClassificationModel private[ml] (

@Since("1.5.0")
override def copy(extra: ParamMap): MultilayerPerceptronClassificationModel = {
-    val copied = new MultilayerPerceptronClassificationModel(uid, layers, weights).setParent(parent)
+    val copied = new MultilayerPerceptronClassificationModel(uid, weights)
+      .setParent(parent)
copyValues(copied, extra)
}

@@ -322,11 +315,11 @@ class MultilayerPerceptronClassificationModel private[ml] (

override protected def predictRaw(features: Vector): Vector = mlpModel.predictRaw(features)

-  override def numClasses: Int = layers.last
+  override def numClasses: Int = $(layers).last

@Since("3.0.0")
override def toString: String = {
s"MultilayerPerceptronClassificationModel: uid=$uid, numLayers=${layers.length}, " +
s"MultilayerPerceptronClassificationModel: uid=$uid, numLayers=${$(layers).length}, " +
s"numClasses=$numClasses, numFeatures=$numFeatures"
}
}
@@ -347,13 +340,13 @@ object MultilayerPerceptronClassificationModel
class MultilayerPerceptronClassificationModelWriter(
instance: MultilayerPerceptronClassificationModel) extends MLWriter {

-    private case class Data(layers: Array[Int], weights: Vector)
+    private case class Data(weights: Vector)

override protected def saveImpl(path: String): Unit = {
// Save metadata and Params
DefaultParamsWriter.saveMetadata(instance, path, sc)
-      // Save model data: layers, weights
-      val data = Data(instance.layers, instance.weights)
+      // Save model data: weights
+      val data = Data(instance.weights)
val dataPath = new Path(path, "data").toString
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
}
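For orientation, the on-disk layout saveImpl produces (inferred from the code above and the metadata reads in the test suites below):

    <path>/metadata  – single-line JSON: class, sparkVersion, uid, paramMap, defaultParamMap
    <path>/data      – Parquet with the model data; after this PR only the weights column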
@@ -367,13 +360,21 @@ object MultilayerPerceptronClassificationModel

override def load(path: String): MultilayerPerceptronClassificationModel = {
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+      val (majorVersion, _) = majorMinorVersion(metadata.sparkVersion)
+
val dataPath = new Path(path, "data").toString
-      val data = sparkSession.read.parquet(dataPath).select("layers", "weights").head()
-      val layers = data.getAs[Seq[Int]](0).toArray
-      val weights = data.getAs[Vector](1)
-      val model = new MultilayerPerceptronClassificationModel(metadata.uid, layers, weights)

+      val df = sparkSession.read.parquet(dataPath)
+      val model = if (majorVersion < 3) { // model prior to 3.0.0
+        val data = df.select("layers", "weights").head()
+        val layers = data.getAs[Seq[Int]](0).toArray
+        val weights = data.getAs[Vector](1)
+        val model = new MultilayerPerceptronClassificationModel(metadata.uid, weights)
+        model.set("layers", layers)
+      } else {
+        val data = df.select("weights").head()
+        val weights = data.getAs[Vector](0)
+        new MultilayerPerceptronClassificationModel(metadata.uid, weights)
+      }
metadata.getAndSetParams(model)
model
}
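The majorMinorVersion helper imported at the top of the file parses a Spark version string into (major, minor); models written before 3.0 still carry a layers column in their data Parquet, so the reader branches on the major version recorded in the saved metadata. A quick sketch of the helper's behavior (the version string matches the 2.4.4 test fixture):

    import org.apache.spark.util.VersionUtils.majorMinorVersion

    val (major, minor) = majorMinorVersion("2.4.4")
    assert(major == 2 && minor == 4) // major < 3 selects the legacy read path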
mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala

@@ -40,7 +40,7 @@ private[r] class MultilayerPerceptronClassifierWrapper private (
pipeline.stages(1).asInstanceOf[MultilayerPerceptronClassificationModel]

lazy val weights: Array[Double] = mlpModel.weights.toArray
-  lazy val layers: Array[Int] = mlpModel.layers
+  lazy val layers: Array[Int] = mlpModel.getLayers

def transform(dataset: Dataset[_]): DataFrame = {
pipeline.transform(dataset)
(New binary test resources were added under mllib/src/test/resources/ml-models/; their Parquet data and marker files are not shown. The single-line JSON metadata files are:)

ml-models/hashingTF-2.4.4/metadata:
@@ -0,0 +1 @@
+{"class":"org.apache.spark.ml.feature.HashingTF","timestamp":1577833408759,"sparkVersion":"2.4.4","uid":"hashingTF_f4565fe7f7da","paramMap":{"numFeatures":100,"outputCol":"features","inputCol":"words","binary":true},"defaultParamMap":{"numFeatures":262144,"outputCol":"hashingTF_f4565fe7f7da__output","binary":false}}

ml-models/mlp-2.4.4/metadata:
@@ -0,0 +1 @@
+{"class":"org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel","timestamp":1577833765310,"sparkVersion":"2.4.4","uid":"mlpc_30aa2f44dacc","paramMap":{},"defaultParamMap":{"rawPredictionCol":"rawPrediction","predictionCol":"prediction","probabilityCol":"probability","labelCol":"label","featuresCol":"features"}}

ml-models/strIndexerModel-2.4.4/metadata:
@@ -0,0 +1 @@
+{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1577831053235,"sparkVersion":"2.4.4","uid":"myStringIndexerModel","paramMap":{"inputCol":"myInputCol","outputCol":"myOutputCol","handleInvalid":"skip"},"defaultParamMap":{"outputCol":"myStringIndexerModel__output","handleInvalid":"error"}}

(The old test-data/hashingTF-pre3.0 and test-data/strIndexerModel resources referenced by the previous tests were deleted.)
mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala

@@ -229,4 +229,17 @@ class MultilayerPerceptronClassifierSuite extends MLTest with DefaultReadWriteTest {
assert(expected.weights === actual.weights)
}
}

test("Load MultilayerPerceptronClassificationModel prior to Spark 3.0") {
[Review comment — Contributor] Just a general question: we also have other algorithms that modified the load/save methods (like NaiveBayes). Do we need to add test suites like this for them too?

[Reply — Author] I am OK either way.

[Reply — Author] So add a similar test to NaiveBayes (and the other algorithms that modified load/save) too?

[Reply — Contributor] Maybe needed, but in other PRs.

+    val mlpPath = testFile("ml-models/mlp-2.4.4")
+    val model = MultilayerPerceptronClassificationModel.load(mlpPath)
+    val layers = model.getLayers
+    assert(layers(0) === 4)
+    assert(layers(1) === 5)
+    assert(layers(2) === 2)
+
+    val metadata = spark.read.json(s"$mlpPath/metadata")
+    val sparkVersionStr = metadata.select("sparkVersion").first().getString(0)
+    assert(sparkVersionStr == "2.4.4")
+  }
}
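The ml-models/mlp-2.4.4 fixture loaded above was presumably produced by saving a small model under a Spark 2.4.4 build. A hypothetical sketch of regenerating it (training data, master, and output path are illustrative):

    import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").appName("gen-fixture").getOrCreate()
    import spark.implicits._

    // Tiny illustrative dataset: 4 features, 2 classes.
    val train = Seq(
      (0.0, Vectors.dense(0.0, 0.0, 0.0, 0.0)),
      (1.0, Vectors.dense(1.0, 1.0, 1.0, 1.0))
    ).toDF("label", "features")

    val model = new MultilayerPerceptronClassifier()
      .setLayers(Array(4, 5, 2)) // the sizes the new test asserts
      .setMaxIter(5)
      .fit(train)

    // Run under Spark 2.4.4 so save() writes the legacy layers+weights layout.
    model.save("ml-models/mlp-2.4.4")
    spark.stop()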
mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala

@@ -89,7 +89,7 @@ class HashingTFSuite extends MLTest with DefaultReadWriteTest {
}

test("SPARK-23469: Load HashingTF prior to Spark 3.0") {
val hashingTFPath = testFile("test-data/hashingTF-pre3.0")
val hashingTFPath = testFile("ml-models/hashingTF-2.4.4")
val loadedHashingTF = HashingTF.load(hashingTFPath)
val mLlibHashingTF = new MLlibHashingTF(100)
assert(loadedHashingTF.indexOf("a") === mLlibHashingTF.indexOf("a"))
@@ -99,7 +99,7 @@ class HashingTFSuite extends MLTest with DefaultReadWriteTest {

val metadata = spark.read.json(s"$hashingTFPath/metadata")
val sparkVersionStr = metadata.select("sparkVersion").first().getString(0)
assert(sparkVersionStr == "2.3.0-SNAPSHOT")
assert(sparkVersionStr == "2.4.4")
}

test("read/write") {
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala

@@ -459,13 +459,13 @@ class StringIndexerSuite extends MLTest with DefaultReadWriteTest {
}

test("Load StringIndexderModel prior to Spark 3.0") {
val modelPath = testFile("test-data/strIndexerModel")
[Review comment — Contributor] strIndexerModel-2.4.4?

val modelPath = testFile("ml-models/strIndexerModel-2.4.4")

val loadedModel = StringIndexerModel.load(modelPath)
assert(loadedModel.labelsArray === Array(Array("b", "c", "a")))

val metadata = spark.read.json(s"$modelPath/metadata")
val sparkVersionStr = metadata.select("sparkVersion").first().getString(0)
assert(sparkVersionStr == "2.4.1-SNAPSHOT")
assert(sparkVersionStr == "2.4.4")
}
}
4 changes: 4 additions & 0 deletions project/MimaExcludes.scala
@@ -328,6 +328,10 @@ object MimaExcludes {
// [SPARK-26457] Show hadoop configurations in HistoryServer environment tab
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ApplicationEnvironmentInfo.this"),

+    // [SPARK-30144][ML] Make MultilayerPerceptronClassificationModel extend MultilayerPerceptronParams
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.layers"),
[Review comment — Member] Just a question: is this worth breaking the API, @huaxingao?
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.this"),

// Data Source V2 API changes
(problem: Problem) => problem match {
case MissingClassProblem(cls) =>
19 changes: 7 additions & 12 deletions python/pyspark/ml/classification.py
@@ -2145,7 +2145,9 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPerceptronParams,
>>> model = mlp.fit(df)
>>> model.setFeaturesCol("features")
MultilayerPerceptronClassificationModel...
->>> model.layers
+>>> model.getMaxIter()
+100
+>>> model.getLayers()
[2, 2, 2]
>>> model.weights.size
12
@@ -2170,15 +2172,15 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPerceptronParams,
>>> model_path = temp_path + "/mlp_model"
>>> model.save(model_path)
>>> model2 = MultilayerPerceptronClassificationModel.load(model_path)
->>> model.layers == model2.layers
+>>> model.getLayers() == model2.getLayers()
True
>>> model.weights == model2.weights
True
>>> mlp2 = mlp2.setInitialWeights(list(range(0, 12)))
>>> model3 = mlp2.fit(df)
>>> model3.weights != model2.weights
True
->>> model3.layers == model.layers
+>>> model3.getLayers() == model.getLayers()
True

.. versionadded:: 1.6.0
@@ -2274,22 +2276,15 @@ def setSolver(self, value):
return self._set(solver=value)


-class MultilayerPerceptronClassificationModel(JavaProbabilisticClassificationModel, JavaMLWritable,
+class MultilayerPerceptronClassificationModel(JavaProbabilisticClassificationModel,
+                                              _MultilayerPerceptronParams, JavaMLWritable,
                                               JavaMLReadable):
"""
Model fitted by MultilayerPerceptronClassifier.

.. versionadded:: 1.6.0
"""

-    @property
-    @since("1.6.0")
-    def layers(self):
-        """
-        array of layer sizes including input and output layers.
-        """
-        return self._call_java("javaLayers")

@property
@since("2.0.0")
def weights(self):