From 888144416ced2b6d4c4839ac95b8a3feb2b3aba1 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 11 Jul 2014 18:02:01 -0700 Subject: [PATCH 01/10] RandomRDDGenerator: initial design Looking for feedback on design decisions. Very rough draft and untested. --- .../apache/spark/mllib/rdd/RandomRDD.scala | 80 +++++++++ .../org/apache/spark/mllib/stat/Random.scala | 152 ++++++++++++++++++ 2 files changed, 232 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala new file mode 100644 index 0000000000000..0e9c659aee1cf --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.mllib.stat.Distribution +import org.apache.spark.util.Utils +import org.apache.spark.{TaskContext, Partition, SparkContext} +import org.apache.spark.rdd.RDD + +private[mllib] class RandomRDDPartition(val idx: Int, + val size: Long, + val distribution: Distribution, + val seed: Long) + extends Partition with Serializable { + + override val index: Int = idx + + private val rng = distribution + rng.setSeed(seed + idx) + + private val iter = new FixedSizeIterator(size, rng) + + def getIterator = iter +} + +private[mllib] class FixedSizeIterator(override val size: Long, val rng: Distribution) + extends Iterator[Double] { + + private var currentSize = 0 + + override def hasNext: Boolean = currentSize < size + + override def next(): Double = { + currentSize += 1 + rng.nextDouble() + } +} + +private[mllib] class RandomRDD(@transient private var sc: SparkContext, + numSlices: Int, + size: Long, + @transient distribution: Distribution, + @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { + val split = splitIn.asInstanceOf[RandomRDDPartition] + split.getIterator + } + + override protected def getPartitions: Array[Partition] = { + val partitionSize = size / numSlices + val firstPartitionSize = numSlices - partitionSize * numSlices + val partitions = new Array[RandomRDDPartition](numSlices) + var i = 0 + while (i < numSlices) { + partitions(i) = if (i == 0) { + new RandomRDDPartition(i, firstPartitionSize, distribution, seed) + } else { + new RandomRDDPartition(i, partitionSize, distribution.copy, seed) + } + i += 1 + } + partitions.asInstanceOf[Array[Partition]] + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala new file mode 100644 index 0000000000000..9e614354b9cb6 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat + +import cern.jet.random.engine.{DRand} +import cern.jet.random.Poisson +import org.apache.spark.SparkContext +import org.apache.spark.mllib.rdd.RandomRDD +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils +import org.apache.spark.mllib.linalg.Vector + +import scala.util.Random + +class RandomRDDGenerators(sc: SparkContext) { +// made it into a class instead of an object so the sc only needs to be set once + + def normalRDD(numDataPoints: Long, + numPartitions: Int = 1, + mean: Double, + variance: Double, + seed: Long = Utils.random.nextLong): RDD[Double] = { + val normal = new NormalDistribution(mean, variance) + randomRDD(numDataPoints, numPartitions, normal, seed) + } + + def poissonRDD(numDataPoints: Long, + numPartitions: Int = 1, + mean: Double, + seed: Long = Utils.random.nextLong) : RDD[Double] = { + val poisson = new PoissonDistribution(mean) + randomRDD(numDataPoints, numPartitions, poisson, seed) + } + + def randomRDD(numDataPoints: Long, + numPartitions: Int = 1, + distribution: Distribution, + seed: Long = Utils.random.nextLong) : RDD[Double] = { + new RandomRDD(sc, numPartitions, numDataPoints, distribution, seed) + } + + def normalVectorRDD(numRows: Int, + numColumns: Int, + numPartitions: Int = 1, + mean: Double, + variance: Double, + seed: Long = Utils.random.nextLong): RDD[Vector] = ??? + + def poissonVectorRDD(numRows: Int, + numColumns: Int, + numPartitions: Int = 1, + mean: Double, + seed: Long = Utils.random.nextLong): RDD[Vector] = ??? + + def randomVectorRDD(numRows: Int, + numColumns: Int, + numPartitions: Int = 1, + rng: Distribution, + seed: Long = Utils.random.nextLong): RDD[Vector] = ??? + +} + +trait Distribution { + + /** + * Get a randomly generated value from the distribution + */ + def nextDouble(): Double + + /** + * Set the seed for the underlying random number generator + */ + def setSeed(seed: Long): Unit + + /** + * Make a copy of this distribution object. + * + * This is essential because most random number generator implementations are locking, + * but we need to be able to invoke nextDouble in parallel for each partition. 
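+   * (java.util.Random, for example, is thread-safe but funnels every update to its
+   * seed through a single atomic variable, so one instance shared across partitions
+   * would become a bottleneck under parallel access.)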
+ */ + def copy(): Distribution +} + +class NormalDistribution(val mean: Double, val variance: Double) extends Distribution { + + require(variance >= 0.0, "Variance cannot be negative.") + + private val random = new Random() + private val stddev = math.sqrt(variance) + + /** + * Get a randomly generated value from the distribution + */ + override def nextDouble(): Double = random.nextGaussian() * stddev + mean + + /** + * Set the seed for the underlying random number generator + */ + override def setSeed(seed: Long): Unit = random.setSeed(seed) + + /** + * Make a copy of this distribution object. + * + * This is essential because most random number generator implementations are locking, + * but we need to be able to invoke nextDouble in parallel for each partition. + */ + override def copy(): Distribution = new NormalDistribution(mean, variance) +} + + +class PoissonDistribution(val mean: Double) extends Distribution { + + private var random = new DRand() + private var poisson = new Poisson(mean, random) + + /** + * Get a randomly generated value from the distribution + */ + override def nextDouble(): Double = poisson.nextDouble() + + /** + * Set the seed for the underlying random number generator + */ + override def setSeed(seed: Long): Unit = { + //This is kind of questionable. Better suggestions? + random = new DRand(seed.toInt) + poisson = new Poisson(mean, random) + } + + /** + * Make a copy of this distribution object. + * + * This is essential because most random number generator implementations are locking, + * but we need to be able to invoke nextDouble in parallel for each partition. + */ + override def copy(): Distribution = new PoissonDistribution(mean) +} \ No newline at end of file From 7cb0e406793db493cee72cb91ec02475c95c8de7 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 11 Jul 2014 18:15:56 -0700 Subject: [PATCH 02/10] fix for data inconsistency --- .../org/apache/spark/mllib/rdd/RandomRDD.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala index 0e9c659aee1cf..a731fba158eb8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala @@ -30,12 +30,13 @@ private[mllib] class RandomRDDPartition(val idx: Int, override val index: Int = idx - private val rng = distribution - rng.setSeed(seed + idx) - - private val iter = new FixedSizeIterator(size, rng) - - def getIterator = iter + // The RNG has to be reset every time the iterator is requested to guarantee same data + // every time the content of the RDD is examined. 
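+  // Offsetting the seed by the partition index also keeps the partitions' streams
+  // distinct from one another while remaining reproducible after task retries.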
+ def getIterator = { + val newRng = distribution.copy() + newRng.setSeed(seed + idx) + new FixedSizeIterator(size, newRng) + } } private[mllib] class FixedSizeIterator(override val size: Long, val rng: Distribution) @@ -71,7 +72,7 @@ private[mllib] class RandomRDD(@transient private var sc: SparkContext, partitions(i) = if (i == 0) { new RandomRDDPartition(i, firstPartitionSize, distribution, seed) } else { - new RandomRDDPartition(i, partitionSize, distribution.copy, seed) + new RandomRDDPartition(i, partitionSize, distribution, seed) } i += 1 } From 49ed20d9a30b0ba5d809974bbcf48cc76a45d68e Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 11 Jul 2014 18:30:15 -0700 Subject: [PATCH 03/10] alternative poisson distribution generator --- .../org/apache/spark/mllib/stat/Random.scala | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala index 9e614354b9cb6..538696237c687 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala @@ -149,4 +149,37 @@ class PoissonDistribution(val mean: Double) extends Distribution { * but we need to be able to invoke nextDouble in parallel for each partition. */ override def copy(): Distribution = new PoissonDistribution(mean) +} + +// alternative "in-house" implementation of Poisson without using colt +class PoissonDistributionNative(val mean: Double) extends Distribution { + + private val random = new Random() + + /** + * Get a randomly generated value from the distribution + */ + override def nextDouble(): Double = { + var k = 0 + var p = random.nextDouble() + val target = math.exp(-mean) + while (p > target) { + p *= random.nextDouble() + k += 1 + } + k + } + + /** + * Set the seed for the underlying random number generator + */ + override def setSeed(seed: Long): Unit = random.setSeed(seed) + + /** + * Make a copy of this distribution object. + * + * This is essential because most random number generator implementations are locking, + * but we need to be able to invoke nextDouble in parallel for each partition. + */ + override def copy(): Distribution = new PoissonDistributionNative(mean) } \ No newline at end of file From f46d928c4e3e71ced4ede9295ef645fb714c9a69 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 18 Jul 2014 19:13:58 -0700 Subject: [PATCH 04/10] WIP --- .../mllib/random/DistributionGenerator.scala | 72 +++++++++++ .../apache/spark/mllib/rdd/RandomRDD.scala | 15 +-- .../org/apache/spark/mllib/stat/Random.scala | 122 +----------------- 3 files changed, 84 insertions(+), 125 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala new file mode 100644 index 0000000000000..f722a6e50b304 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import scala.util.Random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.util.random.Pseudorandom + +/** + * Trait for random number generators that generate i.i.d values from a distribution + */ +trait DistributionGenerator extends Pseudorandom with Cloneable { + + /** + * @return An i.i.d sample as a Double from an underlying distribution + */ + def nextValue(): Double + + /** + * @return A copy of the current DistributionGenerator object + */ + def clone(): DistributionGenerator + +} + +class NormalGenerator(val mean: Double = 0.0, val stddev: Double = 1.0) + extends DistributionGenerator { + + require(stddev >= 0.0, "Standard deviation cannot be negative.") + + private val random = new Random() + + /** + * @return An i.i.d sample as a Double from the Normal distribution + */ + override def nextValue(): Double = random.nextGaussian() + + /** Set random seed. */ + override def setSeed(seed: Long) = random.setSeed(seed) +} + +class PoissonGenerator(val lambda: Double = 0.0) extends DistributionGenerator { + + private var rng = new Poisson(lambda, new DRand) + /** + * @return An i.i.d sample as a Double from the Poisson distribution + */ + override def nextValue(): Double = rng.nextDouble() + + /** Set random seed. */ + override def setSeed(seed: Long) { + rng = new Poisson(lambda, new DRand(seed.toInt)) + } +} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala index a731fba158eb8..317ba619a3a5d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala @@ -17,29 +17,28 @@ package org.apache.spark.mllib.rdd -import org.apache.spark.mllib.stat.Distribution +import org.apache.spark.mllib.random.DistributionGenerator import org.apache.spark.util.Utils import org.apache.spark.{TaskContext, Partition, SparkContext} import org.apache.spark.rdd.RDD private[mllib] class RandomRDDPartition(val idx: Int, val size: Long, - val distribution: Distribution, - val seed: Long) - extends Partition with Serializable { + val distribution: DistributionGenerator, + val seed: Long) extends Partition { override val index: Int = idx // The RNG has to be reset every time the iterator is requested to guarantee same data // every time the content of the RDD is examined. 
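+  // clone() hands this partition a private generator instance, so seeding it here
+  // cannot perturb the streams of concurrently computed partitions.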
def getIterator = { - val newRng = distribution.copy() + val newRng: DistributionGenerator = distribution.clone() newRng.setSeed(seed + idx) new FixedSizeIterator(size, newRng) } } -private[mllib] class FixedSizeIterator(override val size: Long, val rng: Distribution) +private[mllib] class FixedSizeIterator(override val size: Long, val rng: DistributionGenerator) extends Iterator[Double] { private var currentSize = 0 @@ -48,14 +47,14 @@ private[mllib] class FixedSizeIterator(override val size: Long, val rng: Distrib override def next(): Double = { currentSize += 1 - rng.nextDouble() + rng.nextValue() } } private[mllib] class RandomRDD(@transient private var sc: SparkContext, numSlices: Int, size: Long, - @transient distribution: Distribution, + @transient distribution: DistributionGenerator, @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala index 538696237c687..be207d63c7659 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala @@ -17,16 +17,13 @@ package org.apache.spark.mllib.stat -import cern.jet.random.engine.{DRand} -import cern.jet.random.Poisson import org.apache.spark.SparkContext +import org.apache.spark.mllib.random.{DistributionGenerator, PoissonGenerator, NormalGenerator} import org.apache.spark.mllib.rdd.RandomRDD import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils import org.apache.spark.mllib.linalg.Vector -import scala.util.Random - class RandomRDDGenerators(sc: SparkContext) { // made it into a class instead of an object so the sc only needs to be set once @@ -35,7 +32,7 @@ class RandomRDDGenerators(sc: SparkContext) { mean: Double, variance: Double, seed: Long = Utils.random.nextLong): RDD[Double] = { - val normal = new NormalDistribution(mean, variance) + val normal = new NormalGenerator()(mean, variance) randomRDD(numDataPoints, numPartitions, normal, seed) } @@ -43,13 +40,13 @@ class RandomRDDGenerators(sc: SparkContext) { numPartitions: Int = 1, mean: Double, seed: Long = Utils.random.nextLong) : RDD[Double] = { - val poisson = new PoissonDistribution(mean) + val poisson = new PoissonGenerator(mean) randomRDD(numDataPoints, numPartitions, poisson, seed) } def randomRDD(numDataPoints: Long, numPartitions: Int = 1, - distribution: Distribution, + distribution: DistributionGenerator, seed: Long = Utils.random.nextLong) : RDD[Double] = { new RandomRDD(sc, numPartitions, numDataPoints, distribution, seed) } @@ -70,116 +67,7 @@ class RandomRDDGenerators(sc: SparkContext) { def randomVectorRDD(numRows: Int, numColumns: Int, numPartitions: Int = 1, - rng: Distribution, + rng: DistributionGenerator, seed: Long = Utils.random.nextLong): RDD[Vector] = ??? -} - -trait Distribution { - - /** - * Get a randomly generated value from the distribution - */ - def nextDouble(): Double - - /** - * Set the seed for the underlying random number generator - */ - def setSeed(seed: Long): Unit - - /** - * Make a copy of this distribution object. - * - * This is essential because most random number generator implementations are locking, - * but we need to be able to invoke nextDouble in parallel for each partition. 
- */ - def copy(): Distribution -} - -class NormalDistribution(val mean: Double, val variance: Double) extends Distribution { - - require(variance >= 0.0, "Variance cannot be negative.") - - private val random = new Random() - private val stddev = math.sqrt(variance) - - /** - * Get a randomly generated value from the distribution - */ - override def nextDouble(): Double = random.nextGaussian() * stddev + mean - - /** - * Set the seed for the underlying random number generator - */ - override def setSeed(seed: Long): Unit = random.setSeed(seed) - - /** - * Make a copy of this distribution object. - * - * This is essential because most random number generator implementations are locking, - * but we need to be able to invoke nextDouble in parallel for each partition. - */ - override def copy(): Distribution = new NormalDistribution(mean, variance) -} - - -class PoissonDistribution(val mean: Double) extends Distribution { - - private var random = new DRand() - private var poisson = new Poisson(mean, random) - - /** - * Get a randomly generated value from the distribution - */ - override def nextDouble(): Double = poisson.nextDouble() - - /** - * Set the seed for the underlying random number generator - */ - override def setSeed(seed: Long): Unit = { - //This is kind of questionable. Better suggestions? - random = new DRand(seed.toInt) - poisson = new Poisson(mean, random) - } - - /** - * Make a copy of this distribution object. - * - * This is essential because most random number generator implementations are locking, - * but we need to be able to invoke nextDouble in parallel for each partition. - */ - override def copy(): Distribution = new PoissonDistribution(mean) -} - -// alternative "in-house" implementation of Poisson without using colt -class PoissonDistributionNative(val mean: Double) extends Distribution { - - private val random = new Random() - - /** - * Get a randomly generated value from the distribution - */ - override def nextDouble(): Double = { - var k = 0 - var p = random.nextDouble() - val target = math.exp(-mean) - while (p > target) { - p *= random.nextDouble() - k += 1 - } - k - } - - /** - * Set the seed for the underlying random number generator - */ - override def setSeed(seed: Long): Unit = random.setSeed(seed) - - /** - * Make a copy of this distribution object. - * - * This is essential because most random number generator implementations are locking, - * but we need to be able to invoke nextDouble in parallel for each partition. 
- */ - override def copy(): Distribution = new PoissonDistributionNative(mean) } \ No newline at end of file From 92d6f1c3ca0f22371f7f0387b875ac16d5030ffb Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Mon, 21 Jul 2014 00:48:12 -0700 Subject: [PATCH 05/10] solution for Cloneable --- .../mllib/random/DistributionGenerator.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala index f722a6e50b304..0b721705f5d43 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala @@ -34,11 +34,7 @@ trait DistributionGenerator extends Pseudorandom with Cloneable { */ def nextValue(): Double - /** - * @return A copy of the current DistributionGenerator object - */ - def clone(): DistributionGenerator - + override def clone(): DistributionGenerator = super.clone().asInstanceOf[DistributionGenerator] } class NormalGenerator(val mean: Double = 0.0, val stddev: Double = 1.0) @@ -47,11 +43,18 @@ class NormalGenerator(val mean: Double = 0.0, val stddev: Double = 1.0) require(stddev >= 0.0, "Standard deviation cannot be negative.") private val random = new Random() + private val standard = mean == 0.0 && stddev == 1.0 /** * @return An i.i.d sample as a Double from the Normal distribution */ - override def nextValue(): Double = random.nextGaussian() + override def nextValue(): Double = { + if (standard) { + random.nextGaussian() + } else { + mean + stddev * random.nextGaussian() + } + } /** Set random seed. */ override def setSeed(seed: Long) = random.setSeed(seed) From d56cacbde7a0550f53b59696ad7c7014c827f3f7 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Mon, 21 Jul 2014 18:23:19 -0700 Subject: [PATCH 06/10] impl with RandomRDD --- .../mllib/random/DistributionGenerator.scala | 68 +++-- .../mllib/random/RandomRDDGenerators.scala | 233 ++++++++++++++++++ .../apache/spark/mllib/rdd/RandomRDD.scala | 113 +++++++-- .../org/apache/spark/mllib/stat/Random.scala | 73 ------ .../random/DistributionGeneratorSuite.scala | 84 +++++++ .../random/RandomRDDGeneratorsSuite.scala | 100 ++++++++ 6 files changed, 552 insertions(+), 119 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala index 0b721705f5d43..af4fcc736dc46 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala @@ -22,54 +22,84 @@ import scala.util.Random import cern.jet.random.Poisson import cern.jet.random.engine.DRand -import org.apache.spark.util.random.Pseudorandom +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} /** - * Trait for random number generators that generate i.i.d values from a distribution + * Trait for random number generators that generate i.i.d values from a distribution. 
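+ * Implementations must be serializable: each partition carries its own generator
+ * instance (obtained via newInstance() on the driver) out to the executors.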
*/ -trait DistributionGenerator extends Pseudorandom with Cloneable { +trait DistributionGenerator extends Pseudorandom with Serializable { /** - * @return An i.i.d sample as a Double from an underlying distribution + * @return An i.i.d sample as a Double from an underlying distribution. */ def nextValue(): Double - override def clone(): DistributionGenerator = super.clone().asInstanceOf[DistributionGenerator] + /** + * @return A copy of the DistributionGenerator with a new instance of the rng object used in the + * class when applicable. Each partition has a unique seed and therefore requires its + * own instance of the DistributionGenerator. + */ + def newInstance(): DistributionGenerator } -class NormalGenerator(val mean: Double = 0.0, val stddev: Double = 1.0) - extends DistributionGenerator { +/** + * Generates i.i.d. samples from U[0.0, 1.0] + */ +class UniformGenerator() extends DistributionGenerator { + // XORShiftRandom for better performance. Thread safety isn't necessary here. + private val random = new XORShiftRandom() - require(stddev >= 0.0, "Standard deviation cannot be negative.") + /** + * @return An i.i.d sample as a Double from U[0.0, 1.0]. + */ + override def nextValue(): Double = { + random.nextDouble() + } - private val random = new Random() - private val standard = mean == 0.0 && stddev == 1.0 + /** Set random seed. */ + override def setSeed(seed: Long) = random.setSeed(seed) + + override def newInstance(): UniformGenerator = new UniformGenerator() +} + +/** + * Generates i.i.d. samples from the Standard Normal Distribution. + */ +class StandardNormalGenerator() extends DistributionGenerator { + // XORShiftRandom for better performance. Thread safety isn't necessary here. + private val random = new XORShiftRandom() /** - * @return An i.i.d sample as a Double from the Normal distribution + * @return An i.i.d sample as a Double from the Normal distribution. */ override def nextValue(): Double = { - if (standard) { random.nextGaussian() - } else { - mean + stddev * random.nextGaussian() - } } /** Set random seed. */ override def setSeed(seed: Long) = random.setSeed(seed) + + override def newInstance(): StandardNormalGenerator = new StandardNormalGenerator() } -class PoissonGenerator(val lambda: Double = 0.0) extends DistributionGenerator { +/** + * Generates i.i.d. samples from the Poisson distribution with the given mean. + * + * @param mean mean for the Poisson distribution. + */ +class PoissonGenerator(val mean: Double) extends DistributionGenerator { + + private var rng = new Poisson(mean, new DRand) - private var rng = new Poisson(lambda, new DRand) /** - * @return An i.i.d sample as a Double from the Poisson distribution + * @return An i.i.d sample as a Double from the Poisson distribution. */ override def nextValue(): Double = rng.nextDouble() /** Set random seed. */ override def setSeed(seed: Long) { - rng = new Poisson(lambda, new DRand(seed.toInt)) + rng = new Poisson(mean, new DRand(seed.toInt)) } + + override def newInstance(): PoissonGenerator = new PoissonGenerator(mean) } \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala new file mode 100644 index 0000000000000..1e36a5a38c9c8 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +// TODO add Scaladocs once API fully approved +object RandomRDDGenerators { + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { + val uniform = new UniformGenerator() + randomRDD(sc, size, numPartitions, uniform, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { + uniformRDD(sc, size, sc.defaultParallelism, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { + uniformRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = { + uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { + val normal = new StandardNormalGenerator() + randomRDD(sc, size, numPartitions, normal, seed) + } + + def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { + normalRDD(sc, size, sc.defaultParallelism, seed) + } + + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { + normalRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + def normalRDD(sc: SparkContext, size: Long): RDD[Double] = { + normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + def poissonRDD(sc: SparkContext, + size: Long, + numPartitions: Int, + mean: Double, + seed: Long): RDD[Double] = { + val poisson = new PoissonGenerator(mean) + randomRDD(sc, size, numPartitions, poisson, seed) + } + + def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): RDD[Double] = { + poissonRDD(sc, size, sc.defaultParallelism, mean, seed) + } + + def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: Double): RDD[Double] = { + poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong) + } + + def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] = { + poissonRDD(sc, size, sc.defaultParallelism, mean, Utils.random.nextLong) + } + + def randomRDD(sc: SparkContext, + size: Long, + numPartitions: Int, + distribution: DistributionGenerator, + seed: Long): RDD[Double] = { + new RandomRDD(sc, size, numPartitions, distribution, seed) + } + + def randomRDD(sc: SparkContext, + size: Long, + distribution: DistributionGenerator, + seed: Long): RDD[Double] = { + randomRDD(sc, size, sc.defaultParallelism, distribution, seed) + } + + def randomRDD(sc: SparkContext, + size: Long, + numPartitions: Int, + distribution: DistributionGenerator): RDD[Double] = { + randomRDD(sc, size, numPartitions, distribution, 
Utils.random.nextLong) + } + + def randomRDD(sc: SparkContext, + size: Long, + distribution: DistributionGenerator): RDD[Double] = { + randomRDD(sc, size, sc.defaultParallelism, distribution, Utils.random.nextLong) + } + + // TODO Generator RDD[Vector] from multivariate distribution + + def uniformVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + numPartitions: Int, + seed: Long): RDD[Vector] = { + val uniform = new UniformGenerator() + randomVectorRDD(sc, numRows, numColumns, numPartitions, uniform, seed) + } + + def uniformVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + seed: Long): RDD[Vector] = { + uniformVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, seed) + } + + def uniformVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + numPartitions: Int): RDD[Vector] = { + uniformVectorRDD(sc, numRows, numColumns, numPartitions, Utils.random.nextLong) + } + + def uniformVectorRDD(sc: SparkContext, numRows: Long, numColumns: Int): RDD[Vector] = { + uniformVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, Utils.random.nextLong) + } + + def normalVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + numPartitions: Int, + seed: Long): RDD[Vector] = { + val uniform = new StandardNormalGenerator() + randomVectorRDD(sc, numRows, numColumns, numPartitions, uniform, seed) + } + + def normalVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + seed: Long): RDD[Vector] = { + normalVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, seed) + } + + def normalVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + numPartitions: Int): RDD[Vector] = { + normalVectorRDD(sc, numRows, numColumns, numPartitions, Utils.random.nextLong) + } + + def normalVectorRDD(sc: SparkContext, numRows: Long, numColumns: Int): RDD[Vector] = { + normalVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, Utils.random.nextLong) + } + + def poissonVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + numPartitions: Int, + mean: Double, + seed: Long): RDD[Vector] = { + val poisson = new PoissonGenerator(mean) + randomVectorRDD(sc, numRows, numColumns, numPartitions, poisson, seed) + } + + def poissonVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + mean: Double, + seed: Long): RDD[Vector] = { + poissonVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, mean, seed) + } + + def poissonVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + numPartitions: Int, + mean: Double): RDD[Vector] = { + poissonVectorRDD(sc, numRows, numColumns, numPartitions, mean, Utils.random.nextLong) + } + + def poissonVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + mean: Double): RDD[Vector] = { + val poisson = new PoissonGenerator(mean) + randomVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, poisson, Utils.random.nextLong) + } + + def randomVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + numPartitions: Int, + rng: DistributionGenerator, + seed: Long): RDD[Vector] = { + new RandomVectorRDD(sc, numRows, numColumns, numPartitions, rng, seed) + } + + def randomVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + rng: DistributionGenerator, + seed: Long): RDD[Vector] = { + randomVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, rng, seed) + } + + def randomVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + numPartitions: Int, + rng: DistributionGenerator): RDD[Vector] = { + randomVectorRDD(sc, numRows, numColumns, 
numPartitions, rng, Utils.random.nextLong) + } + + def randomVectorRDD(sc: SparkContext, + numRows: Long, + numColumns: Int, + rng: DistributionGenerator): RDD[Vector] = { + randomVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, rng, Utils.random.nextLong) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala index 317ba619a3a5d..d888a7d066f7e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala @@ -17,64 +17,123 @@ package org.apache.spark.mllib.rdd +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} import org.apache.spark.mllib.random.DistributionGenerator -import org.apache.spark.util.Utils -import org.apache.spark.{TaskContext, Partition, SparkContext} import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils private[mllib] class RandomRDDPartition(val idx: Int, val size: Long, - val distribution: DistributionGenerator, + val rng: DistributionGenerator, val seed: Long) extends Partition { override val index: Int = idx - // The RNG has to be reset every time the iterator is requested to guarantee same data - // every time the content of the RDD is examined. - def getIterator = { - val newRng: DistributionGenerator = distribution.clone() - newRng.setSeed(seed + idx) - new FixedSizeIterator(size, newRng) - } } -private[mllib] class FixedSizeIterator(override val size: Long, val rng: DistributionGenerator) - extends Iterator[Double] { +private[mllib] class RandomRDD(@transient private var sc: SparkContext, + size: Long, + numSlices: Int, + @transient rng: DistributionGenerator, + @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { - private var currentSize = 0 + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") - override def hasNext: Boolean = currentSize < size + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { + val split = splitIn.asInstanceOf[RandomRDDPartition] + RandomRDD.getPointIterator(split) + } - override def next(): Double = { - currentSize += 1 - rng.nextValue() + override def getPartitions: Array[Partition] = { + RandomRDD.getPartitions(size, numSlices, rng, seed) } } -private[mllib] class RandomRDD(@transient private var sc: SparkContext, - numSlices: Int, +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, size: Long, - @transient distribution: DistributionGenerator, - @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + vectorSize: Int, + numSlices: Int, + @transient rng: DistributionGenerator, + @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { - override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { val split = splitIn.asInstanceOf[RandomRDDPartition] - split.getIterator + RandomRDD.getVectorIterator(split, vectorSize) } override protected def getPartitions: Array[Partition] = { - val partitionSize = size / numSlices - val firstPartitionSize = numSlices - partitionSize * numSlices + 
RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { + + private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) + extends Iterator[Double] { + + private var currentSize = 0 + + override def hasNext: Boolean = currentSize < numElem + + override def next(): Double = { + currentSize += 1 + rng.nextValue() + } + } + + private[mllib] class FixedSizeVectorIterator(val numElem: Long, + val vectorSize: Int, + val rng: DistributionGenerator) + extends Iterator[Vector] { + + private var currentSize = 0 + + override def hasNext: Boolean = currentSize < numElem + + override def next(): Vector = { + currentSize += 1 + new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray) + } + } + + def getPartitions(size: Long, + numSlices: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + // TODO fix rounding issue + val firstPartitionSize = if (size % numSlices == 0) size / numSlices else size % numSlices + val otherPartitionSize = math.ceil((size - firstPartitionSize) / (numSlices - 1)).toInt + val partitions = new Array[RandomRDDPartition](numSlices) var i = 0 while (i < numSlices) { partitions(i) = if (i == 0) { - new RandomRDDPartition(i, firstPartitionSize, distribution, seed) + new RandomRDDPartition(i, firstPartitionSize, rng, seed) } else { - new RandomRDDPartition(i, partitionSize, distribution, seed) + new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), seed) } i += 1 } partitions.asInstanceOf[Array[Partition]] } + + // The RNG has to be reset every time the iterator is requested to guarantee same data + // every time the content of the RDD is examined. + def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = { + partition.rng.setSeed(partition.seed + partition.index) + new FixedSizePointIterator(partition.size, partition.rng) + } + + // The RNG has to be reset every time the iterator is requested to guarantee same data + // every time the content of the RDD is examined. + def getVectorIterator(partition: RandomRDDPartition, vectorSize: Int): Iterator[Vector] = { + partition.rng.setSeed(partition.seed + partition.index) + new FixedSizeVectorIterator(partition.size, vectorSize, partition.rng) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala deleted file mode 100644 index be207d63c7659..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Random.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.mllib.stat - -import org.apache.spark.SparkContext -import org.apache.spark.mllib.random.{DistributionGenerator, PoissonGenerator, NormalGenerator} -import org.apache.spark.mllib.rdd.RandomRDD -import org.apache.spark.rdd.RDD -import org.apache.spark.util.Utils -import org.apache.spark.mllib.linalg.Vector - -class RandomRDDGenerators(sc: SparkContext) { -// made it into a class instead of an object so the sc only needs to be set once - - def normalRDD(numDataPoints: Long, - numPartitions: Int = 1, - mean: Double, - variance: Double, - seed: Long = Utils.random.nextLong): RDD[Double] = { - val normal = new NormalGenerator()(mean, variance) - randomRDD(numDataPoints, numPartitions, normal, seed) - } - - def poissonRDD(numDataPoints: Long, - numPartitions: Int = 1, - mean: Double, - seed: Long = Utils.random.nextLong) : RDD[Double] = { - val poisson = new PoissonGenerator(mean) - randomRDD(numDataPoints, numPartitions, poisson, seed) - } - - def randomRDD(numDataPoints: Long, - numPartitions: Int = 1, - distribution: DistributionGenerator, - seed: Long = Utils.random.nextLong) : RDD[Double] = { - new RandomRDD(sc, numPartitions, numDataPoints, distribution, seed) - } - - def normalVectorRDD(numRows: Int, - numColumns: Int, - numPartitions: Int = 1, - mean: Double, - variance: Double, - seed: Long = Utils.random.nextLong): RDD[Vector] = ??? - - def poissonVectorRDD(numRows: Int, - numColumns: Int, - numPartitions: Int = 1, - mean: Double, - seed: Long = Utils.random.nextLong): RDD[Vector] = ??? - - def randomVectorRDD(numRows: Int, - numColumns: Int, - numPartitions: Int = 1, - rng: DistributionGenerator, - seed: Long = Utils.random.nextLong): RDD[Vector] = ??? - -} \ No newline at end of file diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala new file mode 100644 index 0000000000000..612ad5fea477c --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.random + +import org.scalatest.FunSuite + +import org.apache.spark.util.StatCounter + +// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged +class DistributionGeneratorSuite extends FunSuite { + + def apiChecks(gen: DistributionGenerator) { + + // resetting seed should generate the same sequence of random numbers + gen.setSeed(42L) + val array1 = (0 until 1000).map(_ => gen.nextValue()) + gen.setSeed(42L) + val array2 = (0 until 1000).map(_ => gen.nextValue()) + assert(array1.equals(array2)) + + // newInstance should contain a difference instance of the rng + // i.e. setting difference seeds for difference instances produces different sequences of + // random numbers. + val gen2 = gen.newInstance() + gen.setSeed(0L) + val array3 = (0 until 1000).map(_ => gen.nextValue()) + gen2.setSeed(1L) + val array4 = (0 until 1000).map(_ => gen2.nextValue()) + // Compare arrays instead of elements since individual elements can coincide by chance but the + // sequences should differ given two different seeds. + assert(!array3.equals(array4)) + } + + def distributionChecks(gen: DistributionGenerator, + mean: Double = 0.0, + stddev: Double = 1.0, + epsilon: Double = 1e-3) { + for (seed <- 0 until 5) { + gen.setSeed(seed.toLong) + val sample = (0 until 10000000).map { _ => gen.nextValue()} + val stats = new StatCounter(sample) + assert(math.abs(stats.mean - mean) < epsilon) + assert(math.abs(stats.stdev - stddev) < epsilon) + } + } + + test("UniformGenerator") { + val uniform = new UniformGenerator() + apiChecks(uniform) + // Stddev of uniform distribution = (ub - lb) / math.sqrt(12) + distributionChecks(uniform, 0.5, 1 / math.sqrt(12)) + } + + test("StandardNormalGenerator") { + val normal = new StandardNormalGenerator() + apiChecks(normal) + distributionChecks(normal, 0.0, 1.0) + } + + test("PoissonGenerator") { + // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced. + for (mean <- List(1.0, 5.0, 100.0)) { + val poisson = new PoissonGenerator(mean) + apiChecks(poisson) + distributionChecks(poisson, mean, math.sqrt(mean), 1e-2) + } + } +} + diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala new file mode 100644 index 0000000000000..03534de493149 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.random + +import org.apache.spark.mllib.rdd.RandomRDDPartition +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.rdd.RDD + +/** + * Note: avoid including APIs that do not set the seed for the RNG in unit tests + * in order to guarantee deterministic behavior. + * + * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged + */ +class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext { + + def testGeneratedRDD(rdd: RDD[Double], + expectedSize: Long, + expectedNumPartitions: Int, + expectedMean: Double, + expectedStddev: Double, + epsilon: Double = 1e-2) { + val stats = rdd.stats() + println(stats.toString) + assert(expectedSize === stats.count) + assert(expectedNumPartitions === rdd.partitions.size) + assert(math.abs(stats.mean - expectedMean) < epsilon) + assert(math.abs(stats.stdev - expectedStddev) < epsilon) + } + + test("uniformRDD") { + val defaultSize = 1000000L + val numPartitions = 100 + val defaultSeed = 1L + + // vary seeds + for (seed <- 0 until 5) { + val uniform = RandomRDDGenerators.uniformRDD(sc, defaultSize, numPartitions, seed) + testGeneratedRDD(uniform, defaultSize, numPartitions, 0.5, 1 / math.sqrt(12)) + } + + // cases where size % numParts != 0 + for ((size, numPartitions) <- List((10000, 6), (12345, 1), (13000, 3))) { + val uniform = RandomRDDGenerators.uniformRDD(sc, size, numPartitions, defaultSeed) + uniform.partitions.foreach(p => println(p.asInstanceOf[RandomRDDPartition].size)) + testGeneratedRDD(uniform, size, numPartitions, 0.5, 1 / math.sqrt(12)) + } + + // default numPartitions = sc.defaultParallelism + val uniform = RandomRDDGenerators.normalRDD(sc, defaultSize, defaultSeed) + testGeneratedRDD(uniform, defaultSize, sc.defaultParallelism, 0.0, 1.0) + } + + test("normalRDD") { + val size = 1000000L + val numPartitions = 100 + val defaultSeed = 1L + + for (seed <- 0 until 5) { + val normal = RandomRDDGenerators.normalRDD(sc, size, numPartitions, seed) + testGeneratedRDD(normal, size, numPartitions, 0.0, 1.0) + } + + val normal2 = RandomRDDGenerators.normalRDD(sc, size, defaultSeed) + testGeneratedRDD(normal2, size, sc.defaultParallelism, 0.0, 1.0) + } + + test("poissonRDD") { + val size = 1000000L + val numPartitions = 100 + val defaultSeed = 1L + val mean = 100.0 + + for (seed <- 0 until 5) { + val poisson = RandomRDDGenerators.poissonRDD(sc, size, numPartitions, mean, seed) + testGeneratedRDD(poisson, size, numPartitions, mean, math.sqrt(mean), 1e-1) + } + + val poisson2 = RandomRDDGenerators.poissonRDD(sc, size, mean, defaultSeed) + testGeneratedRDD(poisson2, size, sc.defaultParallelism, mean, math.sqrt(mean), 1e-1) + } +} From bc90234c9639bfb3f4581af63cf4bf370c61e18b Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Mon, 21 Jul 2014 20:37:40 -0700 Subject: [PATCH 07/10] units passed. 
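The main fix is the partition sizing in RandomRDD.getPartitions: the old logic
under-counted elements whenever size % numSlices != 0, and divided by zero when
numSlices == 1. The first partition now simply absorbs the remainder. A worked
example of the new scheme with size = 13000 and numSlices = 3:

    firstPartitionSize = 13000 / 3 + 13000 % 3   // 4333 + 1 = 4334
    otherPartitionSize = 13000 / 3               // 4333
    // total = 4334 + 2 * 4333 = 13000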
--- .../mllib/random/DistributionGenerator.scala | 4 +- .../mllib/random/RandomRDDGenerators.scala | 2 + .../apache/spark/mllib/rdd/RandomRDD.scala | 7 +- .../random/RandomRDDGeneratorsSuite.scala | 146 ++++++++++++++---- 4 files changed, 123 insertions(+), 36 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala index af4fcc736dc46..8b4013db86434 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala @@ -17,8 +17,6 @@ package org.apache.spark.mllib.random -import scala.util.Random - import cern.jet.random.Poisson import cern.jet.random.engine.DRand @@ -46,6 +44,7 @@ trait DistributionGenerator extends Pseudorandom with Serializable { * Generates i.i.d. samples from U[0.0, 1.0] */ class UniformGenerator() extends DistributionGenerator { + // XORShiftRandom for better performance. Thread safety isn't necessary here. private val random = new XORShiftRandom() @@ -66,6 +65,7 @@ class UniformGenerator() extends DistributionGenerator { * Generates i.i.d. samples from the Standard Normal Distribution. */ class StandardNormalGenerator() extends DistributionGenerator { + // XORShiftRandom for better performance. Thread safety isn't necessary here. private val random = new XORShiftRandom() diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala index 1e36a5a38c9c8..2bd13a9f2c991 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala @@ -24,6 +24,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils // TODO add Scaladocs once API fully approved +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring +// down the number of methods here. 
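+// For illustration only, one hypothetical shape for such a builder (names invented):
+//   val rdd = new RandomRDDBuilder(sc).numPartitions(100).seed(42L).uniform(1000000L)
+// with unset fields defaulting to sc.defaultParallelism and Utils.random.nextLong.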
object RandomRDDGenerators { def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala index d888a7d066f7e..629fff2b8d0bb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala @@ -32,6 +32,7 @@ private[mllib] class RandomRDDPartition(val idx: Int, } +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue private[mllib] class RandomRDD(@transient private var sc: SparkContext, size: Long, numSlices: Int, @@ -106,9 +107,9 @@ private[mllib] object RandomRDD { numSlices: Int, rng: DistributionGenerator, seed: Long): Array[Partition] = { - // TODO fix rounding issue - val firstPartitionSize = if (size % numSlices == 0) size / numSlices else size % numSlices - val otherPartitionSize = math.ceil((size - firstPartitionSize) / (numSlices - 1)).toInt + + val firstPartitionSize = size / numSlices + size % numSlices + val otherPartitionSize = size / numSlices val partitions = new Array[RandomRDDPartition](numSlices) var i = 0 diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala index 03534de493149..3fd8eee2a9bfa 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala @@ -17,12 +17,16 @@ package org.apache.spark.mllib.random -import org.apache.spark.mllib.rdd.RandomRDDPartition +import scala.collection.mutable.ArrayBuffer + import org.scalatest.FunSuite import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.RandomRDD import org.apache.spark.mllib.util.LocalSparkContext import org.apache.spark.rdd.RDD +import org.apache.spark.util.StatCounter /** * Note: avoid including APIs that do not set the seed for the RNG in unit tests @@ -30,71 +34,151 @@ import org.apache.spark.rdd.RDD * * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged */ -class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext { +class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable { def testGeneratedRDD(rdd: RDD[Double], expectedSize: Long, expectedNumPartitions: Int, expectedMean: Double, expectedStddev: Double, - epsilon: Double = 1e-2) { + epsilon: Double = 0.01) { val stats = rdd.stats() - println(stats.toString) assert(expectedSize === stats.count) assert(expectedNumPartitions === rdd.partitions.size) assert(math.abs(stats.mean - expectedMean) < epsilon) assert(math.abs(stats.stdev - expectedStddev) < epsilon) } - test("uniformRDD") { - val defaultSize = 1000000L - val numPartitions = 100 - val defaultSeed = 1L + // assume test RDDs are small + def testGeneratedVectorRDD(rdd: RDD[Vector], + expectedRows: Long, + expectedColumns: Int, + expectedNumPartitions: Int, + expectedMean: Double, + expectedStddev: Double, + epsilon: Double = 0.01) { + assert(expectedNumPartitions === rdd.partitions.size) + val values = new ArrayBuffer[Double]() + rdd.collect.foreach { vector => { + assert(vector.size === expectedColumns) + values ++= vector.toArray + }} + assert(expectedRows === values.size / expectedColumns) + val stats = new 
StatCounter(values) + assert(math.abs(stats.mean - expectedMean) < epsilon) + assert(math.abs(stats.stdev - expectedStddev) < epsilon) + } - // vary seeds - for (seed <- 0 until 5) { - val uniform = RandomRDDGenerators.uniformRDD(sc, defaultSize, numPartitions, seed) - testGeneratedRDD(uniform, defaultSize, numPartitions, 0.5, 1 / math.sqrt(12)) - } + test("RandomRDD sizes") { - // cases where size % numParts != 0 + // some cases where size % numParts != 0 to test getPartitions behaves correctly for ((size, numPartitions) <- List((10000, 6), (12345, 1), (13000, 3))) { - val uniform = RandomRDDGenerators.uniformRDD(sc, size, numPartitions, defaultSeed) - uniform.partitions.foreach(p => println(p.asInstanceOf[RandomRDDPartition].size)) - testGeneratedRDD(uniform, size, numPartitions, 0.5, 1 / math.sqrt(12)) + val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L) + assert(rdd.count() === size) + assert(rdd.partitions.size === numPartitions) } - // default numPartitions = sc.defaultParallelism - val uniform = RandomRDDGenerators.normalRDD(sc, defaultSize, defaultSeed) - testGeneratedRDD(uniform, defaultSize, sc.defaultParallelism, 0.0, 1.0) + // size > Int.MaxValue + val size = Int.MaxValue.toLong + 100L + val numPartitions = 10 + val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L) + assert(rdd.count() === size) + assert(rdd.partitions.size === numPartitions) + + // size needs to be positive + try { + new RandomRDD(sc, 0, 10, new UniformGenerator, 0L) + assert(false) + } catch { + case iae: IllegalArgumentException => + } + + // numPartitions needs to be positive + try { + new RandomRDD(sc, 100, 0, new UniformGenerator, 0L) + assert(false) + } catch { + case iae: IllegalArgumentException => + } } - test("normalRDD") { + test("randomRDD for different distributions") { val size = 1000000L val numPartitions = 100 val defaultSeed = 1L + val poissonMean = 100.0 for (seed <- 0 until 5) { + val uniform = RandomRDDGenerators.uniformRDD(sc, size, numPartitions, seed) + testGeneratedRDD(uniform, size, numPartitions, 0.5, 1 / math.sqrt(12)) + val normal = RandomRDDGenerators.normalRDD(sc, size, numPartitions, seed) testGeneratedRDD(normal, size, numPartitions, 0.0, 1.0) + + val poisson = RandomRDDGenerators.poissonRDD(sc, size, numPartitions, poissonMean, seed) + testGeneratedRDD(poisson, size, numPartitions, poissonMean, math.sqrt(poissonMean), 0.1) } - val normal2 = RandomRDDGenerators.normalRDD(sc, size, defaultSeed) - testGeneratedRDD(normal2, size, sc.defaultParallelism, 0.0, 1.0) + // check default numPartitions = sc.defaultParallelism + val uniform = RandomRDDGenerators.uniformRDD(sc, size, defaultSeed) + testGeneratedRDD(uniform, size, sc.defaultParallelism, 0.5, 1 / math.sqrt(12)) + + val normal = RandomRDDGenerators.normalRDD(sc, size, defaultSeed) + testGeneratedRDD(normal, size, sc.defaultParallelism, 0.0, 1.0) + + val poisson = RandomRDDGenerators.poissonRDD(sc, size, poissonMean, defaultSeed) + testGeneratedRDD(poisson, size, sc.defaultParallelism, poissonMean, math.sqrt(poissonMean), 0.1) + + // custom distribution to check that partitions have unique seeds + val random = RandomRDDGenerators.randomRDD(sc, 10L, 10, new MockDistro(), 0L) + random.collect.sorted.equals(Array[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) } - test("poissonRDD") { - val size = 1000000L - val numPartitions = 100 + test("randomVectorRDD for different distributions") { + val rows = 1000L + val cols = 100 + val parts = 10 val defaultSeed = 1L - val mean = 100.0 + val poissonMean 
= 100.0 for (seed <- 0 until 5) { - val poisson = RandomRDDGenerators.poissonRDD(sc, size, numPartitions, mean, seed) - testGeneratedRDD(poisson, size, numPartitions, mean, math.sqrt(mean), 1e-1) + val uniform = RandomRDDGenerators.uniformVectorRDD(sc, rows, cols, parts, seed) + testGeneratedVectorRDD(uniform, rows, cols, parts, 0.5, 1 / math.sqrt(12)) + + val normal = RandomRDDGenerators.normalVectorRDD(sc, rows, cols, parts, seed) + testGeneratedVectorRDD(normal, rows, cols, parts, 0.0, 1.0) + + val poisson = RandomRDDGenerators.poissonVectorRDD(sc, rows, cols, parts, poissonMean, seed) + testGeneratedVectorRDD(poisson, rows, cols, parts, poissonMean, math.sqrt(poissonMean), 0.1) } - val poisson2 = RandomRDDGenerators.poissonRDD(sc, size, mean, defaultSeed) - testGeneratedRDD(poisson2, size, sc.defaultParallelism, mean, math.sqrt(mean), 1e-1) + // check default numPartitions = sc.defaultParallelism + val uniform = RandomRDDGenerators.uniformVectorRDD(sc, rows, cols, defaultSeed) + testGeneratedVectorRDD(uniform, rows, cols, sc.defaultParallelism, 0.5, 1 / math.sqrt(12)) + + val normal = RandomRDDGenerators.normalVectorRDD(sc, rows, cols, defaultSeed) + testGeneratedVectorRDD(normal, rows, cols, sc.defaultParallelism, 0.0, 1.0) + + val poisson = RandomRDDGenerators.poissonVectorRDD(sc, rows, cols, poissonMean, defaultSeed) + testGeneratedVectorRDD(poisson, rows, cols, sc.defaultParallelism, + poissonMean, math.sqrt(poissonMean), 0.1) + + // custom distribution to check that partitions have unique seeds + val random = RandomRDDGenerators.randomVectorRDD(sc, 3, 3, parts, new MockDistro, 0L) + val values = new ArrayBuffer[Double]() + random.collect.foldLeft(values){ case (values, vector) => values ++= vector.toArray } + values.sorted.equals(Array[Double](1, 1, 1, 2, 2, 2, 3, 3, 3)) } } + +private[random] class MockDistro extends DistributionGenerator { + + var seed = 0L + + // This allows us to check that each partition has a different seed + override def nextValue(): Double = (1 + seed).toDouble + + override def newInstance(): MockDistro = new MockDistro + + override def setSeed(seed: Long) = this.seed = seed +} From aec68eb167ac9f11c64d95c698009cbf8919bd4b Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Mon, 21 Jul 2014 20:42:31 -0700 Subject: [PATCH 08/10] newline --- .../org/apache/spark/mllib/random/DistributionGenerator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala index 8b4013db86434..8424d1fff56b0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala @@ -102,4 +102,4 @@ class PoissonGenerator(val mean: Double) extends DistributionGenerator { } override def newInstance(): PoissonGenerator = new PoissonGenerator(mean) -} \ No newline at end of file +} From a8ea92d679305feaf823a481103023f502a1728d Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Wed, 23 Jul 2014 14:00:33 -0700 Subject: [PATCH 09/10] Reviewer comments --- .../mllib/random/DistributionGenerator.scala | 31 +- .../mllib/random/RandomRDDGenerators.scala | 415 +++++++++++++----- .../apache/spark/mllib/rdd/RandomRDD.scala | 70 +-- .../random/DistributionGeneratorSuite.scala | 9 +- .../random/RandomRDDGeneratorsSuite.scala | 71 ++- 5 files changed, 371 insertions(+), 225 deletions(-) diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala index 8424d1fff56b0..bcf87214ac774 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala @@ -28,58 +28,49 @@ import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} trait DistributionGenerator extends Pseudorandom with Serializable { /** - * @return An i.i.d sample as a Double from an underlying distribution. + * Returns an i.i.d sample as a Double from an underlying distribution. */ def nextValue(): Double /** - * @return A copy of the DistributionGenerator with a new instance of the rng object used in the - * class when applicable. Each partition has a unique seed and therefore requires its - * own instance of the DistributionGenerator. + * Returns a copy of the DistributionGenerator with a new instance of the rng object used in the + * class, when applicable, so that copies can be used concurrently without locking. */ - def newInstance(): DistributionGenerator + def copy(): DistributionGenerator } /** * Generates i.i.d. samples from U[0.0, 1.0] */ -class UniformGenerator() extends DistributionGenerator { +class UniformGenerator extends DistributionGenerator { // XORShiftRandom for better performance. Thread safety isn't necessary here. private val random = new XORShiftRandom() - /** - * @return An i.i.d sample as a Double from U[0.0, 1.0]. - */ override def nextValue(): Double = { random.nextDouble() } - /** Set random seed. */ override def setSeed(seed: Long) = random.setSeed(seed) - override def newInstance(): UniformGenerator = new UniformGenerator() + override def copy(): UniformGenerator = new UniformGenerator() } /** * Generates i.i.d. samples from the Standard Normal Distribution. */ -class StandardNormalGenerator() extends DistributionGenerator { +class StandardNormalGenerator extends DistributionGenerator { // XORShiftRandom for better performance. Thread safety isn't necessary here. private val random = new XORShiftRandom() - /** - * @return An i.i.d sample as a Double from the Normal distribution. - */ override def nextValue(): Double = { random.nextGaussian() } - /** Set random seed. */ override def setSeed(seed: Long) = random.setSeed(seed) - override def newInstance(): StandardNormalGenerator = new StandardNormalGenerator() + override def copy(): StandardNormalGenerator = new StandardNormalGenerator() } /** @@ -91,15 +82,11 @@ class PoissonGenerator(val mean: Double) extends DistributionGenerator { private var rng = new Poisson(mean, new DRand) - /** - * @return An i.i.d sample as a Double from the Poisson distribution. - */ override def nextValue(): Double = rng.nextDouble() - /** Set random seed.
*/ override def setSeed(seed: Long) { rng = new Poisson(mean, new DRand(seed.toInt)) } - override def newInstance(): PoissonGenerator = new PoissonGenerator(mean) + override def copy(): PoissonGenerator = new PoissonGenerator(mean) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala index 2bd13a9f2c991..3d1e201d97751 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala @@ -23,112 +23,213 @@ import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils -// TODO add Scaladocs once API fully approved -// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring -// down the number of methods here. +/** + * Generator methods for creating RDDs comprised of i.i.d samples from some distribution. + * + * TODO Generate RDD[Vector] from multivariate distributions. + */ object RandomRDDGenerators { + /** + * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + */ def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { val uniform = new UniformGenerator() - randomRDD(sc, size, numPartitions, uniform, seed) - } - - def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { - uniformRDD(sc, size, sc.defaultParallelism, seed) + randomRDD(sc, uniform, size, numPartitions, seed) } + /** + * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + */ def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { uniformRDD(sc, size, numPartitions, Utils.random.nextLong) } + /** + * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + */ def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = { uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) } + /** + * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). 
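A usage note on the seed parameter documented above: fixing the seed fixes the whole RDD, so two RDDs built with identical arguments contain identical samples. A minimal sketch, assuming an already-constructed SparkContext named sc:

val a = RandomRDDGenerators.normalRDD(sc, 1000L, 4, 42L)
val b = RandomRDDGenerators.normalRDD(sc, 1000L, 4, 42L)
// Same master seed => same per-partition seeds => element-for-element equal.
assert(a.collect().toSeq == b.collect().toSeq)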
+ */ def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { val normal = new StandardNormalGenerator() - randomRDD(sc, size, numPartitions, normal, seed) - } - - def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { - normalRDD(sc, size, sc.defaultParallelism, seed) + randomRDD(sc, normal, size, numPartitions, seed) } + /** + * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). + */ def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { normalRDD(sc, size, numPartitions, Utils.random.nextLong) } + /** + * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). + */ def normalRDD(sc: SparkContext, size: Long): RDD[Double] = { normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) } + /** + * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or lambda, for the Poisson distribution. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). + */ def poissonRDD(sc: SparkContext, + mean: Double, size: Long, numPartitions: Int, - mean: Double, seed: Long): RDD[Double] = { val poisson = new PoissonGenerator(mean) - randomRDD(sc, size, numPartitions, poisson, seed) - } - - def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): RDD[Double] = { - poissonRDD(sc, size, sc.defaultParallelism, mean, seed) - } - - def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: Double): RDD[Double] = { - poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong) - } - - def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] = { - poissonRDD(sc, size, sc.defaultParallelism, mean, Utils.random.nextLong) - } - + randomRDD(sc, poisson, size, numPartitions, seed) + } + + /** + * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or lambda, for the Poisson distribution. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). + */ + def poissonRDD(sc: SparkContext, mean: Double, size: Long, numPartitions: Int): RDD[Double] = { + poissonRDD(sc, mean, size, numPartitions, Utils.random.nextLong) + } + + /** + * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or lambda, for the Poisson distribution. + * @param size Size of the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). 
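An illustrative sanity check for the Poisson docs above: Pois(mean) has mean lambda and standard deviation sqrt(lambda), which is what the test suite later verifies with a 0.1 tolerance. Assumes a SparkContext sc and import org.apache.spark.SparkContext._ so that stats() is available on RDD[Double]:

val lambda = 100.0
val pois = RandomRDDGenerators.poissonRDD(sc, lambda, 1000000L, 10)
val stats = pois.stats()  // Spark's StatCounter over the RDD[Double]
assert(math.abs(stats.mean - lambda) < 0.1)
assert(math.abs(stats.stdev - math.sqrt(lambda)) < 0.1)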
+ */ + def poissonRDD(sc: SparkContext, mean: Double, size: Long): RDD[Double] = { + poissonRDD(sc, mean, size, sc.defaultParallelism, Utils.random.nextLong) + } + + /** + * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. + * + * @param sc SparkContext used to create the RDD. + * @param generator DistributionGenerator used to populate the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Double] comprised of i.i.d. samples produced by generator. + */ def randomRDD(sc: SparkContext, + generator: DistributionGenerator, size: Long, numPartitions: Int, - distribution: DistributionGenerator, seed: Long): RDD[Double] = { - new RandomRDD(sc, size, numPartitions, distribution, seed) - } - + new RandomRDD(sc, size, numPartitions, generator, seed) + } + + /** + * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. + * + * @param sc SparkContext used to create the RDD. + * @param generator DistributionGenerator used to populate the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Double] comprised of i.i.d. samples produced by generator. + */ def randomRDD(sc: SparkContext, + generator: DistributionGenerator, size: Long, - distribution: DistributionGenerator, - seed: Long): RDD[Double] = { - randomRDD(sc, size, sc.defaultParallelism, distribution, seed) - } - - def randomRDD(sc: SparkContext, - size: Long, - numPartitions: Int, - distribution: DistributionGenerator): RDD[Double] = { - randomRDD(sc, size, numPartitions, distribution, Utils.random.nextLong) - } - + numPartitions: Int): RDD[Double] = { + randomRDD(sc, generator, size, numPartitions, Utils.random.nextLong) + } + + /** + * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param generator DistributionGenerator used to populate the RDD. + * @param size Size of the RDD. + * @return RDD[Double] comprised of i.i.d. samples produced by generator. + */ def randomRDD(sc: SparkContext, - size: Long, - distribution: DistributionGenerator): RDD[Double] = { - randomRDD(sc, size, sc.defaultParallelism, distribution, Utils.random.nextLong) - } - - // TODO Generator RDD[Vector] from multivariate distribution - + generator: DistributionGenerator, + size: Long): RDD[Double] = { + randomRDD(sc, generator, size, sc.defaultParallelism, Utils.random.nextLong) + } + + /** + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * uniform distribution on [0.0, 1.0]. + * + * @param sc SparkContext used to create the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numColumns Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
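Because randomRDD accepts any DistributionGenerator, plugging in a custom sampler only requires nextValue, setSeed, and copy. A sketch of an exponential sampler via the inverse CDF; ExponentialGenerator is hypothetical and not part of this patch:

class ExponentialGenerator(val rate: Double) extends DistributionGenerator {
  private val random = new java.util.Random()
  // Inverse CDF: X = -ln(1 - U) / rate is Exp(rate)-distributed for U ~ U[0, 1).
  override def nextValue(): Double = -math.log(1.0 - random.nextDouble()) / rate
  override def setSeed(seed: Long): Unit = random.setSeed(seed)
  override def copy(): ExponentialGenerator = new ExponentialGenerator(rate)
}

val expData = RandomRDDGenerators.randomRDD(sc, new ExponentialGenerator(2.0), 10000L, 4)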
+ */ def uniformVectorRDD(sc: SparkContext, numRows: Long, numColumns: Int, numPartitions: Int, seed: Long): RDD[Vector] = { val uniform = new UniformGenerator() - randomVectorRDD(sc, numRows, numColumns, numPartitions, uniform, seed) - } - - def uniformVectorRDD(sc: SparkContext, - numRows: Long, - numColumns: Int, - seed: Long): RDD[Vector] = { - uniformVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, seed) - } - + randomVectorRDD(sc, uniform, numRows, numColumns, numPartitions, seed) + } + + /** + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * uniform distribution on [0.0, 1.0]. + * + * @param sc SparkContext used to create the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numColumns Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. + */ def uniformVectorRDD(sc: SparkContext, numRows: Long, numColumns: Int, @@ -136,26 +237,50 @@ object RandomRDDGenerators { uniformVectorRDD(sc, numRows, numColumns, numPartitions, Utils.random.nextLong) } + /** + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * uniform distribution on [0.0, 1.0]. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numColumns Number of elements in each Vector. + * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. + */ def uniformVectorRDD(sc: SparkContext, numRows: Long, numColumns: Int): RDD[Vector] = { uniformVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, Utils.random.nextLong) } + /** + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * standard normal distribution.
+ * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numColumns Number of elements in each Vector. + * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0). + */ def normalVectorRDD(sc: SparkContext, numRows: Long, numColumns: Int): RDD[Vector] = { normalVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, Utils.random.nextLong) } + /** + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Poisson distribution with the input mean. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or lambda, for the Poisson distribution. + * @param numRows Number of Vectors in the RDD. + * @param numColumns Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). + */ def poissonVectorRDD(sc: SparkContext, + mean: Double, numRows: Long, numColumns: Int, numPartitions: Int, - mean: Double, seed: Long): RDD[Vector] = { val poisson = new PoissonGenerator(mean) - randomVectorRDD(sc, numRows, numColumns, numPartitions, poisson, seed) - } - + randomVectorRDD(sc, poisson, numRows, numColumns, numPartitions, seed) + } + + /** + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Poisson distribution with the input mean. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or lambda, for the Poisson distribution. + * @param numRows Number of Vectors in the RDD. + * @param numColumns Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). + */ def poissonVectorRDD(sc: SparkContext, - numRows: Long, - numColumns: Int, mean: Double, - seed: Long): RDD[Vector] = { - poissonVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, mean, seed) - } - - def poissonVectorRDD(sc: SparkContext, numRows: Long, numColumns: Int, - numPartitions: Int, - mean: Double): RDD[Vector] = { - poissonVectorRDD(sc, numRows, numColumns, numPartitions, mean, Utils.random.nextLong) - } - + numPartitions: Int): RDD[Vector] = { + poissonVectorRDD(sc, mean, numRows, numColumns, numPartitions, Utils.random.nextLong) + } + + /** + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Poisson distribution with the input mean. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or lambda, for the Poisson distribution. + * @param numRows Number of Vectors in the RDD. + * @param numColumns Number of elements in each Vector. + * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). + */ def poissonVectorRDD(sc: SparkContext, + mean: Double, numRows: Long, - numColumns: Int, - mean: Double): RDD[Vector] = { - val poisson = new PoissonGenerator(mean) - randomVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, poisson, Utils.random.nextLong) - } - + numColumns: Int): RDD[Vector] = { + poissonVectorRDD(sc, mean, numRows, numColumns, sc.defaultParallelism, Utils.random.nextLong) + } + + /** + * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the + * input DistributionGenerator. 
+ * + * @param sc SparkContext used to create the RDD. + * @param generator DistributionGenerator used to populate the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numColumns Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Vector] with vectors containing i.i.d samples produced by generator. + */ def randomVectorRDD(sc: SparkContext, + generator: DistributionGenerator, numRows: Long, numColumns: Int, numPartitions: Int, - rng: DistributionGenerator, seed: Long): RDD[Vector] = { - new RandomVectorRDD(sc, numRows, numColumns, numPartitions, rng, seed) - } - + new RandomVectorRDD(sc, numRows, numColumns, numPartitions, generator, seed) + } + + /** + * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the + * input DistributionGenerator. + * + * @param sc SparkContext used to create the RDD. + * @param generator DistributionGenerator used to populate the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numColumns Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Vector] with vectors containing i.i.d samples produced by generator. + */ def randomVectorRDD(sc: SparkContext, + generator: DistributionGenerator, numRows: Long, numColumns: Int, - rng: DistributionGenerator, - seed: Long): RDD[Vector] = { - randomVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, rng, seed) - } - - def randomVectorRDD(sc: SparkContext, - numRows: Long, - numColumns: Int, - numPartitions: Int, - rng: DistributionGenerator): RDD[Vector] = { - randomVectorRDD(sc, numRows, numColumns, numPartitions, rng, Utils.random.nextLong) - } - + numPartitions: Int): RDD[Vector] = { + randomVectorRDD(sc, generator, numRows, numColumns, numPartitions, Utils.random.nextLong) + } + + /** + * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the + * input DistributionGenerator. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param generator DistributionGenerator used to populate the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numColumns Number of elements in each Vector. + * @return RDD[Vector] with vectors containing i.i.d samples produced by generator. 
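A usage sketch for the vector variants documented above: a 1000 x 10 matrix of standard normals, with a shape check. Assumes a SparkContext sc:

val mat = RandomRDDGenerators.normalVectorRDD(sc, 1000L, 10, 4)
assert(mat.count() == 1000L)    // numRows vectors
assert(mat.first().size == 10)  // each of length numColumns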
+ */ def randomVectorRDD(sc: SparkContext, + generator: DistributionGenerator, numRows: Long, - numColumns: Int, - rng: DistributionGenerator): RDD[Vector] = { - randomVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, rng, Utils.random.nextLong) + numColumns: Int): RDD[Vector] = { + randomVectorRDD(sc, generator, numRows, numColumns, + sc.defaultParallelism, Utils.random.nextLong) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala index 629fff2b8d0bb..0bef63afd3640 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala @@ -23,13 +23,14 @@ import org.apache.spark.mllib.random.DistributionGenerator import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils -private[mllib] class RandomRDDPartition(val idx: Int, - val size: Long, - val rng: DistributionGenerator, - val seed: Long) extends Partition { - - override val index: Int = idx +import scala.util.Random +private[mllib] class RandomRDDPartition(override val index: Int, + val size: Int, + val generator: DistributionGenerator, + val seed: Long) extends Partition { + // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size + require(size > 0, "Positive partition size required.") } // These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue @@ -41,6 +42,8 @@ private[mllib] class RandomRDD(@transient private var sc: SparkContext, require(size > 0, "Positive RDD size required.") require(numSlices > 0, "Positive number of partitions required") + require(math.ceil(size.toDouble / numSlices) <= Int.MaxValue, + "Partition size cannot exceed Int.MaxValue") override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { val split = splitIn.asInstanceOf[RandomRDDPartition] @@ -62,6 +65,8 @@ private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, require(size > 0, "Positive RDD size required.") require(numSlices > 0, "Positive number of partitions required") require(vectorSize > 0, "Positive vector size required.") + require(math.ceil(size.toDouble / numSlices) <= Int.MaxValue, + "Partition size cannot exceed Int.MaxValue") override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { val split = splitIn.asInstanceOf[RandomRDDPartition] @@ -75,50 +80,20 @@ private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, private[mllib] object RandomRDD { - private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) - extends Iterator[Double] { - - private var currentSize = 0 - - override def hasNext: Boolean = currentSize < numElem - - override def next(): Double = { - currentSize += 1 - rng.nextValue() - } - } - - private[mllib] class FixedSizeVectorIterator(val numElem: Long, - val vectorSize: Int, - val rng: DistributionGenerator) - extends Iterator[Vector] { - - private var currentSize = 0 - - override def hasNext: Boolean = currentSize < numElem - - override def next(): Vector = { - currentSize += 1 - new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray) - } - } - def getPartitions(size: Long, numSlices: Int, rng: DistributionGenerator, seed: Long): Array[Partition] = { - val firstPartitionSize = size / numSlices + size % numSlices - val otherPartitionSize = size / numSlices - val partitions = new Array[RandomRDDPartition](numSlices) var i = 0 + var 
start: Long = 0 + var end: Long = 0 + val random = new Random(seed) while (i < numSlices) { - partitions(i) = if (i == 0) { - new RandomRDDPartition(i, firstPartitionSize, rng, seed) - } else { - new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), seed) - } + end = ((i + 1) * size) / numSlices + partitions(i) = new RandomRDDPartition(i, (end - start).toInt, rng, random.nextLong()) + start = end i += 1 } partitions.asInstanceOf[Array[Partition]] @@ -127,14 +102,17 @@ private[mllib] object RandomRDD { // The RNG has to be reset every time the iterator is requested to guarantee the same data // every time the content of the RDD is examined. def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = { - partition.rng.setSeed(partition.seed + partition.index) - new FixedSizePointIterator(partition.size, partition.rng) + val generator = partition.generator.copy() + generator.setSeed(partition.seed) + Iterator.fill(partition.size)(generator.nextValue()) } // The RNG has to be reset every time the iterator is requested to guarantee the same data // every time the content of the RDD is examined. def getVectorIterator(partition: RandomRDDPartition, vectorSize: Int): Iterator[Vector] = { - partition.rng.setSeed(partition.seed + partition.index) - new FixedSizeVectorIterator(partition.size, vectorSize, partition.rng) + val generator = partition.generator.copy() + generator.setSeed(partition.seed) + Iterator.fill(partition.size)(new DenseVector( + (0 until vectorSize).map { _ => generator.nextValue() }.toArray)) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala index 612ad5fea477c..a2059f147eed2 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala @@ -36,7 +36,7 @@ class DistributionGeneratorSuite extends FunSuite { // newInstance should contain a different instance of the rng // i.e. setting different seeds for different instances produces different sequences of // random numbers. - val gen2 = gen.newInstance() + val gen2 = gen.copy() gen.setSeed(0L) val array3 = (0 until 1000).map(_ => gen.nextValue()) gen2.setSeed(1L) @@ -44,6 +44,13 @@ // Compare arrays instead of elements since individual elements can coincide by chance but the // sequences should differ given two different seeds.
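Spelled out, the contract the surrounding assertions exercise: copy() must return an independent RNG instance, while seeding two instances identically must reproduce the same stream. An illustrative recap, not new test code:

val gen = new UniformGenerator
val gen2 = gen.copy()  // fresh instance, shares no state with gen
gen.setSeed(0L)
gen2.setSeed(0L)
val s1 = (0 until 100).map(_ => gen.nextValue())
val s2 = (0 until 100).map(_ => gen2.nextValue())
assert(s1 == s2)  // same seed => same sequence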
assert(!array3.equals(array4)) + + // test that setting the same seed in the copied instance produces the same sequence of numbers + gen.setSeed(0L) + val array5 = (0 until 1000).map(_ => gen.nextValue()) + gen2.setSeed(0L) + val array6 = (0 until 1000).map(_ => gen2.nextValue()) + assert(array5.equals(array6)) } def distributionChecks(gen: DistributionGenerator, diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala index 3fd8eee2a9bfa..6d65af19e1921 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala @@ -23,7 +23,7 @@ import org.scalatest.FunSuite import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.mllib.rdd.RandomRDD +import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD} import org.apache.spark.mllib.util.LocalSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.util.StatCounter @@ -72,18 +72,26 @@ class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Seri test("RandomRDD sizes") { // some cases where size % numParts != 0 to test getPartitions behaves correctly - for ((size, numPartitions) <- List((10000, 6), (12345, 1), (13000, 3))) { + for ((size, numPartitions) <- List((10000, 6), (12345, 1), (1000, 101))) { val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L) assert(rdd.count() === size) assert(rdd.partitions.size === numPartitions) + + // check that partition sizes are balanced + val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble) + val partStats = new StatCounter(partSizes) + assert(partStats.stdev < 1.0) } // size > Int.MaxValue - val size = Int.MaxValue.toLong + 100L - val numPartitions = 10 + val size = Int.MaxValue.toLong * 100L + val numPartitions = 101 val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L) - assert(rdd.count() === size) assert(rdd.partitions.size === numPartitions) + val count = rdd.partitions.foldLeft(0L){ + (count, part) => count + part.asInstanceOf[RandomRDDPartition].size + } + assert(count === size) // size needs to be positive try { @@ -100,12 +108,19 @@ class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Seri } catch { case iae: IllegalArgumentException => } + + // partition size needs to be <= Int.MaxValue + try { + new RandomRDD(sc, Int.MaxValue.toLong * 100L, 99, new UniformGenerator, 0L) + assert(false) + } catch { + case iae: IllegalArgumentException => + } } test("randomRDD for different distributions") { val size = 1000000L val numPartitions = 100 - val defaultSeed = 1L val poissonMean = 100.0 for (seed <- 0 until 5) { @@ -115,30 +130,19 @@ class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Seri val normal = RandomRDDGenerators.normalRDD(sc, size, numPartitions, seed) testGeneratedRDD(normal, size, numPartitions, 0.0, 1.0) - val poisson = RandomRDDGenerators.poissonRDD(sc, size, numPartitions, poissonMean, seed) + val poisson = RandomRDDGenerators.poissonRDD(sc, poissonMean, size, numPartitions, seed) testGeneratedRDD(poisson, size, numPartitions, poissonMean, math.sqrt(poissonMean), 0.1) } - // check default numPartitions = sc.defaultParallelism - val uniform = RandomRDDGenerators.uniformRDD(sc, size, defaultSeed) - testGeneratedRDD(uniform, size, 
sc.defaultParallelism, 0.5, 1 / math.sqrt(12)) - - val normal = RandomRDDGenerators.normalRDD(sc, size, defaultSeed) - testGeneratedRDD(normal, size, sc.defaultParallelism, 0.0, 1.0) - - val poisson = RandomRDDGenerators.poissonRDD(sc, size, poissonMean, defaultSeed) - testGeneratedRDD(poisson, size, sc.defaultParallelism, poissonMean, math.sqrt(poissonMean), 0.1) - - // custom distribution to check that partitions have unique seeds - val random = RandomRDDGenerators.randomRDD(sc, 10L, 10, new MockDistro(), 0L) - random.collect.sorted.equals(Array[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + // mock distribution to check that partitions have unique seeds + val random = RandomRDDGenerators.randomRDD(sc, new MockDistro(), 1000L, 1000, 0L) + assert(random.collect.size === random.collect.distinct.size) } test("randomVectorRDD for different distributions") { val rows = 1000L val cols = 100 val parts = 10 - val defaultSeed = 1L val poissonMean = 100.0 for (seed <- 0 until 5) { @@ -148,26 +152,9 @@ class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Seri val normal = RandomRDDGenerators.normalVectorRDD(sc, rows, cols, parts, seed) testGeneratedVectorRDD(normal, rows, cols, parts, 0.0, 1.0) - val poisson = RandomRDDGenerators.poissonVectorRDD(sc, rows, cols, parts, poissonMean, seed) + val poisson = RandomRDDGenerators.poissonVectorRDD(sc, poissonMean, rows, cols, parts, seed) testGeneratedVectorRDD(poisson, rows, cols, parts, poissonMean, math.sqrt(poissonMean), 0.1) } - - // check default numPartitions = sc.defaultParallelism - val uniform = RandomRDDGenerators.uniformVectorRDD(sc, rows, cols, defaultSeed) - testGeneratedVectorRDD(uniform, rows, cols, sc.defaultParallelism, 0.5, 1 / math.sqrt(12)) - - val normal = RandomRDDGenerators.normalVectorRDD(sc, rows, cols, defaultSeed) - testGeneratedVectorRDD(normal, rows, cols, sc.defaultParallelism, 0.0, 1.0) - - val poisson = RandomRDDGenerators.poissonVectorRDD(sc, rows, cols, poissonMean, defaultSeed) - testGeneratedVectorRDD(poisson, rows, cols, sc.defaultParallelism, - poissonMean, math.sqrt(poissonMean), 0.1) - - // custom distribution to check that partitions have unique seeds - val random = RandomRDDGenerators.randomVectorRDD(sc, 3, 3, parts, new MockDistro, 0L) - val values = new ArrayBuffer[Double]() - random.collect.foldLeft(values){ case (values, vector) => values ++= vector.toArray } - values.sorted.equals(Array[Double](1, 1, 1, 2, 2, 2, 3, 3, 3)) } } @@ -176,9 +163,9 @@ private[random] class MockDistro extends DistributionGenerator { var seed = 0L // This allows us to check that each partition has a different seed - override def nextValue(): Double = (1 + seed).toDouble - - override def newInstance(): MockDistro = new MockDistro + override def nextValue(): Double = seed.toDouble override def setSeed(seed: Long) = this.seed = seed + + override def copy(): MockDistro = new MockDistro } From 01121ac8af3d7f6cd44160644d387b3747b925f4 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 25 Jul 2014 14:01:16 -0700 Subject: [PATCH 10/10] reviewer comments --- .../mllib/random/DistributionGenerator.scala | 15 ++- .../mllib/random/RandomRDDGenerators.scala | 127 ++++++++++++------ .../apache/spark/mllib/rdd/RandomRDD.scala | 38 +++--- .../random/DistributionGeneratorSuite.scala | 7 +- .../random/RandomRDDGeneratorsSuite.scala | 33 ++--- 5 files changed, 133 insertions(+), 87 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala index bcf87214ac774..7ecb409c4a91a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala @@ -20,15 +20,18 @@ package org.apache.spark.mllib.random import cern.jet.random.Poisson import cern.jet.random.engine.DRand +import org.apache.spark.annotation.Experimental import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} /** - * Trait for random number generators that generate i.i.d values from a distribution. + * :: Experimental :: + * Trait for random number generators that generate i.i.d. values from a distribution. */ +@Experimental trait DistributionGenerator extends Pseudorandom with Serializable { /** - * Returns an i.i.d sample as a Double from an underlying distribution. + * Returns an i.i.d. sample as a Double from an underlying distribution. */ def nextValue(): Double @@ -40,8 +43,10 @@ trait DistributionGenerator extends Pseudorandom with Serializable { } /** + * :: Experimental :: * Generates i.i.d. samples from U[0.0, 1.0] */ +@Experimental class UniformGenerator extends DistributionGenerator { // XORShiftRandom for better performance. Thread safety isn't necessary here. @@ -57,8 +62,10 @@ class UniformGenerator extends DistributionGenerator { } /** - * Generates i.i.d. samples from the Standard Normal Distribution. + * :: Experimental :: + * Generates i.i.d. samples from the standard normal distribution. */ +@Experimental class StandardNormalGenerator extends DistributionGenerator { // XORShiftRandom for better performance. Thread safety isn't necessary here. @@ -74,10 +81,12 @@ class StandardNormalGenerator extends DistributionGenerator { } /** + * :: Experimental :: * Generates i.i.d. samples from the Poisson distribution with the given mean. * * @param mean mean for the Poisson distribution. */ +@Experimental class PoissonGenerator(val mean: Double) extends DistributionGenerator { private var rng = new Poisson(mean, new DRand) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala index 3d1e201d97751..d7ee2d3f46846 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala @@ -18,19 +18,21 @@ package org.apache.spark.mllib.random import org.apache.spark.SparkContext +import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils /** + * :: Experimental :: * Generator methods for creating RDDs comprised of i.i.d samples from some distribution. - * - * TODO Generate RDD[Vector] from multivariate distributions. */ +@Experimental object RandomRDDGenerators { /** + * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. * * @param sc SparkContext used to create the RDD. @@ -39,12 +41,14 @@ object RandomRDDGenerators { * @param seed Seed for the RNG that generates the seed for the generator in each partition. * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. 
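The seed parameter above feeds a two-level scheme: the master seed drives a single RNG on the driver, which then draws an independent Long seed for each partition (see getPartitions in RandomRDD.scala). In miniature:

val masterSeed = 42L
val driverRng = new scala.util.Random(masterSeed)
// One seed per partition; each partition seeds its own copy of the generator.
val partitionSeeds = Array.fill(4)(driverRng.nextLong())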
*/ + @Experimental def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { val uniform = new UniformGenerator() randomRDD(sc, uniform, size, numPartitions, seed) } /** + * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. * * @param sc SparkContext used to create the RDD. @@ -52,11 +56,13 @@ object RandomRDDGenerators { * @param numPartitions Number of partitions in the RDD. * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. */ + @Experimental def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { uniformRDD(sc, size, numPartitions, Utils.random.nextLong) } /** + * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. * sc.defaultParallelism used for the number of partitions in the RDD. * @@ -64,11 +70,13 @@ object RandomRDDGenerators { * @param size Size of the RDD. * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. */ + @Experimental def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = { uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) } /** + * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the standard normal distribution. * * @param sc SparkContext used to create the RDD. @@ -77,12 +85,14 @@ object RandomRDDGenerators { * @param seed Seed for the RNG that generates the seed for the generator in each partition. * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). */ + @Experimental def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { val normal = new StandardNormalGenerator() randomRDD(sc, normal, size, numPartitions, seed) } /** + * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the standard normal distribution. * * @param sc SparkContext used to create the RDD. @@ -90,11 +100,13 @@ object RandomRDDGenerators { * @param numPartitions Number of partitions in the RDD. * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). */ + @Experimental def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { normalRDD(sc, size, numPartitions, Utils.random.nextLong) } /** + * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the standard normal distribution. * sc.defaultParallelism used for the number of partitions in the RDD. * @@ -102,11 +114,13 @@ object RandomRDDGenerators { * @param size Size of the RDD. * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). */ + @Experimental def normalRDD(sc: SparkContext, size: Long): RDD[Double] = { normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) } /** + * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. * * @param sc SparkContext used to create the RDD. @@ -116,6 +130,7 @@ object RandomRDDGenerators { * @param seed Seed for the RNG that generates the seed for the generator in each partition. * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). */ + @Experimental def poissonRDD(sc: SparkContext, mean: Double, size: Long, @@ -126,6 +141,7 @@ object RandomRDDGenerators { } /** + * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. * * @param sc SparkContext used to create the RDD. @@ -134,11 +150,13 @@ object RandomRDDGenerators { * @param numPartitions Number of partitions in the RDD. 
* @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). */ + @Experimental def poissonRDD(sc: SparkContext, mean: Double, size: Long, numPartitions: Int): RDD[Double] = { poissonRDD(sc, mean, size, numPartitions, Utils.random.nextLong) } /** + * :: Experimental :: * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. * sc.defaultParallelism used for the number of partitions in the RDD. * @@ -147,11 +165,13 @@ object RandomRDDGenerators { * @param size Size of the RDD. * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). */ + @Experimental def poissonRDD(sc: SparkContext, mean: Double, size: Long): RDD[Double] = { poissonRDD(sc, mean, size, sc.defaultParallelism, Utils.random.nextLong) } /** + * :: Experimental :: * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. * * @param sc SparkContext used to create the RDD. @@ -161,6 +181,7 @@ object RandomRDDGenerators { * @param seed Seed for the RNG that generates the seed for the generator in each partition. * @return RDD[Double] comprised of i.i.d. samples produced by generator. */ + @Experimental def randomRDD(sc: SparkContext, generator: DistributionGenerator, size: Long, @@ -170,6 +191,7 @@ object RandomRDDGenerators { } /** + * :: Experimental :: * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. * * @param sc SparkContext used to create the RDD. * @param generator DistributionGenerator used to populate the RDD. * @param size Size of the RDD. * @param numPartitions Number of partitions in the RDD. * @return RDD[Double] comprised of i.i.d. samples produced by generator. */ + @Experimental def randomRDD(sc: SparkContext, generator: DistributionGenerator, size: Long, @@ -186,6 +209,7 @@ object RandomRDDGenerators { } /** + * :: Experimental :: * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. * sc.defaultParallelism used for the number of partitions in the RDD. * @@ -194,156 +218,176 @@ object RandomRDDGenerators { * @param size Size of the RDD. * @return RDD[Double] comprised of i.i.d. samples produced by generator. */ + @Experimental def randomRDD(sc: SparkContext, generator: DistributionGenerator, size: Long): RDD[Double] = { randomRDD(sc, generator, size, sc.defaultParallelism, Utils.random.nextLong) } + // TODO Generate RDD[Vector] from multivariate distributions. + /** + * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the * uniform distribution on [0.0, 1.0]. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD.
- * @param numColumns Number of elements in each Vector. + * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. * @param seed Seed for the RNG that generates the seed for the generator in each partition. * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. */ + @Experimental def uniformVectorRDD(sc: SparkContext, numRows: Long, - numColumns: Int, + numCols: Int, numPartitions: Int, seed: Long): RDD[Vector] = { val uniform = new UniformGenerator() - randomVectorRDD(sc, uniform, numRows, numColumns, numPartitions, seed) + randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed) } /** + * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the * uniform distribution on [0.0, 1.0]. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. - * @param numColumns Number of elements in each Vector. + * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. */ + @Experimental def uniformVectorRDD(sc: SparkContext, numRows: Long, - numColumns: Int, + numCols: Int, numPartitions: Int): RDD[Vector] = { - uniformVectorRDD(sc, numRows, numColumns, numPartitions, Utils.random.nextLong) + uniformVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong) } /** + * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the * uniform distribution on [0.0, 1.0]. * sc.defaultParallelism used for the number of partitions in the RDD. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. - * @param numColumns Number of elements in each Vector. + * @param numCols Number of elements in each Vector.
* @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0). */ - def normalVectorRDD(sc: SparkContext, numRows: Long, numColumns: Int): RDD[Vector] = { - normalVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, Utils.random.nextLong) + @Experimental + def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = { + normalVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong) } /** + * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the * Poisson distribution with the input mean. * * @param sc SparkContext used to create the RDD. * @param mean Mean, or lambda, for the Poisson distribution. * @param numRows Number of Vectors in the RDD. - * @param numColumns Number of elements in each Vector. + * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. * @param seed Seed for the RNG that generates the seed for the generator in each partition. * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). */ + @Experimental def poissonVectorRDD(sc: SparkContext, mean: Double, numRows: Long, - numColumns: Int, + numCols: Int, numPartitions: Int, seed: Long): RDD[Vector] = { val poisson = new PoissonGenerator(mean) - randomVectorRDD(sc, poisson, numRows, numColumns, numPartitions, seed) + randomVectorRDD(sc, poisson, numRows, numCols, numPartitions, seed) } /** + * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the * Poisson distribution with the input mean. * * @param sc SparkContext used to create the RDD. * @param mean Mean, or lambda, for the Poisson distribution. * @param numRows Number of Vectors in the RDD. - * @param numColumns Number of elements in each Vector. + * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). */ + @Experimental def poissonVectorRDD(sc: SparkContext, mean: Double, numRows: Long, - numColumns: Int, + numCols: Int, numPartitions: Int): RDD[Vector] = { - poissonVectorRDD(sc, mean, numRows, numColumns, numPartitions, Utils.random.nextLong) + poissonVectorRDD(sc, mean, numRows, numCols, numPartitions, Utils.random.nextLong) } /** + * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the * Poisson distribution with the input mean. * sc.defaultParallelism used for the number of partitions in the RDD. @@ -351,57 +395,63 @@ object RandomRDDGenerators { * @param sc SparkContext used to create the RDD. * @param mean Mean, or lambda, for the Poisson distribution. * @param numRows Number of Vectors in the RDD. - * @param numColumns Number of elements in each Vector. + * @param numCols Number of elements in each Vector. * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). */ + @Experimental def poissonVectorRDD(sc: SparkContext, mean: Double, numRows: Long, - numColumns: Int): RDD[Vector] = { - poissonVectorRDD(sc, mean, numRows, numColumns, sc.defaultParallelism, Utils.random.nextLong) + numCols: Int): RDD[Vector] = { + poissonVectorRDD(sc, mean, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong) } /** + * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the * input DistributionGenerator. * * @param sc SparkContext used to create the RDD. * @param generator DistributionGenerator used to populate the RDD. 

   /**
+   * :: Experimental ::
    * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the
    * input DistributionGenerator.
    *
    * @param sc SparkContext used to create the RDD.
    * @param generator DistributionGenerator used to populate the RDD.
    * @param numRows Number of Vectors in the RDD.
-   * @param numColumns Number of elements in each Vector.
+   * @param numCols Number of elements in each Vector.
    * @param numPartitions Number of partitions in the RDD.
    * @param seed Seed for the RNG that generates the seed for the generator in each partition.
    * @return RDD[Vector] with vectors containing i.i.d samples produced by generator.
    */
+  @Experimental
   def randomVectorRDD(sc: SparkContext,
       generator: DistributionGenerator,
       numRows: Long,
-      numColumns: Int,
+      numCols: Int,
       numPartitions: Int,
       seed: Long): RDD[Vector] = {
-    new RandomVectorRDD(sc, numRows, numColumns, numPartitions, generator, seed)
+    new RandomVectorRDD(sc, numRows, numCols, numPartitions, generator, seed)
   }

   /**
+   * :: Experimental ::
    * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the
    * input DistributionGenerator.
    *
    * @param sc SparkContext used to create the RDD.
    * @param generator DistributionGenerator used to populate the RDD.
    * @param numRows Number of Vectors in the RDD.
-   * @param numColumns Number of elements in each Vector.
+   * @param numCols Number of elements in each Vector.
    * @param numPartitions Number of partitions in the RDD.
    * @return RDD[Vector] with vectors containing i.i.d samples produced by generator.
    */
+  @Experimental
   def randomVectorRDD(sc: SparkContext,
       generator: DistributionGenerator,
       numRows: Long,
-      numColumns: Int,
+      numCols: Int,
       numPartitions: Int): RDD[Vector] = {
-    randomVectorRDD(sc, generator, numRows, numColumns, numPartitions, Utils.random.nextLong)
+    randomVectorRDD(sc, generator, numRows, numCols, numPartitions, Utils.random.nextLong)
   }

   /**
+   * :: Experimental ::
    * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the
    * input DistributionGenerator.
    * sc.defaultParallelism used for the number of partitions in the RDD.
@@ -409,14 +459,15 @@ object RandomRDDGenerators {
    * @param sc SparkContext used to create the RDD.
    * @param generator DistributionGenerator used to populate the RDD.
    * @param numRows Number of Vectors in the RDD.
-   * @param numColumns Number of elements in each Vector.
+   * @param numCols Number of elements in each Vector.
    * @return RDD[Vector] with vectors containing i.i.d samples produced by generator.
    */
+  @Experimental
   def randomVectorRDD(sc: SparkContext,
       generator: DistributionGenerator,
       numRows: Long,
-      numColumns: Int): RDD[Vector] = {
-    randomVectorRDD(sc, generator, numRows, numColumns,
+      numCols: Int): RDD[Vector] = {
+    randomVectorRDD(sc, generator, numRows, numCols,
       sc.defaultParallelism, Utils.random.nextLong)
   }
 }
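Since randomVectorRDD accepts any DistributionGenerator, third-party distributions plug in by implementing the trait's three members (nextValue, setSeed, copy). A hypothetical U[a, b] generator sketched against the trait as it stands in this patch; UniformIntervalGenerator is not part of the change, and the explicit Serializable is a belt-and-braces assumption since generators ride inside serialized partitions:

import org.apache.spark.mllib.random.{DistributionGenerator, RandomRDDGenerators}

class UniformIntervalGenerator(a: Double, b: Double)
  extends DistributionGenerator with Serializable {

  private val random = new java.util.Random()

  override def nextValue(): Double = a + (b - a) * random.nextDouble()

  override def setSeed(seed: Long): Unit = random.setSeed(seed)

  // copy() must return an independent instance: generators are copied per
  // partition so that parallel tasks never share (locking) RNG state.
  override def copy(): UniformIntervalGenerator = new UniformIntervalGenerator(a, b)
}

val u = RandomRDDGenerators.randomVectorRDD(
  sc, new UniformIntervalGenerator(-1.0, 1.0),
  numRows = 1000L, numCols = 3, numPartitions = 2, seed = 11L)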
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
index 0bef63afd3640..f13282d07ff92 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
@@ -29,20 +29,20 @@ private[mllib] class RandomRDDPartition(override val index: Int,
     val size: Int,
     val generator: DistributionGenerator,
     val seed: Long) extends Partition {
-  // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size
-  require(size > 0, "Positive partition size required.")
+
+  require(size >= 0, "Non-negative partition size required.")
 }

 // These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
-private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+private[mllib] class RandomRDD(@transient sc: SparkContext,
     size: Long,
-    numSlices: Int,
+    numPartitions: Int,
     @transient rng: DistributionGenerator,
     @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {

   require(size > 0, "Positive RDD size required.")
-  require(numSlices > 0, "Positive number of partitions required")
-  require(math.ceil(size.toDouble / numSlices) <= Int.MaxValue,
+  require(numPartitions > 0, "Positive number of partitions required")
+  require(math.ceil(size.toDouble / numPartitions) <= Int.MaxValue,
     "Partition size cannot exceed Int.MaxValue")

   override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
@@ -51,21 +51,21 @@ private[mllib] class RandomRDD(@transient private var sc: SparkContext,
   }

   override def getPartitions: Array[Partition] = {
-    RandomRDD.getPartitions(size, numSlices, rng, seed)
+    RandomRDD.getPartitions(size, numPartitions, rng, seed)
   }
 }

-private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
+private[mllib] class RandomVectorRDD(@transient sc: SparkContext,
     size: Long,
     vectorSize: Int,
-    numSlices: Int,
+    numPartitions: Int,
     @transient rng: DistributionGenerator,
     @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {

   require(size > 0, "Positive RDD size required.")
-  require(numSlices > 0, "Positive number of partitions required")
+  require(numPartitions > 0, "Positive number of partitions required")
   require(vectorSize > 0, "Positive vector size required.")
-  require(math.ceil(size.toDouble / numSlices) <= Int.MaxValue,
+  require(math.ceil(size.toDouble / numPartitions) <= Int.MaxValue,
     "Partition size cannot exceed Int.MaxValue")

   override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
@@ -74,24 +74,24 @@ private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
   }

   override protected def getPartitions: Array[Partition] = {
-    RandomRDD.getPartitions(size, numSlices, rng, seed)
+    RandomRDD.getPartitions(size, numPartitions, rng, seed)
   }
 }

 private[mllib] object RandomRDD {

   def getPartitions(size: Long,
-                    numSlices: Int,
+                    numPartitions: Int,
                     rng: DistributionGenerator,
                     seed: Long): Array[Partition] = {

-    val partitions = new Array[RandomRDDPartition](numSlices)
+    val partitions = new Array[RandomRDDPartition](numPartitions)
     var i = 0
     var start: Long = 0
     var end: Long = 0
     val random = new Random(seed)
-    while (i < numSlices) {
-      end = ((i + 1) * size) / numSlices
+    while (i < numPartitions) {
+      end = ((i + 1) * size) / numPartitions
       partitions(i) = new RandomRDDPartition(i, (end - start).toInt, rng, random.nextLong())
       start = end
       i += 1
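The prefix-boundary arithmetic in the loop above is what the new balance test relies on: end = ((i + 1) * size) / numPartitions hands out partition sizes that differ by at most one. The same computation in isolation, as a standalone sketch (partitionSizes is illustrative, not code from this patch):

// size = 10 over n = 3 partitions => boundaries 3, 6, 10 => sizes 3, 3, 4
def partitionSizes(size: Long, n: Int): Seq[Long] = {
  val ends = (1 to n).map(i => (i.toLong * size) / n)
  (0L +: ends).sliding(2).map { case Seq(s, e) => e - s }.toSeq
}
assert(partitionSizes(10, 3) == Seq(3L, 3L, 4L))
assert(partitionSizes(10, 3).sum == 10L)  // boundaries always partition the full size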
@@ -104,7 +104,7 @@ private[mllib] object RandomRDD {
   def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = {
     val generator = partition.generator.copy()
     generator.setSeed(partition.seed)
-    Iterator.fill(partition.size)(generator.nextValue())
+    Array.fill(partition.size)(generator.nextValue()).toIterator
   }

   // The RNG has to be reset every time the iterator is requested to guarantee same data
@@ -112,7 +112,7 @@ private[mllib] object RandomRDD {
   def getVectorIterator(partition: RandomRDDPartition, vectorSize: Int): Iterator[Vector] = {
     val generator = partition.generator.copy()
     generator.setSeed(partition.seed)
-    Iterator.fill(partition.size)(new DenseVector(
-      (0 until vectorSize).map { _ => generator.nextValue() }.toArray))
+    Array.fill(partition.size)(new DenseVector(
+      (0 until vectorSize).map { _ => generator.nextValue() }.toArray)).toIterator
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala
index a2059f147eed2..974dec4c0b5ee 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala
@@ -56,10 +56,10 @@ class DistributionGeneratorSuite extends FunSuite {
   def distributionChecks(gen: DistributionGenerator,
       mean: Double = 0.0,
       stddev: Double = 1.0,
-      epsilon: Double = 1e-3) {
+      epsilon: Double = 0.01) {
     for (seed <- 0 until 5) {
       gen.setSeed(seed.toLong)
-      val sample = (0 until 10000000).map { _ => gen.nextValue()}
+      val sample = (0 until 100000).map { _ => gen.nextValue()}
       val stats = new StatCounter(sample)
       assert(math.abs(stats.mean - mean) < epsilon)
       assert(math.abs(stats.stdev - stddev) < epsilon)
@@ -84,8 +84,7 @@ class DistributionGeneratorSuite extends FunSuite {
     for (mean <- List(1.0, 5.0, 100.0)) {
       val poisson = new PoissonGenerator(mean)
       apiChecks(poisson)
-      distributionChecks(poisson, mean, math.sqrt(mean), 1e-2)
+      distributionChecks(poisson, mean, math.sqrt(mean), 0.1)
     }
   }
 }
-
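The copy-then-setSeed dance in getPointIterator and getVectorIterator is what lets a recomputed partition (after an executor failure, for instance) regenerate exactly the same numbers rather than fresh ones. The contract in miniature, using the UniformGenerator from this patch's tests and assuming setSeed fully resets the generator state, which apiChecks exercises:

val gen = new UniformGenerator()
gen.setSeed(0L)
val first = Array.fill(5)(gen.nextValue())
gen.setSeed(0L)  // resetting the seed replays the identical stream
val second = Array.fill(5)(gen.nextValue())
assert(first.toSeq == second.toSeq)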
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala
index 6d65af19e1921..6aa4f803df0f7 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.mllib.util.LocalSparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.StatCounter

-/**
+/*
  * Note: avoid including APIs that do not set the seed for the RNG in unit tests
  * in order to guarantee deterministic behavior.
  *
@@ -78,9 +78,9 @@ class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Seri
     assert(rdd.partitions.size === numPartitions)

     // check that partition sizes are balanced
-    val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
+    val partSizes = rdd.partitions.map(p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
     val partStats = new StatCounter(partSizes)
-    assert(partStats.stdev < 1.0)
+    assert(partStats.max - partStats.min <= 1)
   }

   // size > Int.MaxValue
@@ -88,39 +88,26 @@ class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Seri
     val numPartitions = 101
     val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
     assert(rdd.partitions.size === numPartitions)
-    val count = rdd.partitions.foldLeft(0L){
-      (count, part) => count + part.asInstanceOf[RandomRDDPartition].size
+    val count = rdd.partitions.foldLeft(0L) { (count, part) =>
+      count + part.asInstanceOf[RandomRDDPartition].size
     }
     assert(count === size)

     // size needs to be positive
-    try {
-      new RandomRDD(sc, 0, 10, new UniformGenerator, 0L)
-      assert(false)
-    } catch {
-      case iae: IllegalArgumentException =>
-    }
+    intercept[IllegalArgumentException] { new RandomRDD(sc, 0, 10, new UniformGenerator, 0L) }

     // numPartitions needs to be positive
-    try {
-      new RandomRDD(sc, 100, 0, new UniformGenerator, 0L)
-      assert(false)
-    } catch {
-      case iae: IllegalArgumentException =>
-    }
+    intercept[IllegalArgumentException] { new RandomRDD(sc, 100, 0, new UniformGenerator, 0L) }

     // partition size needs to be <= Int.MaxValue
-    try {
+    intercept[IllegalArgumentException] {
       new RandomRDD(sc, Int.MaxValue.toLong * 100L, 99, new UniformGenerator, 0L)
-      assert(false)
-    } catch {
-      case iae: IllegalArgumentException =>
     }
   }

   test("randomRDD for different distributions") {
-    val size = 1000000L
-    val numPartitions = 100
+    val size = 100000L
+    val numPartitions = 10
     val poissonMean = 100.0

     for (seed <- 0 until 5) {