diff --git a/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala
deleted file mode 100644
index 050ae4b447ac9..0000000000000
--- a/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.reflect.ClassTag
-import java.util.Random
-
-import cern.jet.random.Poisson
-import cern.jet.random.engine.DRand
-
-import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.util.random.BernoulliSampler
-
-private[spark]
-class FoldedRDDPartition(val prev: Partition, val seed: Int) extends Partition with Serializable {
-  override val index: Int = prev.index
-}
-
-class FoldedRDD[T: ClassTag](
-    prev: RDD[T],
-    fold: Float,
-    folds: Float,
-    seed: Int)
-  extends PartitionwiseSampledRDD[T, T](prev,
-    new BernoulliSampler((fold-1)/folds,fold/folds, false), seed) {
-}
-
-/**
- * A companion class to FoldedRDD which contains all of the elements not in the fold for the same
- * fold/seed combination. Useful for cross validation
- */
-class CompositeFoldedRDD[T: ClassTag](
-    prev: RDD[T],
-    fold: Float,
-    folds: Float,
-    seed: Int)
-  extends PartitionwiseSampledRDD[T, T](prev,
-    new BernoulliSampler((fold-1)/folds, fold/folds, true), seed) {
-}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index b5197cebba022..bf3c57ad41eb2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -334,17 +334,6 @@ abstract class RDD[T: ClassTag](
     }.toArray
   }
 
-  /**
-   * Return a k element list of pairs of RDDs with the first element of each pair
-   * containing a unique 1/Kth of the data and the second element contain the composite of that.
-   */
-  def kFoldRdds(folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = {
-    1.to(folds).map(fold => ((
-      new FoldedRDD(this, fold, folds, seed),
-      new CompositeFoldedRDD(this, fold, folds, seed)
-    ))).toList
-  }
-
   def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
     var fraction = 0.0
     var total = 0
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 9466c94d2208a..25973348a7837 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -513,37 +513,6 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
-  test("FoldedRDD") {
-    val data = sc.parallelize(1 to 100, 2)
-    val lowerFoldedRdd = new FoldedRDD(data, 1, 2, 1)
-    val upperFoldedRdd = new FoldedRDD(data, 2, 2, 1)
-    val lowerCompositeFoldedRdd = new CompositeFoldedRDD(data, 1, 2, 1)
-    assert(lowerFoldedRdd.collect().sorted.size == 50)
-    assert(lowerCompositeFoldedRdd.collect().sorted.size == 50)
-    assert(lowerFoldedRdd.subtract(lowerCompositeFoldedRdd).collect().sorted ===
-      lowerFoldedRdd.collect().sorted)
-    assert(upperFoldedRdd.collect().sorted.size == 50)
-  }
-
-  test("kfoldRdd") {
-    val data = sc.parallelize(1 to 100, 2)
-    val collectedData = data.collect().sorted
-    for (folds <- 2 to 10) {
-      for (seed <- 1 to 5) {
-        val foldedRdds = data.kFoldRdds(folds, seed)
-        assert(foldedRdds.size === folds)
-        foldedRdds.map{case (test, train) =>
-          val result = test.union(train).collect().sorted
-          assert(result === collectedData,
-            "Each training+test set combined contains all of the data")
-        }
-        // K fold cross validation should only have each element in the test set exactly once
-        assert(foldedRdds.map(_._1).reduce((x,y) => x.union(y)).collect().sorted ===
-          data.collect().sorted)
-      }
-    }
-  }
-
   test("runJob on an invalid partition") {
     intercept[IllegalArgumentException] {
       sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index cad257290542b..22f2ad6398617 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -22,6 +22,15 @@ import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import scala.reflect._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.PartitionwiseSampledRDD
+import org.apache.spark.SparkContext._
+import org.apache.spark.util.random.BernoulliSampler
+
+import org.jblas.DoubleMatrix
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.regression.RegressionModel
@@ -176,6 +185,21 @@ object MLUtils {
     (a-b)*(a-b)
   }
 
+  /**
+   * Return a k element list of pairs of RDDs with the first element of each pair
+   * containing a unique 1/kth of the data and the second element containing the complement of that fold.
+   */
+  def kFoldRdds[T : ClassTag](rdd: RDD[T], folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = {
+    val foldsF = folds.toFloat
+    1.to(folds).map(fold => ((
+      new PartitionwiseSampledRDD(rdd, new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, false),
+        seed),
+      new PartitionwiseSampledRDD(rdd, new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, true),
+        seed)
+    ))).toList
+  }
+
+
   /**
    * Function to perform cross validation on a single learner.
    *
@@ -192,14 +216,14 @@
     if (folds <= 1) {
       throw new IllegalArgumentException("Cross validation requires more than one fold")
     }
-    val rdds = data.kFoldRdds(folds, seed)
+    val rdds = kFoldRdds(data, folds, seed)
     val errorRates = rdds.map{case (testData, trainingData) =>
       val model = learner(trainingData)
-      val predictions = model.predict(testData.map(_.features))
-      val errors = predictions.zip(testData.map(_.label)).map{case (x,y) => errorFunction(x,y)}
+      val predictions = testData.map(data => (data.label, model.predict(data.features)))
+      val errors = predictions.map{case (x, y) => errorFunction(x, y)}
       errors.sum()
     }
-    val averageError = errorRates.sum / data.count
+    val averageError = errorRates.sum / data.count.toFloat
     averageError
   }
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 5ea55bce47908..bea65984f47f0 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -43,7 +43,6 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
     System.clearProperty("spark.driver.port")
   }
 
-
   test("epsilon computation") {
     assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
     assert(1.0 + EPSILON / 2.0 === 1.0, s"EPSILON is too big: $EPSILON.")
@@ -137,20 +136,11 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
     new LinearRegressionModel(Array(1.0), 0)
   }
 
-  test("Test cross validation with a terrible learner") {
-    val data = sc.parallelize(1.to(100).zip(1.to(100))).map(
-      x => LabeledPoint(x._1, Array(x._2)))
-    val expectedError = 1.to(100).map(x => x*x).sum / 100.0
-    for (seed <- 1 to 5) {
-      for (folds <- 2 to 5) {
-        val avgError = MLUtils.crossValidate(data, folds, seed, terribleLearner)
-        avgError should equal (expectedError)
-      }
-    }
-  }
   test("Test cross validation with a reasonable learner") {
     val data = sc.parallelize(1.to(100).zip(1.to(100))).map(
       x => LabeledPoint(x._1, Array(x._2)))
+    val features = data.map(_.features)
+    val labels = data.map(_.label)
     for (seed <- 1 to 5) {
       for (folds <- 2 to 5) {
         val avgError = MLUtils.crossValidate(data, folds, seed, exactLearner)
@@ -163,8 +153,33 @@
     val data = sc.parallelize(1.to(100).zip(1.to(100))).map(
       x => LabeledPoint(x._1, Array(x._2)))
     val thrown = intercept[java.lang.IllegalArgumentException] {
-      val avgError = MLUtils.crossValidate(data, 1, 1, terribleLearner)
+      val avgError = MLUtils.crossValidate(data, 1, 1, exactLearner)
     }
     assert(thrown.getClass === classOf[IllegalArgumentException])
   }
+
+  test("kfoldRdd") {
+    val data = sc.parallelize(1 to 100, 2)
+    val collectedData = data.collect().sorted
+    val twoFoldedRdd = MLUtils.kFoldRdds(data, 2, 1)
+    assert(twoFoldedRdd(0)._1.collect().sorted === twoFoldedRdd(1)._2.collect().sorted)
+    assert(twoFoldedRdd(0)._2.collect().sorted === twoFoldedRdd(1)._1.collect().sorted)
+    for (folds <- 2 to 10) {
+      for (seed <- 1 to 5) {
+        val foldedRdds =
+          MLUtils.kFoldRdds(data, folds, seed)
+        assert(foldedRdds.size === folds)
+        foldedRdds.map{case (test, train) =>
+          val result = test.union(train).collect().sorted
+          assert(test.collect().size > 0, "Non empty test data")
+          assert(train.collect().size > 0, "Non empty training data")
+          assert(result === collectedData,
+            "Each training+test set combined contains all of the data")
+        }
+        // K-fold cross validation should place each element in exactly one test set
+        assert(foldedRdds.map(_._1).reduce((x,y) => x.union(y)).collect().sorted ===
+          data.collect().sorted)
+      }
+    }
+  }
+
 }
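
A note on how the pieces above fit together: each (test, training) pair returned by `kFoldRdds` is built from two `PartitionwiseSampledRDD`s that share the same seed and the same `BernoulliSampler` acceptance range `[(fold-1)/folds, fold/folds)`. The first RDD keeps the elements whose sampled uniform value lands inside the range; the second is built with `complement = true` and keeps everything else, so the two RDDs partition the input, and across all k pairs each element lands in exactly one test fold. The sketch below is illustrative only: `MLUtils.kFoldRdds` and `MLUtils.crossValidate` are the functions this patch introduces, while the `KFoldExample` driver and the constant-model learner (mirroring the suite's `exactLearner`) are hypothetical stand-ins, not part of the patch.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionModel}
import org.apache.spark.mllib.util.MLUtils

object KFoldExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "KFoldExample")

    // kFoldRdds works on any RDD[T]: each pair is (test fold, complement),
    // and for a fixed seed the test folds partition the input.
    val data = sc.parallelize(1 to 100, 2)
    val foldPairs = MLUtils.kFoldRdds(data, 5, 42)
    foldPairs.zipWithIndex.foreach { case ((test, train), i) =>
      println("fold " + i + ": test=" + test.count() + ", train=" + train.count())
    }

    // crossValidate trains the learner on each training split and scores its
    // predictions on the held-out fold; the constant model here mirrors the
    // suite's exactLearner and stands in for a real training function.
    val points = data.map(i => LabeledPoint(i, Array(i.toDouble)))
    val learner = (train: RDD[LabeledPoint]) => new LinearRegressionModel(Array(1.0), 0)
    val avgError = MLUtils.crossValidate(points, 5, 42, learner)
    println("average cross validation error: " + avgError)

    sc.stop()
  }
}
```

Because the patched `crossValidate` sums each fold's squared errors and divides by the total count, and the test folds partition the data for a fixed seed, the result is the mean squared error over the whole dataset.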