From d4b14596e9517e6ec272dd530e569b3373c31fcb Mon Sep 17 00:00:00 2001 From: Junyang Qian Date: Mon, 29 Aug 2016 14:09:22 -0700 Subject: [PATCH 1/4] Add Kolmogorov-Smirnov Test wrapper to SparkR. Currently only support normal distribution as null hypothesis. --- R/pkg/NAMESPACE | 7 +- R/pkg/R/generics.R | 4 + R/pkg/R/mllib.R | 108 ++++++++++++++++++ R/pkg/inst/tests/testthat/test_mllib.R | 20 ++++ .../org/apache/spark/ml/r/KSTestWrapper.scala | 57 +++++++++ 5 files changed, 194 insertions(+), 2 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/r/KSTestWrapper.scala diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index ad587a6b7d03a..da111c9564b85 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -35,7 +35,8 @@ exportMethods("glm", "spark.perplexity", "spark.isoreg", "spark.gaussianMixture", - "spark.als") + "spark.als", + "spark.kstest") # Job group lifecycle management methods export("setJobGroup", @@ -335,7 +336,8 @@ export("as.DataFrame", "tables", "uncacheTable", "print.summary.GeneralizedLinearRegressionModel", - "read.ml") + "read.ml", + "print.summary.KSTest") export("structField", "structField.jobj", @@ -359,6 +361,7 @@ S3method(print, jobj) S3method(print, structField) S3method(print, structType) S3method(print, summary.GeneralizedLinearRegressionModel) +S3method(print, summary.KSTest) S3method(structField, character) S3method(structField, jobj) S3method(structType, jobj) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 7e626be50808d..67a999da9bc26 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1375,3 +1375,7 @@ setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml") #' @rdname spark.als #' @export setGeneric("spark.als", function(data, ...) { standardGeneric("spark.als") }) + +#' @rdname spark.kstest +#' @export +setGeneric("spark.kstest", function(data, ...) { standardGeneric("spark.kstest") }) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 64d19fab7ec8f..3778833779fc8 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -88,6 +88,13 @@ setClass("GaussianMixtureModel", representation(jobj = "jobj")) #' @note ALSModel since 2.1.0 setClass("ALSModel", representation(jobj = "jobj")) +#' S4 class that represents an KSTest +#' +#' @param jobj a Java object reference to the backing Scala KSTestWrapper +#' @export +#' @note KSTest since 2.1.0 +setClass("KSTest", representation(jobj = "jobj")) + #' Saves the MLlib model to the input path #' #' Saves the MLlib model to the input path. For more information, see the specific @@ -1308,3 +1315,104 @@ setMethod("write.ml", signature(object = "ALSModel", path = "character"), function(object, path, overwrite = FALSE) { write_internal(object, path, overwrite) }) + +#' (One-Sample) Kolmogorov-Smirnov Test +#' +#' @description +#' \code{spark.kstest} Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a +#' continuous distribution. +#' +#' By comparing the largest difference between the empirical cumulative +#' distribution of the sample data and the theoretical distribution we can provide a test for the +#' the null hypothesis that the sample data comes from that theoretical distribution. +#' +#' Users can call \code{summary} to obtain a summary of the test, and \code{print.summary.KSTest} +#' to print out a summary result. +#' +#' @details +#' For more details, see +#' \href{http://spark.apache.org/docs/latest/mllib-statistics.html#hypothesis-testing}{ +#' MLlib: Hypothesis Testing}. +#' +#' @param data a SparkDataFrame of user data. +#' @param testCol column name where the test data is from. It should be a column of double type. +#' @param nullHypothesis name of the theoretical distribution tested against. Currently only +#' \code{"norm"} for normal distribution is supported. +#' @param distParams parameters(s) of the distribution. For \code{nullHypothesis = "norm"}, +#' we can provide as a vector the mean and standard deviation of +#' the distribution. If none is provided, then standard normal will be used. +#' If only one is provided, then the standard deviation will be set to be one. +#' @param ... additional argument(s) passed to the method. +#' @return \code{spark.kstest} returns a test result object. +#' @rdname spark.kstest +#' @aliases spark.kstest,SparkDataFrame-method +#' @name spark.kstest +#' @export +#' @examples +#' \dontrun{ +#' data <- data.frame(test = c(0.1, 0.15, 0.2, 0.3, 0.25)) +#' df <- createDataFrame(data) +#' test <- spark.ktest(df, "test", "norm", c(0, 1)) +#' +#' # get a summary of the test result +#' testSummary <- summary(test) +#' testSummary +#' +#' # print out the summary in an organized way +#' print.summary.KSTest(test) +#' } +#' @note spark.kstest since 2.1.0 +setMethod("spark.kstest", signature(data = "SparkDataFrame"), + function(data, testCol = "test", nullHypothesis = c("norm"), distParams = c(0, 1)) { + tryCatch(match.arg(nullHypothesis), + error = function(e) { + msg <- paste("Distribution", nullHypothesis, "is not supported.") + stop(msg) + }) + if (nullHypothesis == "norm") { + distParams <- as.numeric(distParams) + mu <- ifelse(length(distParams) < 1, 0, distParams[1]) + sigma <- ifelse(length(distParams) < 2, 1, distParams[2]) + jobj <- callJStatic("org.apache.spark.ml.r.KSTestWrapper", + "test", data@sdf, testCol, nullHypothesis, + as.array(c(mu, sigma))) + new("KSTest", jobj = jobj) + } +}) + +# Get the summary of Kolmogorov-Smirnov (KS) Test. +#' @param object test result object of KS. +#' @return \code{summary} returns a list containing the p-value, test statistic computed for the +#' test, the null hypothesis with its parameters tested against +#' and degrees of freedom of the test. +#' @rdname spark.kstest +#' @aliases summary,KSTest-method +#' @export +#' @note summary(KSTest) since 2.1.0 +setMethod("summary", signature(object = "KSTest"), + function(object) { + jobj <- object@jobj + pValue <- callJMethod(jobj, "pValue") + statistic <- callJMethod(jobj, "statistic") + nullHypothesis <- callJMethod(jobj, "nullHypothesis") + distName <- callJMethod(jobj, "distName") + distParams <- unlist(callJMethod(jobj, "distParams")) + degreesOfFreedom <- callJMethod(jobj, "degreesOfFreedom") + + list(p.value = pValue, statistic = statistic, nullHypothesis = nullHypothesis, + nullHypothesis.name = distName, nullHypothesis.parameters = distParams, + degreesOfFreedom = degreesOfFreedom) + }) + +# Prints the summary of GeneralizedLinearRegressionModel + +#' @rdname spark.kstest +#' @param x test result object of KS. +#' @export +#' @note print.summary.KSTest since 2.1.0 +print.summary.KSTest <- function(x, ...) { + jobj <- x@jobj + summaryStr <- callJMethod(jobj, "summary") + cat(summaryStr) + invisible(summaryStr) +} diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 1e6da650d1bb8..ace76727117f0 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -736,4 +736,24 @@ test_that("spark.als", { unlink(modelPath) }) +test_that("spark.kstest", { + data <- data.frame(test = c(0.1, 0.15, 0.2, 0.3, 0.25, -1, -0.5)) + df <- createDataFrame(data) + testResult <- spark.kstest(df, "test", "norm") + stats <- summary(testResult) + + rStats <- ks.test(data$test, "pnorm", alternative = "two.sided") + + expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4) + expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4) + + testResult <- spark.kstest(df, "test", "norm", -0.5) + stats <- summary(testResult) + + rStats <- ks.test(data$test, "pnorm", -0.5, 1, alternative = "two.sided") + + expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4) + expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4) +}) + sparkR.session.stop() diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/KSTestWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/KSTestWrapper.scala new file mode 100644 index 0000000000000..21531eb057ad3 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/r/KSTestWrapper.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.r + +import org.apache.spark.mllib.stat.Statistics.kolmogorovSmirnovTest +import org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult +import org.apache.spark.sql.{DataFrame, Row} + +private[r] class KSTestWrapper private ( + val testResult: KolmogorovSmirnovTestResult, + val distName: String, + val distParams: Array[Double]) { + + lazy val pValue = testResult.pValue + + lazy val statistic = testResult.statistic + + lazy val nullHypothesis = testResult.nullHypothesis + + lazy val degreesOfFreedom = testResult.degreesOfFreedom + + def summary: String = testResult.toString +} + +private[r] object KSTestWrapper { + + def test( + data: DataFrame, + featureName: String, + distName: String, + distParams: Array[Double]): KSTestWrapper = { + + val rddData = data.select(featureName).rdd.map { + case Row(feature: Double) => feature + } + + val ksTestResult = kolmogorovSmirnovTest(rddData, distName, distParams : _*) + + new KSTestWrapper(ksTestResult, distName, distParams) + } +} + From 0a8ff8004dad4774f5f196a1437a7f4fa079c9d7 Mon Sep 17 00:00:00 2001 From: Junyang Qian Date: Thu, 1 Sep 2016 11:13:51 -0700 Subject: [PATCH 2/4] Fix docs. Add test for print function. --- R/pkg/R/mllib.R | 11 ++++------- R/pkg/inst/tests/testthat/test_mllib.R | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 3778833779fc8..e5cf6121574c6 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -1329,11 +1329,6 @@ setMethod("write.ml", signature(object = "ALSModel", path = "character"), #' Users can call \code{summary} to obtain a summary of the test, and \code{print.summary.KSTest} #' to print out a summary result. #' -#' @details -#' For more details, see -#' \href{http://spark.apache.org/docs/latest/mllib-statistics.html#hypothesis-testing}{ -#' MLlib: Hypothesis Testing}. -#' #' @param data a SparkDataFrame of user data. #' @param testCol column name where the test data is from. It should be a column of double type. #' @param nullHypothesis name of the theoretical distribution tested against. Currently only @@ -1347,6 +1342,8 @@ setMethod("write.ml", signature(object = "ALSModel", path = "character"), #' @rdname spark.kstest #' @aliases spark.kstest,SparkDataFrame-method #' @name spark.kstest +#' @seealso \href{http://spark.apache.org/docs/latest/mllib-statistics.html#hypothesis-testing}{ +#' MLlib: Hypothesis Testing} #' @export #' @examples #' \dontrun{ @@ -1404,10 +1401,10 @@ setMethod("summary", signature(object = "KSTest"), degreesOfFreedom = degreesOfFreedom) }) -# Prints the summary of GeneralizedLinearRegressionModel +# Prints the summary of KSTest #' @rdname spark.kstest -#' @param x test result object of KS. +#' @param x test result object of \code{spark.kstest}. #' @export #' @note print.summary.KSTest since 2.1.0 print.summary.KSTest <- function(x, ...) { diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index ace76727117f0..51d59ba597906 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -747,6 +747,13 @@ test_that("spark.kstest", { expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4) expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4) + printStr <- print.summary.KSTest(testResult) + expect_match(printStr, paste0("Kolmogorov-Smirnov test summary:\\n", + "degrees of freedom = 0 \\n", + "statistic = 0.38208[0-9]* \\n", + "pValue = 0.19849[0-9]* \\n", + ".*"), perl = TRUE) + testResult <- spark.kstest(df, "test", "norm", -0.5) stats <- summary(testResult) @@ -754,6 +761,13 @@ test_that("spark.kstest", { expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4) expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4) + + printStr <- print.summary.KSTest(testResult) + expect_match(printStr, paste0("Kolmogorov-Smirnov test summary:\\n", + "degrees of freedom = 0 \\n", + "statistic = 0.44003[0-9]* \\n", + "pValue = 0.09470[0-9]* \\n", + ".*"), perl = TRUE) }) sparkR.session.stop() From 78cfdcf5d45901a66d3efbcfb38d2840926b0801 Mon Sep 17 00:00:00 2001 From: Junyang Qian Date: Fri, 2 Sep 2016 10:40:01 -0700 Subject: [PATCH 3/4] revert doc change --- R/pkg/R/mllib.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index e5cf6121574c6..fb407187d765e 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -1378,7 +1378,7 @@ setMethod("spark.kstest", signature(data = "SparkDataFrame"), }) # Get the summary of Kolmogorov-Smirnov (KS) Test. -#' @param object test result object of KS. +#' @param object test result object of KSTest. #' @return \code{summary} returns a list containing the p-value, test statistic computed for the #' test, the null hypothesis with its parameters tested against #' and degrees of freedom of the test. @@ -1404,7 +1404,7 @@ setMethod("summary", signature(object = "KSTest"), # Prints the summary of KSTest #' @rdname spark.kstest -#' @param x test result object of \code{spark.kstest}. +#' @param x test result object of KSTest. #' @export #' @note print.summary.KSTest since 2.1.0 print.summary.KSTest <- function(x, ...) { From caeb91eb42ec47efd428c9a174d9d54c45f290fb Mon Sep 17 00:00:00 2001 From: Junyang Qian Date: Fri, 2 Sep 2016 11:22:57 -0700 Subject: [PATCH 4/4] update param doc --- R/pkg/R/mllib.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index fb407187d765e..dbe53d4efd000 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -1378,7 +1378,7 @@ setMethod("spark.kstest", signature(data = "SparkDataFrame"), }) # Get the summary of Kolmogorov-Smirnov (KS) Test. -#' @param object test result object of KSTest. +#' @param object test result object of KSTest by \code{spark.kstest}. #' @return \code{summary} returns a list containing the p-value, test statistic computed for the #' test, the null hypothesis with its parameters tested against #' and degrees of freedom of the test. @@ -1404,7 +1404,7 @@ setMethod("summary", signature(object = "KSTest"), # Prints the summary of KSTest #' @rdname spark.kstest -#' @param x test result object of KSTest. +#' @param x test result object of KSTest by \code{spark.kstest}. #' @export #' @note print.summary.KSTest since 2.1.0 print.summary.KSTest <- function(x, ...) {