Commit 91eae64

Consolidate things in mlutils
holdenk committed Apr 9, 2014
1 parent 264502a commit 91eae64
Showing 5 changed files with 56 additions and 113 deletions.
54 changes: 0 additions & 54 deletions core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala

This file was deleted.

11 changes: 0 additions & 11 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -334,17 +334,6 @@ abstract class RDD[T: ClassTag](
}.toArray
}

/**
 * Return a list of k pairs of RDDs, where the first element of each pair
 * contains a unique 1/kth of the data and the second element contains the
 * complement: the remaining (k-1)/k of the data.
 */
def kFoldRdds(folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = {
1.to(folds).map(fold => ((
new FoldedRDD(this, fold, folds, seed),
new CompositeFoldedRDD(this, fold, folds, seed)
))).toList
}

def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
var fraction = 0.0
var total = 0
31 changes: 0 additions & 31 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -513,37 +513,6 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}

test("FoldedRDD") {
val data = sc.parallelize(1 to 100, 2)
val lowerFoldedRdd = new FoldedRDD(data, 1, 2, 1)
val upperFoldedRdd = new FoldedRDD(data, 2, 2, 1)
val lowerCompositeFoldedRdd = new CompositeFoldedRDD(data, 1, 2, 1)
assert(lowerFoldedRdd.collect().sorted.size == 50)
assert(lowerCompositeFoldedRdd.collect().sorted.size == 50)
assert(lowerFoldedRdd.subtract(lowerCompositeFoldedRdd).collect().sorted ===
lowerFoldedRdd.collect().sorted)
assert(upperFoldedRdd.collect().sorted.size == 50)
}

test("kfoldRdd") {
val data = sc.parallelize(1 to 100, 2)
val collectedData = data.collect().sorted
for (folds <- 2 to 10) {
for (seed <- 1 to 5) {
val foldedRdds = data.kFoldRdds(folds, seed)
assert(foldedRdds.size === folds)
foldedRdds.map{case (test, train) =>
val result = test.union(train).collect().sorted
assert(result === collectedData,
"Each training+test set combined contains all of the data")
}
// K-fold cross validation should place each element in the test set of exactly one fold
assert(foldedRdds.map(_._1).reduce((x,y) => x.union(y)).collect().sorted ===
data.collect().sorted)
}
}
}

test("runJob on an invalid partition") {
intercept[IllegalArgumentException] {
sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false)
32 changes: 28 additions & 4 deletions mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -22,6 +22,15 @@ import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.reflect._

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PartitionwiseSampledRDD
import org.apache.spark.SparkContext._
import org.apache.spark.util.random.BernoulliSampler

import org.jblas.DoubleMatrix
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.RegressionModel
@@ -176,6 +185,21 @@ object MLUtils {
(a-b)*(a-b)
}

/**
 * Return a list of k pairs of RDDs, where the first element of each pair
 * contains a unique 1/kth of the data and the second element contains the
 * complement: the remaining (k-1)/k of the data.
 */
def kFoldRdds[T : ClassTag](rdd: RDD[T], folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = {
val foldsF = folds.toFloat
1.to(folds).map(fold => ((
new PartitionwiseSampledRDD(rdd, new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, false),
seed),
new PartitionwiseSampledRDD(rdd, new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, true),
seed)
))).toList
}
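
A minimal usage sketch of the new helper, assuming a live SparkContext named sc: BernoulliSampler(lb, ub, complement) keeps the elements whose per-element uniform draw lands in [lb, ub), or outside that range when complement is true, so for a fixed seed the k ranges partition the data between the test and training RDDs of each pair.

// Illustrative sketch, not part of this commit: split an RDD into three
// (test, training) fold pairs. Fold sizes are only approximately equal
// because the split is per-element Bernoulli sampling.
val data = sc.parallelize(1 to 90, 3)
val folds = MLUtils.kFoldRdds(data, 3, 42)
folds.foreach { case (test, train) =>
  println(s"fold: test=${test.count()} train=${train.count()}")
}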


/**
* Function to perform cross validation on a single learner.
*
@@ -192,14 +216,14 @@ object MLUtils {
if (folds <= 1) {
throw new IllegalArgumentException("Cross validation requires more than one fold")
}
val rdds = data.kFoldRdds(folds, seed)
val rdds = kFoldRdds(data, folds, seed)
val errorRates = rdds.map{case (testData, trainingData) =>
val model = learner(trainingData)
val predictions = model.predict(testData.map(_.features))
val errors = predictions.zip(testData.map(_.label)).map{case (x,y) => errorFunction(x,y)}
val predictions = testData.map(data => (data.label, model.predict(data.features)))
val errors = predictions.map{case (x, y) => errorFunction(x, y)}
errors.sum()
}
val averageError = errorRates.sum / data.count
val averageError = errorRates.sum / data.count.toFloat
averageError
}
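
A minimal sketch of calling crossValidate, mirroring the calls in MLUtilsSuite below; the identity-line learner and the data set are illustrative assumptions, not part of this commit.

// Illustrative learner: always predicts along the identity line, as the
// exactLearner in MLUtilsSuite does.
def learner(trainingData: RDD[LabeledPoint]): RegressionModel =
  new LinearRegressionModel(Array(1.0), 0)

val points = sc.parallelize(1 to 100).map(x => LabeledPoint(x, Array(x.toDouble)))
// 5-fold cross validation with seed 1; returns the error averaged over folds.
val avgError = MLUtils.crossValidate(points, 5, 1, learner)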

41 changes: 28 additions & 13 deletions mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -43,7 +43,6 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
System.clearProperty("spark.driver.port")
}


test("epsilon computation") {
assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
assert(1.0 + EPSILON / 2.0 === 1.0, s"EPSILON is too big: $EPSILON.")
@@ -137,20 +136,11 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
new LinearRegressionModel(Array(1.0), 0)
}

test("Test cross validation with a terrible learner") {
val data = sc.parallelize(1.to(100).zip(1.to(100))).map(
x => LabeledPoint(x._1, Array(x._2)))
val expectedError = 1.to(100).map(x => x*x).sum / 100.0
for (seed <- 1 to 5) {
for (folds <- 2 to 5) {
val avgError = MLUtils.crossValidate(data, folds, seed, terribleLearner)
avgError should equal (expectedError)
}
}
}
test("Test cross validation with a reasonable learner") {
val data = sc.parallelize(1.to(100).zip(1.to(100))).map(
x => LabeledPoint(x._1, Array(x._2)))
val features = data.map(_.features)
val labels = data.map(_.label)
for (seed <- 1 to 5) {
for (folds <- 2 to 5) {
val avgError = MLUtils.crossValidate(data, folds, seed, exactLearner)
@@ -163,8 +153,33 @@
val data = sc.parallelize(1.to(100).zip(1.to(100))).map(
x => LabeledPoint(x._1, Array(x._2)))
val thrown = intercept[java.lang.IllegalArgumentException] {
val avgError = MLUtils.crossValidate(data, 1, 1, terribleLearner)
val avgError = MLUtils.crossValidate(data, 1, 1, exactLearner)
}
assert(thrown.getClass === classOf[IllegalArgumentException])
}

test("kfoldRdd") {
val data = sc.parallelize(1 to 100, 2)
val collectedData = data.collect().sorted
val twoFoldedRdd = MLUtils.kFoldRdds(data, 2, 1)
assert(twoFoldedRdd(0)._1.collect().sorted === twoFoldedRdd(1)._2.collect().sorted)
assert(twoFoldedRdd(0)._2.collect().sorted === twoFoldedRdd(1)._1.collect().sorted)
for (folds <- 2 to 10) {
for (seed <- 1 to 5) {
val foldedRdds = MLUtils.kFoldRdds(data, folds, seed)
assert(foldedRdds.size === folds)
foldedRdds.map{case (test, train) =>
val result = test.union(train).collect().sorted
assert(test.collect().size > 0, "Non empty test data")
assert(train.collect().size > 0, "Non empty training data")
assert(result === collectedData,
"Each training+test set combined contains all of the data")
}
// K-fold cross validation should place each element in the test set of exactly
// one fold: with a fixed seed every element draws the same uniform value in
// each fold, and the fold ranges partition [0, 1)
assert(foldedRdds.map(_._1).reduce((x,y) => x.union(y)).collect().sorted ===
data.collect().sorted)
}
}
}

}
