diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala new file mode 100644 index 0000000000000..7ecb409c4a91a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.annotation.Experimental +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} + +/** + * :: Experimental :: + * Trait for random number generators that generate i.i.d. values from a distribution. + */ +@Experimental +trait DistributionGenerator extends Pseudorandom with Serializable { + + /** + * Returns an i.i.d. sample as a Double from an underlying distribution. + */ + def nextValue(): Double + + /** + * Returns a copy of the DistributionGenerator with a new instance of the rng object used in the + * class when applicable for non-locking concurrent usage. + */ + def copy(): DistributionGenerator +} + +/** + * :: Experimental :: + * Generates i.i.d. samples from U[0.0, 1.0] + */ +@Experimental +class UniformGenerator extends DistributionGenerator { + + // XORShiftRandom for better performance. Thread safety isn't necessary here. + private val random = new XORShiftRandom() + + override def nextValue(): Double = { + random.nextDouble() + } + + override def setSeed(seed: Long) = random.setSeed(seed) + + override def copy(): UniformGenerator = new UniformGenerator() +} + +/** + * :: Experimental :: + * Generates i.i.d. samples from the standard normal distribution. + */ +@Experimental +class StandardNormalGenerator extends DistributionGenerator { + + // XORShiftRandom for better performance. Thread safety isn't necessary here. + private val random = new XORShiftRandom() + + override def nextValue(): Double = { + random.nextGaussian() + } + + override def setSeed(seed: Long) = random.setSeed(seed) + + override def copy(): StandardNormalGenerator = new StandardNormalGenerator() +} + +/** + * :: Experimental :: + * Generates i.i.d. samples from the Poisson distribution with the given mean. + * + * @param mean mean for the Poisson distribution. + */ +@Experimental +class PoissonGenerator(val mean: Double) extends DistributionGenerator { + + private var rng = new Poisson(mean, new DRand) + + override def nextValue(): Double = rng.nextDouble() + + override def setSeed(seed: Long) { + rng = new Poisson(mean, new DRand(seed.toInt)) + } + + override def copy(): PoissonGenerator = new PoissonGenerator(mean) +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala new file mode 100644 index 0000000000000..d7ee2d3f46846 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.apache.spark.SparkContext +import org.apache.spark.annotation.Experimental +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +/** + * :: Experimental :: + * Generator methods for creating RDDs comprised of i.i.d samples from some distribution. + */ +@Experimental +object RandomRDDGenerators { + + /** + * :: Experimental :: + * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + */ + @Experimental + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { + val uniform = new UniformGenerator() + randomRDD(sc, uniform, size, numPartitions, seed) + } + + /** + * :: Experimental :: + * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + */ + @Experimental + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { + uniformRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + */ + @Experimental + def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = { + uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). + */ + @Experimental + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { + val normal = new StandardNormalGenerator() + randomRDD(sc, normal, size, numPartitions, seed) + } + + /** + * :: Experimental :: + * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). + */ + @Experimental + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { + normalRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). + */ + @Experimental + def normalRDD(sc: SparkContext, size: Long): RDD[Double] = { + normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or lambda, for the Poisson distribution. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). + */ + @Experimental + def poissonRDD(sc: SparkContext, + mean: Double, + size: Long, + numPartitions: Int, + seed: Long): RDD[Double] = { + val poisson = new PoissonGenerator(mean) + randomRDD(sc, poisson, size, numPartitions, seed) + } + + /** + * :: Experimental :: + * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or lambda, for the Poisson distribution. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). + */ + @Experimental + def poissonRDD(sc: SparkContext, mean: Double, size: Long, numPartitions: Int): RDD[Double] = { + poissonRDD(sc, mean, size, numPartitions, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or lambda, for the Poisson distribution. + * @param size Size of the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). + */ + @Experimental + def poissonRDD(sc: SparkContext, mean: Double, size: Long): RDD[Double] = { + poissonRDD(sc, mean, size, sc.defaultParallelism, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. + * + * @param sc SparkContext used to create the RDD. + * @param generator DistributionGenerator used to populate the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Double] comprised of i.i.d. samples produced by generator. + */ + @Experimental + def randomRDD(sc: SparkContext, + generator: DistributionGenerator, + size: Long, + numPartitions: Int, + seed: Long): RDD[Double] = { + new RandomRDD(sc, size, numPartitions, generator, seed) + } + + /** + * :: Experimental :: + * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. + * + * @param sc SparkContext used to create the RDD. + * @param generator DistributionGenerator used to populate the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Double] comprised of i.i.d. samples produced by generator. + */ + @Experimental + def randomRDD(sc: SparkContext, + generator: DistributionGenerator, + size: Long, + numPartitions: Int): RDD[Double] = { + randomRDD(sc, generator, size, numPartitions, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param generator DistributionGenerator used to populate the RDD. + * @param size Size of the RDD. + * @return RDD[Double] comprised of i.i.d. samples produced by generator. + */ + @Experimental + def randomRDD(sc: SparkContext, + generator: DistributionGenerator, + size: Long): RDD[Double] = { + randomRDD(sc, generator, size, sc.defaultParallelism, Utils.random.nextLong) + } + + // TODO Generate RDD[Vector] from multivariate distributions. + + /** + * :: Experimental :: + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * uniform distribution on [0.0 1.0]. + * + * @param sc SparkContext used to create the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. + */ + @Experimental + def uniformVectorRDD(sc: SparkContext, + numRows: Long, + numCols: Int, + numPartitions: Int, + seed: Long): RDD[Vector] = { + val uniform = new UniformGenerator() + randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed) + } + + /** + * :: Experimental :: + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * uniform distribution on [0.0 1.0]. + * + * @param sc SparkContext used to create the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. + */ + @Experimental + def uniformVectorRDD(sc: SparkContext, + numRows: Long, + numCols: Int, + numPartitions: Int): RDD[Vector] = { + uniformVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * uniform distribution on [0.0 1.0]. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. + */ + @Experimental + def uniformVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = { + uniformVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * standard normal distribution. + * + * @param sc SparkContext used to create the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0). + */ + @Experimental + def normalVectorRDD(sc: SparkContext, + numRows: Long, + numCols: Int, + numPartitions: Int, + seed: Long): RDD[Vector] = { + val uniform = new StandardNormalGenerator() + randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed) + } + + /** + * :: Experimental :: + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * standard normal distribution. + * + * @param sc SparkContext used to create the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0). + */ + @Experimental + def normalVectorRDD(sc: SparkContext, + numRows: Long, + numCols: Int, + numPartitions: Int): RDD[Vector] = { + normalVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * standard normal distribution. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0). + */ + @Experimental + def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = { + normalVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Poisson distribution with the input mean. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or lambda, for the Poisson distribution. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). + */ + @Experimental + def poissonVectorRDD(sc: SparkContext, + mean: Double, + numRows: Long, + numCols: Int, + numPartitions: Int, + seed: Long): RDD[Vector] = { + val poisson = new PoissonGenerator(mean) + randomVectorRDD(sc, poisson, numRows, numCols, numPartitions, seed) + } + + /** + * :: Experimental :: + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Poisson distribution with the input mean. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or lambda, for the Poisson distribution. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). + */ + @Experimental + def poissonVectorRDD(sc: SparkContext, + mean: Double, + numRows: Long, + numCols: Int, + numPartitions: Int): RDD[Vector] = { + poissonVectorRDD(sc, mean, numRows, numCols, numPartitions, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the + * Poisson distribution with the input mean. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or lambda, for the Poisson distribution. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean). + */ + @Experimental + def poissonVectorRDD(sc: SparkContext, + mean: Double, + numRows: Long, + numCols: Int): RDD[Vector] = { + poissonVectorRDD(sc, mean, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the + * input DistributionGenerator. + * + * @param sc SparkContext used to create the RDD. + * @param generator DistributionGenerator used to populate the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Vector] with vectors containing i.i.d samples produced by generator. + */ + @Experimental + def randomVectorRDD(sc: SparkContext, + generator: DistributionGenerator, + numRows: Long, + numCols: Int, + numPartitions: Int, + seed: Long): RDD[Vector] = { + new RandomVectorRDD(sc, numRows, numCols, numPartitions, generator, seed) + } + + /** + * :: Experimental :: + * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the + * input DistributionGenerator. + * + * @param sc SparkContext used to create the RDD. + * @param generator DistributionGenerator used to populate the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Vector] with vectors containing i.i.d samples produced by generator. + */ + @Experimental + def randomVectorRDD(sc: SparkContext, + generator: DistributionGenerator, + numRows: Long, + numCols: Int, + numPartitions: Int): RDD[Vector] = { + randomVectorRDD(sc, generator, numRows, numCols, numPartitions, Utils.random.nextLong) + } + + /** + * :: Experimental :: + * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the + * input DistributionGenerator. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param generator DistributionGenerator used to populate the RDD. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @return RDD[Vector] with vectors containing i.i.d samples produced by generator. + */ + @Experimental + def randomVectorRDD(sc: SparkContext, + generator: DistributionGenerator, + numRows: Long, + numCols: Int): RDD[Vector] = { + randomVectorRDD(sc, generator, numRows, numCols, + sc.defaultParallelism, Utils.random.nextLong) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala new file mode 100644 index 0000000000000..f13282d07ff92 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +import scala.util.Random + +private[mllib] class RandomRDDPartition(override val index: Int, + val size: Int, + val generator: DistributionGenerator, + val seed: Long) extends Partition { + + require(size >= 0, "Non-negative partition size required.") +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient sc: SparkContext, + size: Long, + numPartitions: Int, + @transient rng: DistributionGenerator, + @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numPartitions > 0, "Positive number of partitions required") + require(math.ceil(size.toDouble / numPartitions) <= Int.MaxValue, + "Partition size cannot exceed Int.MaxValue") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { + val split = splitIn.asInstanceOf[RandomRDDPartition] + RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { + RandomRDD.getPartitions(size, numPartitions, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient sc: SparkContext, + size: Long, + vectorSize: Int, + numPartitions: Int, + @transient rng: DistributionGenerator, + @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numPartitions > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + require(math.ceil(size.toDouble / numPartitions) <= Int.MaxValue, + "Partition size cannot exceed Int.MaxValue") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { + val split = splitIn.asInstanceOf[RandomRDDPartition] + RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { + RandomRDD.getPartitions(size, numPartitions, rng, seed) + } +} + +private[mllib] object RandomRDD { + + def getPartitions(size: Long, + numPartitions: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + + val partitions = new Array[RandomRDDPartition](numPartitions) + var i = 0 + var start: Long = 0 + var end: Long = 0 + val random = new Random(seed) + while (i < numPartitions) { + end = ((i + 1) * size) / numPartitions + partitions(i) = new RandomRDDPartition(i, (end - start).toInt, rng, random.nextLong()) + start = end + i += 1 + } + partitions.asInstanceOf[Array[Partition]] + } + + // The RNG has to be reset every time the iterator is requested to guarantee same data + // every time the content of the RDD is examined. + def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = { + val generator = partition.generator.copy() + generator.setSeed(partition.seed) + Array.fill(partition.size)(generator.nextValue()).toIterator + } + + // The RNG has to be reset every time the iterator is requested to guarantee same data + // every time the content of the RDD is examined. + def getVectorIterator(partition: RandomRDDPartition, vectorSize: Int): Iterator[Vector] = { + val generator = partition.generator.copy() + generator.setSeed(partition.seed) + Array.fill(partition.size)(new DenseVector( + (0 until vectorSize).map { _ => generator.nextValue() }.toArray)).toIterator + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala new file mode 100644 index 0000000000000..974dec4c0b5ee --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.scalatest.FunSuite + +import org.apache.spark.util.StatCounter + +// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged +class DistributionGeneratorSuite extends FunSuite { + + def apiChecks(gen: DistributionGenerator) { + + // resetting seed should generate the same sequence of random numbers + gen.setSeed(42L) + val array1 = (0 until 1000).map(_ => gen.nextValue()) + gen.setSeed(42L) + val array2 = (0 until 1000).map(_ => gen.nextValue()) + assert(array1.equals(array2)) + + // newInstance should contain a difference instance of the rng + // i.e. setting difference seeds for difference instances produces different sequences of + // random numbers. + val gen2 = gen.copy() + gen.setSeed(0L) + val array3 = (0 until 1000).map(_ => gen.nextValue()) + gen2.setSeed(1L) + val array4 = (0 until 1000).map(_ => gen2.nextValue()) + // Compare arrays instead of elements since individual elements can coincide by chance but the + // sequences should differ given two different seeds. + assert(!array3.equals(array4)) + + // test that setting the same seed in the copied instance produces the same sequence of numbers + gen.setSeed(0L) + val array5 = (0 until 1000).map(_ => gen.nextValue()) + gen2.setSeed(0L) + val array6 = (0 until 1000).map(_ => gen2.nextValue()) + assert(array5.equals(array6)) + } + + def distributionChecks(gen: DistributionGenerator, + mean: Double = 0.0, + stddev: Double = 1.0, + epsilon: Double = 0.01) { + for (seed <- 0 until 5) { + gen.setSeed(seed.toLong) + val sample = (0 until 100000).map { _ => gen.nextValue()} + val stats = new StatCounter(sample) + assert(math.abs(stats.mean - mean) < epsilon) + assert(math.abs(stats.stdev - stddev) < epsilon) + } + } + + test("UniformGenerator") { + val uniform = new UniformGenerator() + apiChecks(uniform) + // Stddev of uniform distribution = (ub - lb) / math.sqrt(12) + distributionChecks(uniform, 0.5, 1 / math.sqrt(12)) + } + + test("StandardNormalGenerator") { + val normal = new StandardNormalGenerator() + apiChecks(normal) + distributionChecks(normal, 0.0, 1.0) + } + + test("PoissonGenerator") { + // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced. + for (mean <- List(1.0, 5.0, 100.0)) { + val poisson = new PoissonGenerator(mean) + apiChecks(poisson) + distributionChecks(poisson, mean, math.sqrt(mean), 0.1) + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala new file mode 100644 index 0000000000000..6aa4f803df0f7 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD} +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.util.StatCounter + +/* + * Note: avoid including APIs that do not set the seed for the RNG in unit tests + * in order to guarantee deterministic behavior. + * + * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged + */ +class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable { + + def testGeneratedRDD(rdd: RDD[Double], + expectedSize: Long, + expectedNumPartitions: Int, + expectedMean: Double, + expectedStddev: Double, + epsilon: Double = 0.01) { + val stats = rdd.stats() + assert(expectedSize === stats.count) + assert(expectedNumPartitions === rdd.partitions.size) + assert(math.abs(stats.mean - expectedMean) < epsilon) + assert(math.abs(stats.stdev - expectedStddev) < epsilon) + } + + // assume test RDDs are small + def testGeneratedVectorRDD(rdd: RDD[Vector], + expectedRows: Long, + expectedColumns: Int, + expectedNumPartitions: Int, + expectedMean: Double, + expectedStddev: Double, + epsilon: Double = 0.01) { + assert(expectedNumPartitions === rdd.partitions.size) + val values = new ArrayBuffer[Double]() + rdd.collect.foreach { vector => { + assert(vector.size === expectedColumns) + values ++= vector.toArray + }} + assert(expectedRows === values.size / expectedColumns) + val stats = new StatCounter(values) + assert(math.abs(stats.mean - expectedMean) < epsilon) + assert(math.abs(stats.stdev - expectedStddev) < epsilon) + } + + test("RandomRDD sizes") { + + // some cases where size % numParts != 0 to test getPartitions behaves correctly + for ((size, numPartitions) <- List((10000, 6), (12345, 1), (1000, 101))) { + val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L) + assert(rdd.count() === size) + assert(rdd.partitions.size === numPartitions) + + // check that partition sizes are balanced + val partSizes = rdd.partitions.map(p => p.asInstanceOf[RandomRDDPartition].size.toDouble) + val partStats = new StatCounter(partSizes) + assert(partStats.max - partStats.min <= 1) + } + + // size > Int.MaxValue + val size = Int.MaxValue.toLong * 100L + val numPartitions = 101 + val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L) + assert(rdd.partitions.size === numPartitions) + val count = rdd.partitions.foldLeft(0L) { (count, part) => + count + part.asInstanceOf[RandomRDDPartition].size + } + assert(count === size) + + // size needs to be positive + intercept[IllegalArgumentException] { new RandomRDD(sc, 0, 10, new UniformGenerator, 0L) } + + // numPartitions needs to be positive + intercept[IllegalArgumentException] { new RandomRDD(sc, 100, 0, new UniformGenerator, 0L) } + + // partition size needs to be <= Int.MaxValue + intercept[IllegalArgumentException] { + new RandomRDD(sc, Int.MaxValue.toLong * 100L, 99, new UniformGenerator, 0L) + } + } + + test("randomRDD for different distributions") { + val size = 100000L + val numPartitions = 10 + val poissonMean = 100.0 + + for (seed <- 0 until 5) { + val uniform = RandomRDDGenerators.uniformRDD(sc, size, numPartitions, seed) + testGeneratedRDD(uniform, size, numPartitions, 0.5, 1 / math.sqrt(12)) + + val normal = RandomRDDGenerators.normalRDD(sc, size, numPartitions, seed) + testGeneratedRDD(normal, size, numPartitions, 0.0, 1.0) + + val poisson = RandomRDDGenerators.poissonRDD(sc, poissonMean, size, numPartitions, seed) + testGeneratedRDD(poisson, size, numPartitions, poissonMean, math.sqrt(poissonMean), 0.1) + } + + // mock distribution to check that partitions have unique seeds + val random = RandomRDDGenerators.randomRDD(sc, new MockDistro(), 1000L, 1000, 0L) + assert(random.collect.size === random.collect.distinct.size) + } + + test("randomVectorRDD for different distributions") { + val rows = 1000L + val cols = 100 + val parts = 10 + val poissonMean = 100.0 + + for (seed <- 0 until 5) { + val uniform = RandomRDDGenerators.uniformVectorRDD(sc, rows, cols, parts, seed) + testGeneratedVectorRDD(uniform, rows, cols, parts, 0.5, 1 / math.sqrt(12)) + + val normal = RandomRDDGenerators.normalVectorRDD(sc, rows, cols, parts, seed) + testGeneratedVectorRDD(normal, rows, cols, parts, 0.0, 1.0) + + val poisson = RandomRDDGenerators.poissonVectorRDD(sc, poissonMean, rows, cols, parts, seed) + testGeneratedVectorRDD(poisson, rows, cols, parts, poissonMean, math.sqrt(poissonMean), 0.1) + } + } +} + +private[random] class MockDistro extends DistributionGenerator { + + var seed = 0L + + // This allows us to check that each partition has a different seed + override def nextValue(): Double = seed.toDouble + + override def setSeed(seed: Long) = this.seed = seed + + override def copy(): MockDistro = new MockDistro +}