From 9cd7c86fad04c814b2c8f5547583122ba12c359b Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 13 Feb 2018 03:51:41 +0000
Subject: [PATCH 1/3] Remove outputCol default value if inputCols is set.

---
 .../apache/spark/ml/feature/Bucketizer.scala  | 24 +++++++++++++++++++
 .../ml/feature/QuantileDiscretizer.scala      | 24 +++++++++++++++++++
 .../org/apache/spark/ml/param/params.scala    |  9 +++++++
 .../spark/ml/feature/BucketizerSuite.scala    | 12 ++++++++--
 .../ml/feature/QuantileDiscretizerSuite.scala | 14 +++++++++--
 project/MimaExcludes.scala                    |  5 +++-
 6 files changed, 83 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
index c13bf47eacb94..f3aa808205995 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
@@ -213,6 +213,9 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String
   override def copy(extra: ParamMap): Bucketizer = {
     defaultCopy[Bucketizer](extra).setParent(parent)
   }
+
+  @Since("2.3.0")
+  override def write: MLWriter = new Bucketizer.BucketizerWriter(this)
 }
 
 @Since("1.6.0")
@@ -290,6 +293,27 @@ object Bucketizer extends DefaultParamsReadable[Bucketizer] {
     }
   }
 
+
+  private[Bucketizer] class BucketizerWriter(instance: Bucketizer) extends MLWriter {
+
+    override protected def saveImpl(path: String): Unit = {
+      // SPARK-23377: The default params will be saved and loaded as user-supplied params.
+      // Once `inputCols` is set, the default value of the `outputCol` param causes an error
+      // when checking exclusive params. As a temporary fix, we remove the default
+      // value of `outputCol` if `inputCols` is set before saving.
+      // TODO: If we modify the persistence mechanism later to better handle default params,
+      // we can get rid of this.
+      var removedOutputCol: Option[String] = None
+      if (instance.isSet(instance.inputCols)) {
+        removedOutputCol = instance.getDefault(instance.outputCol)
+        instance.clearDefault(instance.outputCol)
+      }
+      DefaultParamsWriter.saveMetadata(instance, path, sc)
+      // Add the default param back.
+      removedOutputCol.foreach(instance.setDefault(instance.outputCol, _))
+    }
+  }
+
   @Since("1.6.0")
   override def load(path: String): Bucketizer = super.load(path)
 }
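For reference, a minimal sketch of the failure this writer works around (hypothetical save path; assumes the Spark 2.3 multi-column Bucketizer API and a DataFrame `df` with a numeric "myInputCol"):

    import org.apache.spark.ml.feature.Bucketizer

    val bucketizer = new Bucketizer()
      .setInputCols(Array("myInputCol"))
      .setOutputCols(Array("myOutputCol"))
      .setSplitsArray(Array(Array(Double.NegativeInfinity, 5.0, Double.PositiveInfinity)))
    bucketizer.write.overwrite().save("/tmp/spark-23377-repro")

    // Without this writer, the saved metadata contained the default `outputCol`
    // as a user-supplied param, so the loaded copy failed the exclusive-params
    // check at transform time, complaining that both `outputCol` and
    // `outputCols` were set.
    val loaded = Bucketizer.load("/tmp/spark-23377-repro")
    loaded.transform(df)
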
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
index 1ec5f8cb6139b..ae7161a169e1b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
@@ -249,11 +249,35 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui
 
   @Since("1.6.0")
   override def copy(extra: ParamMap): QuantileDiscretizer = defaultCopy(extra)
+
+  @Since("2.3.0")
+  override def write: MLWriter = new QuantileDiscretizer.QuantileDiscretizerWriter(this)
 }
 
 @Since("1.6.0")
 object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] with Logging {
 
+  private[QuantileDiscretizer]
+  class QuantileDiscretizerWriter(instance: QuantileDiscretizer) extends MLWriter {
+
+    override protected def saveImpl(path: String): Unit = {
+      // SPARK-23377: The default params will be saved and loaded as user-supplied params.
+      // Once `inputCols` is set, the default value of the `outputCol` param causes an error
+      // when checking exclusive params. As a temporary fix, we remove the default
+      // value of `outputCol` if `inputCols` is set before saving.
+      // TODO: If we modify the persistence mechanism later to better handle default params,
+      // we can get rid of this.
+      var removedOutputCol: Option[String] = None
+      if (instance.isSet(instance.inputCols)) {
+        removedOutputCol = instance.getDefault(instance.outputCol)
+        instance.clearDefault(instance.outputCol)
+      }
+      DefaultParamsWriter.saveMetadata(instance, path, sc)
+      // Add the default param back.
+      removedOutputCol.foreach(instance.setDefault(instance.outputCol, _))
+    }
+  }
+
   @Since("1.6.0")
   override def load(path: String): QuantileDiscretizer = super.load(path)
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
index 9a83a5882ce29..a34e738c11ed8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
@@ -830,6 +830,15 @@ trait Params extends Identifiable with Serializable {
     defaultParamMap.contains(param)
   }
 
+  /**
+   * Clears the default value for the input param.
+   */
+  final def clearDefault[T](param: Param[T]): this.type = {
+    shouldOwn(param)
+    defaultParamMap.remove(param)
+    this
+  }
+
   /**
    * Creates a copy of this instance with the same UID and some extra params.
    * Subclasses should implement this method and set the return type properly.
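A quick sketch of what the new `clearDefault` helper does (illustrative only; it relies on the shared `HasOutputCol` trait installing a default of `"<uid>__output"` for `outputCol`):

    val b = new Bucketizer()
    assert(b.hasDefault(b.outputCol))   // default installed by the HasOutputCol trait
    b.clearDefault(b.outputCol)         // removes the entry from defaultParamMap
    assert(!b.hasDefault(b.outputCol))

The writers pair this with `getDefault`/`setDefault` (reachable from the companion objects) to remember and restore the value around `saveMetadata`. Because `clearDefault` is a new public member on the `Params` trait, it needs the `ReversedMissingMethodProblem` MiMa exclusion added at the end of this patch.
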
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
index 7403680ae3fdc..41cf72fe3470a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
@@ -172,7 +172,10 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
       .setInputCol("myInputCol")
       .setOutputCol("myOutputCol")
       .setSplits(Array(0.1, 0.8, 0.9))
-    testDefaultReadWrite(t)
+
+    val bucketizer = testDefaultReadWrite(t)
+    val data = Seq((1.0, 2.0), (10.0, 100.0), (101.0, -1.0)).toDF("myInputCol", "myInputCol2")
+    bucketizer.transform(data)
   }
 
   test("Bucket numeric features") {
@@ -327,7 +330,12 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
       .setInputCols(Array("myInputCol"))
       .setOutputCols(Array("myOutputCol"))
       .setSplitsArray(Array(Array(0.1, 0.8, 0.9)))
-    testDefaultReadWrite(t)
+
+    val bucketizer = testDefaultReadWrite(t)
+    val data = Seq((1.0, 2.0), (10.0, 100.0), (101.0, -1.0)).toDF("myInputCol", "myInputCol2")
+    bucketizer.transform(data)
+    assert(t.hasDefault(t.outputCol))
+    assert(bucketizer.hasDefault(bucketizer.outputCol))
   }
 
   test("Bucketizer in a pipeline") {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
index e9a75e931e6a8..6c363799dd300 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.functions.udf
 class QuantileDiscretizerSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   test("Test observed number of buckets and their sizes match expected values") {
     val spark = this.spark
     import spark.implicits._
@@ -132,7 +134,10 @@ class QuantileDiscretizerSuite
       .setInputCol("myInputCol")
       .setOutputCol("myOutputCol")
       .setNumBuckets(6)
-    testDefaultReadWrite(t)
+
+    val readDiscretizer = testDefaultReadWrite(t)
+    val data = sc.parallelize(1 to 100).map(Tuple1.apply).toDF("myInputCol")
+    readDiscretizer.fit(data)
   }
 
   test("Verify resulting model has parent") {
@@ -379,7 +384,12 @@ class QuantileDiscretizerSuite
       .setInputCols(Array("input1", "input2"))
       .setOutputCols(Array("result1", "result2"))
       .setNumBucketsArray(Array(5, 10))
-    testDefaultReadWrite(discretizer)
+
+    val readDiscretizer = testDefaultReadWrite(discretizer)
+    val data = Seq((1.0, 2.0), (2.0, 3.0), (3.0, 4.0)).toDF("input1", "input2")
+    readDiscretizer.fit(data)
+    assert(discretizer.hasDefault(discretizer.outputCol))
+    assert(readDiscretizer.hasDefault(readDiscretizer.outputCol))
   }
 
   test("Multiple Columns: Both inputCol and inputCols are set") {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index d35c50e1d00fe..b3a3629032404 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -135,7 +135,10 @@ object MimaExcludes {
       ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.ml.feature.Bucketizer.getHandleInvalid"),
       ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.ml.feature.StringIndexer.getHandleInvalid"),
       ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.ml.feature.QuantileDiscretizer.getHandleInvalid"),
-      ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.ml.feature.StringIndexerModel.getHandleInvalid")
+      ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.ml.feature.StringIndexerModel.getHandleInvalid"),
+
+      // [SPARK-23377][ML] Fixes Bucketizer with multiple columns persistence bug
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.param.Params.clearDefault")
     )
 
   // Exclude rules for 2.2.x
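Before the second revision below, it helps to see why the default leaks in the first place. A condensed sketch of the Spark 2.3 persistence internals being worked around (not the verbatim implementation; json4s imports and ml-internal access assumed): the writer flattens `extractParamMap()`, which merges defaults with user-set values, into the metadata JSON, and the reader replays every saved entry with `set`, so a persisted default becomes indistinguishable from an explicit user call.

    // Writer side: defaults and user-set params are flattened together.
    val jsonParams = render(
      instance.extractParamMap().toSeq.map { case ParamPair(p, v) =>
        p.name -> parse(p.jsonEncode(v))
      }.toList)

    // Reader side: every saved entry comes back as if the user had set it.
    metadata.params match {
      case JObject(pairs) =>
        pairs.foreach { case (paramName, jsonValue) =>
          val param = instance.getParam(paramName)
          instance.set(param, param.jsonDecode(compact(render(jsonValue))))
        }
      case _ => throw new IllegalArgumentException("Cannot recognize JSON metadata.")
    }
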
From 3a290392e87e6476b3a9253b902850a078dfc4ea Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 14 Feb 2018 04:35:11 +0000
Subject: [PATCH 2/3] Choose to skip default of outputCol when saving metadata.

---
 .../apache/spark/ml/feature/Bucketizer.scala  | 21 ++++++++++++-------
 .../ml/feature/QuantileDiscretizer.scala      | 21 ++++++++++++-------
 .../org/apache/spark/ml/param/params.scala    |  9 --------
 project/MimaExcludes.scala                    |  5 +----
 4 files changed, 27 insertions(+), 29 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
index f3aa808205995..9eb5031c754f6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
@@ -19,6 +19,10 @@ package org.apache.spark.ml.feature
 
 import java.{util => ju}
 
+import org.json4s.JsonDSL._
+import org.json4s.JValue
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.Model
@@ -299,18 +303,19 @@ object Bucketizer extends DefaultParamsReadable[Bucketizer] {
     override protected def saveImpl(path: String): Unit = {
       // SPARK-23377: The default params will be saved and loaded as user-supplied params.
       // Once `inputCols` is set, the default value of the `outputCol` param causes an error
-      // when checking exclusive params. As a temporary fix, we remove the default
-      // value of `outputCol` if `inputCols` is set before saving.
+      // when checking exclusive params. As a temporary fix, we skip the default value
+      // of `outputCol` if `inputCols` is set when saving the metadata.
       // TODO: If we modify the persistence mechanism later to better handle default params,
       // we can get rid of this.
-      var removedOutputCol: Option[String] = None
+      var paramWithoutOutputCol: Option[JValue] = None
       if (instance.isSet(instance.inputCols)) {
-        removedOutputCol = instance.getDefault(instance.outputCol)
-        instance.clearDefault(instance.outputCol)
+        val params = instance.extractParamMap().toSeq.asInstanceOf[Seq[ParamPair[Any]]]
+        val jsonParams = params.filter(_.param != instance.outputCol).map { case ParamPair(p, v) =>
+          p.name -> parse(p.jsonEncode(v))
+        }.toList
+        paramWithoutOutputCol = Some(render(jsonParams))
       }
-      DefaultParamsWriter.saveMetadata(instance, path, sc)
-      // Add the default param back.
-      removedOutputCol.foreach(instance.setDefault(instance.outputCol, _))
+      DefaultParamsWriter.saveMetadata(instance, path, sc, paramMap = paramWithoutOutputCol)
     }
   }
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
index ae7161a169e1b..0dbf98ba3473e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.ml.feature
 
+import org.json4s.JsonDSL._
+import org.json4s.JValue
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml._
@@ -263,18 +267,19 @@ object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] wi
     override protected def saveImpl(path: String): Unit = {
       // SPARK-23377: The default params will be saved and loaded as user-supplied params.
       // Once `inputCols` is set, the default value of the `outputCol` param causes an error
-      // when checking exclusive params. As a temporary fix, we remove the default
-      // value of `outputCol` if `inputCols` is set before saving.
+      // when checking exclusive params. As a temporary fix, we skip the default value
+      // of `outputCol` if `inputCols` is set when saving the metadata.
       // TODO: If we modify the persistence mechanism later to better handle default params,
       // we can get rid of this.
-      var removedOutputCol: Option[String] = None
+      var paramWithoutOutputCol: Option[JValue] = None
       if (instance.isSet(instance.inputCols)) {
-        removedOutputCol = instance.getDefault(instance.outputCol)
-        instance.clearDefault(instance.outputCol)
+        val params = instance.extractParamMap().toSeq.asInstanceOf[Seq[ParamPair[Any]]]
+        val jsonParams = params.filter(_.param != instance.outputCol).map { case ParamPair(p, v) =>
+          p.name -> parse(p.jsonEncode(v))
+        }.toList
+        paramWithoutOutputCol = Some(render(jsonParams))
       }
-      DefaultParamsWriter.saveMetadata(instance, path, sc)
-      // Add the default param back.
-      removedOutputCol.foreach(instance.setDefault(instance.outputCol, _))
+      DefaultParamsWriter.saveMetadata(instance, path, sc, paramMap = paramWithoutOutputCol)
     }
   }
 
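Instead of mutating the instance, the revised writers build the params JSON themselves and hand it to the optional `paramMap` argument of `DefaultParamsWriter.saveMetadata`, which is then written in place of the auto-derived params. Consolidated, the idea looks like this (condensed from the hunks above; json4s in scope via the new imports, and guarded by `isSet(inputCols)` in the actual code so single-column usage is untouched):

    val filtered: JValue = render(
      instance.extractParamMap().toSeq
        .filter(_.param != instance.outputCol)  // skip the defaulted single-column param
        .map { case ParamPair(p, v) => p.name -> parse(p.jsonEncode(v)) }
        .toList)
    DefaultParamsWriter.saveMetadata(instance, path, sc, paramMap = Some(filtered))

This avoids the clear-and-restore dance of the first revision, so the public `clearDefault` API and its MiMa exclusion can be reverted, which the remaining hunks of this patch do.
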
- */ - final def clearDefault[T](param: Param[T]): this.type = { - shouldOwn(param) - defaultParamMap.remove(param) - this - } - /** * Creates a copy of this instance with the same UID and some extra params. * Subclasses should implement this method and set the return type properly. diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index b3a3629032404..d35c50e1d00fe 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -135,10 +135,7 @@ object MimaExcludes { ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.ml.feature.Bucketizer.getHandleInvalid"), ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.ml.feature.StringIndexer.getHandleInvalid"), ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.ml.feature.QuantileDiscretizer.getHandleInvalid"), - ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.ml.feature.StringIndexerModel.getHandleInvalid"), - - // [SPARK-23377][ML] Fixes Bucketizer with multiple columns persistence bug - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.ml.param.Params.clearDefault") + ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.ml.feature.StringIndexerModel.getHandleInvalid") ) // Exclude rules for 2.2.x From 174c1148e595b5eda731dbdae2c6fbf1a4dd4837 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 15 Feb 2018 00:14:57 +0000 Subject: [PATCH 3/3] Address comments. --- .../main/scala/org/apache/spark/ml/feature/Bucketizer.scala | 3 +-- .../org/apache/spark/ml/feature/QuantileDiscretizer.scala | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index 9eb5031c754f6..f49c410cbcfe2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -218,7 +218,6 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String defaultCopy[Bucketizer](extra).setParent(parent) } - @Since("2.3.0") override def write: MLWriter = new Bucketizer.BucketizerWriter(this) } @@ -309,7 +308,7 @@ object Bucketizer extends DefaultParamsReadable[Bucketizer] { // we can get rid of this. var paramWithoutOutputCol: Option[JValue] = None if (instance.isSet(instance.inputCols)) { - val params = instance.extractParamMap().toSeq.asInstanceOf[Seq[ParamPair[Any]]] + val params = instance.extractParamMap().toSeq val jsonParams = params.filter(_.param != instance.outputCol).map { case ParamPair(p, v) => p.name -> parse(p.jsonEncode(v)) }.toList diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index 0dbf98ba3473e..3b4c25478fb1d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -254,7 +254,6 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("1.6.0") override def copy(extra: ParamMap): QuantileDiscretizer = defaultCopy(extra) - @Since("2.3.0") override def write: MLWriter = new QuantileDiscretizer.QuantileDiscretizerWriter(this) } @@ -273,7 +272,7 @@ object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] wi // we can get rid of this. 
From 174c1148e595b5eda731dbdae2c6fbf1a4dd4837 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 15 Feb 2018 00:14:57 +0000
Subject: [PATCH 3/3] Address comments.

---
 .../main/scala/org/apache/spark/ml/feature/Bucketizer.scala   | 3 +--
 .../org/apache/spark/ml/feature/QuantileDiscretizer.scala     | 3 +--
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
index 9eb5031c754f6..f49c410cbcfe2 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
@@ -218,7 +218,6 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String
     defaultCopy[Bucketizer](extra).setParent(parent)
   }
 
-  @Since("2.3.0")
   override def write: MLWriter = new Bucketizer.BucketizerWriter(this)
 }
 
@@ -309,7 +308,7 @@ object Bucketizer extends DefaultParamsReadable[Bucketizer] {
       // we can get rid of this.
       var paramWithoutOutputCol: Option[JValue] = None
       if (instance.isSet(instance.inputCols)) {
-        val params = instance.extractParamMap().toSeq.asInstanceOf[Seq[ParamPair[Any]]]
+        val params = instance.extractParamMap().toSeq
         val jsonParams = params.filter(_.param != instance.outputCol).map { case ParamPair(p, v) =>
           p.name -> parse(p.jsonEncode(v))
         }.toList
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
index 0dbf98ba3473e..3b4c25478fb1d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
@@ -254,7 +254,6 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui
   @Since("1.6.0")
   override def copy(extra: ParamMap): QuantileDiscretizer = defaultCopy(extra)
 
-  @Since("2.3.0")
   override def write: MLWriter = new QuantileDiscretizer.QuantileDiscretizerWriter(this)
 }
 
@@ -273,7 +272,7 @@ object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] wi
       // we can get rid of this.
       var paramWithoutOutputCol: Option[JValue] = None
       if (instance.isSet(instance.inputCols)) {
-        val params = instance.extractParamMap().toSeq.asInstanceOf[Seq[ParamPair[Any]]]
+        val params = instance.extractParamMap().toSeq
         val jsonParams = params.filter(_.param != instance.outputCol).map { case ParamPair(p, v) =>
           p.name -> parse(p.jsonEncode(v))
         }.toList
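With all three revisions applied, a multi-column round-trip behaves the way the new tests expect. An end-to-end sketch (hypothetical path; assumes a SparkSession named `spark`):

    import org.apache.spark.ml.feature.Bucketizer

    val df = spark.range(0, 10).selectExpr("cast(id as double) as myInputCol")
    val original = new Bucketizer()
      .setInputCols(Array("myInputCol"))
      .setOutputCols(Array("myOutputCol"))
      .setSplitsArray(Array(Array(Double.NegativeInfinity, 5.0, Double.PositiveInfinity)))

    original.write.overwrite().save("/tmp/spark-23377-bucketizer")
    val loaded = Bucketizer.load("/tmp/spark-23377-bucketizer")

    // `outputCol` survives only as a default, never as a user-set param, so the
    // single-column/multi-column exclusivity check passes after loading.
    assert(loaded.hasDefault(loaded.outputCol) && !loaded.isSet(loaded.outputCol))
    loaded.transform(df).show()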