From 23b3b827a850a5ef47fbe2992ec10dae2d89c2f6 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Sat, 25 Jan 2025 16:23:46 +0800
Subject: [PATCH] fix

Wire Imputer / ImputerModel into the pieces touched below:

- list Imputer in META-INF/services/org.apache.spark.ml.Estimator and
  ImputerModel in META-INF/services/org.apache.spark.ml.Transformer;
- give ImputerModel a no-argument private[ml] constructor that uses a
  random "imputer" uid and a null surrogateDF (presumably required so
  the class can be instantiated from the service registration above;
  confirm against the loader);
- decorate the Python ImputerModel.surrogateDF property with
  try_remote_attribute_relation;
- add an (ImputerModel -> "surrogateDF") entry to the per-class
  attribute table in sql/connect MLUtils;
- add test_imputer, which checks params, fit (surrogateDF), transform
  output columns/count, and save/load round trips for both the
  estimator and the model.

---
 .../services/org.apache.spark.ml.Estimator    |  1 +
 .../services/org.apache.spark.ml.Transformer  |  1 +
 .../org/apache/spark/ml/feature/Imputer.scala |  2 +
 python/pyspark/ml/feature.py                  |  1 +
 python/pyspark/ml/tests/test_feature.py       | 42 +++++++++++++++++++
 .../apache/spark/sql/connect/ml/MLUtils.scala |  1 +
 6 files changed, 48 insertions(+)

diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
index 449a07aed31d7..1183f50ae7f36 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
@@ -43,6 +43,7 @@ org.apache.spark.ml.recommendation.ALS
 org.apache.spark.ml.fpm.FPGrowth
 
 # feature
+org.apache.spark.ml.feature.Imputer
 org.apache.spark.ml.feature.StandardScaler
 org.apache.spark.ml.feature.MaxAbsScaler
 org.apache.spark.ml.feature.MinMaxScaler
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
index f13924931e923..74a2c960a98b9 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
@@ -56,6 +56,7 @@ org.apache.spark.ml.recommendation.ALSModel
 org.apache.spark.ml.fpm.FPGrowthModel
 
 # feature
+org.apache.spark.ml.feature.ImputerModel
 org.apache.spark.ml.feature.StandardScalerModel
 org.apache.spark.ml.feature.MaxAbsScalerModel
 org.apache.spark.ml.feature.MinMaxScalerModel
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
index ed093c4ba35d3..2f51ae2d7fe39 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
@@ -246,6 +246,8 @@ class ImputerModel private[ml] (
 
   import ImputerModel._
 
+  private[ml] def this() = this(Identifiable.randomUID("imputer"), null)
+
   /** @group setParam */
   @Since("3.0.0")
   def setInputCol(value: String): this.type = set(inputCol, value)
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 4c218267749cc..4cc45c1bf1947 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -2261,6 +2261,7 @@ def setOutputCol(self, value: str) -> "ImputerModel":
 
     @property
     @since("2.2.0")
+    @try_remote_attribute_relation
     def surrogateDF(self) -> DataFrame:
         """
         Returns a DataFrame containing inputCols and their corresponding surrogates,
diff --git a/python/pyspark/ml/tests/test_feature.py b/python/pyspark/ml/tests/test_feature.py
index beb2a80443cc0..2017d87f73bc5 100644
--- a/python/pyspark/ml/tests/test_feature.py
+++ b/python/pyspark/ml/tests/test_feature.py
@@ -34,6 +34,8 @@
     HashingTF,
     IDF,
     IDFModel,
+    Imputer,
+    ImputerModel,
     NGram,
     RFormula,
     Tokenizer,
@@ -541,6 +543,46 @@ def test_word2vec(self):
             model2 = Word2VecModel.load(d)
             self.assertEqual(str(model), str(model2))
 
+    def test_imputer(self):
+        spark = self.spark
+        df = spark.createDataFrame(
+            [
+                (1.0, float("nan")),
+                (2.0, float("nan")),
+                (float("nan"), 3.0),
+                (4.0, 4.0),
+                (5.0, 5.0),
+            ],
+            ["a", "b"],
+        )
+
+        imputer = Imputer(strategy="mean")
+        imputer.setInputCols(["a", "b"])
+        imputer.setOutputCols(["out_a", "out_b"])
+
+        self.assertEqual(imputer.getStrategy(), "mean")
+        self.assertEqual(imputer.getInputCols(), ["a", "b"])
+        self.assertEqual(imputer.getOutputCols(), ["out_a", "out_b"])
+
+        model = imputer.fit(df)
+        self.assertEqual(model.surrogateDF.columns, ["a", "b"])
+        self.assertEqual(model.surrogateDF.count(), 1)
+        self.assertEqual(list(model.surrogateDF.head()), [3.0, 4.0])
+
+        output = model.transform(df)
+        self.assertEqual(output.columns, ["a", "b", "out_a", "out_b"])
+        self.assertEqual(output.count(), 5)
+
+        # save & load
+        with tempfile.TemporaryDirectory(prefix="imputer") as d:
+            imputer.write().overwrite().save(d)
+            imputer2 = Imputer.load(d)
+            self.assertEqual(str(imputer), str(imputer2))
+
+            model.write().overwrite().save(d)
+            model2 = ImputerModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
     def test_count_vectorizer(self):
         df = self.spark.createDataFrame(
             [(0, ["a", "b", "c"]), (1, ["a", "b", "b", "c", "a"])],
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
index 833c00b1a5c33..cd6e13f33d2bc 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
@@ -582,6 +582,7 @@ private[ml] object MLUtils {
     (classOf[FPGrowthModel], Set("associationRules", "freqItemsets")),
 
     // Feature Models
+    (classOf[ImputerModel], Set("surrogateDF")),
     (classOf[StandardScalerModel], Set("mean", "std")),
     (classOf[MaxAbsScalerModel], Set("maxAbs")),
     (classOf[MinMaxScalerModel], Set("originalMax", "originalMin")),