Skip to content

Commit

Permalink
impl with RandomRDD
Browse files Browse the repository at this point in the history
  • Loading branch information
dorx committed Jul 22, 2014
1 parent 92d6f1c commit d56cacb
Show file tree
Hide file tree
Showing 6 changed files with 552 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,54 +22,84 @@ import scala.util.Random
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand

import org.apache.spark.util.random.Pseudorandom
import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}

/**
* Trait for random number generators that generate i.i.d values from a distribution
* Trait for random number generators that generate i.i.d values from a distribution.
*/
trait DistributionGenerator extends Pseudorandom with Cloneable {
trait DistributionGenerator extends Pseudorandom with Serializable {

/**
* @return An i.i.d sample as a Double from an underlying distribution
* @return An i.i.d sample as a Double from an underlying distribution.
*/
def nextValue(): Double

override def clone(): DistributionGenerator = super.clone().asInstanceOf[DistributionGenerator]
/**
* @return A copy of the DistributionGenerator with a new instance of the rng object used in the
* class when applicable. Each partition has a unique seed and therefore requires its
* own instance of the DistributionGenerator.
*/
def newInstance(): DistributionGenerator
}

class NormalGenerator(val mean: Double = 0.0, val stddev: Double = 1.0)
extends DistributionGenerator {
/**
* Generates i.i.d. samples from U[0.0, 1.0]
*/
class UniformGenerator() extends DistributionGenerator {
// XORShiftRandom for better performance. Thread safety isn't necessary here.
private val random = new XORShiftRandom()

require(stddev >= 0.0, "Standard deviation cannot be negative.")
/**
* @return An i.i.d sample as a Double from U[0.0, 1.0].
*/
override def nextValue(): Double = {
random.nextDouble()
}

private val random = new Random()
private val standard = mean == 0.0 && stddev == 1.0
/** Set random seed. */
override def setSeed(seed: Long) = random.setSeed(seed)

override def newInstance(): UniformGenerator = new UniformGenerator()
}

/**
* Generates i.i.d. samples from the Standard Normal Distribution.
*/
class StandardNormalGenerator() extends DistributionGenerator {
// XORShiftRandom for better performance. Thread safety isn't necessary here.
private val random = new XORShiftRandom()

/**
* @return An i.i.d sample as a Double from the Normal distribution
* @return An i.i.d sample as a Double from the Normal distribution.
*/
override def nextValue(): Double = {
if (standard) {
random.nextGaussian()
} else {
mean + stddev * random.nextGaussian()
}
}

/** Set random seed. */
override def setSeed(seed: Long) = random.setSeed(seed)

override def newInstance(): StandardNormalGenerator = new StandardNormalGenerator()
}

class PoissonGenerator(val lambda: Double = 0.0) extends DistributionGenerator {
/**
* Generates i.i.d. samples from the Poisson distribution with the given mean.
*
* @param mean mean for the Poisson distribution.
*/
class PoissonGenerator(val mean: Double) extends DistributionGenerator {

private var rng = new Poisson(mean, new DRand)

private var rng = new Poisson(lambda, new DRand)
/**
* @return An i.i.d sample as a Double from the Poisson distribution
* @return An i.i.d sample as a Double from the Poisson distribution.
*/
override def nextValue(): Double = rng.nextDouble()

/** Set random seed. */
override def setSeed(seed: Long) {
rng = new Poisson(lambda, new DRand(seed.toInt))
rng = new Poisson(mean, new DRand(seed.toInt))
}

override def newInstance(): PoissonGenerator = new PoissonGenerator(mean)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.random

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

// TODO add Scaladocs once API fully approved
object RandomRDDGenerators {

def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
val uniform = new UniformGenerator()
randomRDD(sc, size, numPartitions, uniform, seed)
}

def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
uniformRDD(sc, size, sc.defaultParallelism, seed)
}

def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
}

def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
}

def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
val normal = new StandardNormalGenerator()
randomRDD(sc, size, numPartitions, normal, seed)
}

def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
normalRDD(sc, size, sc.defaultParallelism, seed)
}

def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
normalRDD(sc, size, numPartitions, Utils.random.nextLong)
}

def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
}

def poissonRDD(sc: SparkContext,
size: Long,
numPartitions: Int,
mean: Double,
seed: Long): RDD[Double] = {
val poisson = new PoissonGenerator(mean)
randomRDD(sc, size, numPartitions, poisson, seed)
}

def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): RDD[Double] = {
poissonRDD(sc, size, sc.defaultParallelism, mean, seed)
}

def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: Double): RDD[Double] = {
poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong)
}

def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] = {
poissonRDD(sc, size, sc.defaultParallelism, mean, Utils.random.nextLong)
}

def randomRDD(sc: SparkContext,
size: Long,
numPartitions: Int,
distribution: DistributionGenerator,
seed: Long): RDD[Double] = {
new RandomRDD(sc, size, numPartitions, distribution, seed)
}

def randomRDD(sc: SparkContext,
size: Long,
distribution: DistributionGenerator,
seed: Long): RDD[Double] = {
randomRDD(sc, size, sc.defaultParallelism, distribution, seed)
}

def randomRDD(sc: SparkContext,
size: Long,
numPartitions: Int,
distribution: DistributionGenerator): RDD[Double] = {
randomRDD(sc, size, numPartitions, distribution, Utils.random.nextLong)
}

def randomRDD(sc: SparkContext,
size: Long,
distribution: DistributionGenerator): RDD[Double] = {
randomRDD(sc, size, sc.defaultParallelism, distribution, Utils.random.nextLong)
}

// TODO Generator RDD[Vector] from multivariate distribution

def uniformVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
numPartitions: Int,
seed: Long): RDD[Vector] = {
val uniform = new UniformGenerator()
randomVectorRDD(sc, numRows, numColumns, numPartitions, uniform, seed)
}

def uniformVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
seed: Long): RDD[Vector] = {
uniformVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, seed)
}

def uniformVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
numPartitions: Int): RDD[Vector] = {
uniformVectorRDD(sc, numRows, numColumns, numPartitions, Utils.random.nextLong)
}

def uniformVectorRDD(sc: SparkContext, numRows: Long, numColumns: Int): RDD[Vector] = {
uniformVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, Utils.random.nextLong)
}

def normalVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
numPartitions: Int,
seed: Long): RDD[Vector] = {
val uniform = new StandardNormalGenerator()
randomVectorRDD(sc, numRows, numColumns, numPartitions, uniform, seed)
}

def normalVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
seed: Long): RDD[Vector] = {
normalVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, seed)
}

def normalVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
numPartitions: Int): RDD[Vector] = {
normalVectorRDD(sc, numRows, numColumns, numPartitions, Utils.random.nextLong)
}

def normalVectorRDD(sc: SparkContext, numRows: Long, numColumns: Int): RDD[Vector] = {
normalVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, Utils.random.nextLong)
}

def poissonVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
numPartitions: Int,
mean: Double,
seed: Long): RDD[Vector] = {
val poisson = new PoissonGenerator(mean)
randomVectorRDD(sc, numRows, numColumns, numPartitions, poisson, seed)
}

def poissonVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
mean: Double,
seed: Long): RDD[Vector] = {
poissonVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, mean, seed)
}

def poissonVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
numPartitions: Int,
mean: Double): RDD[Vector] = {
poissonVectorRDD(sc, numRows, numColumns, numPartitions, mean, Utils.random.nextLong)
}

def poissonVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
mean: Double): RDD[Vector] = {
val poisson = new PoissonGenerator(mean)
randomVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, poisson, Utils.random.nextLong)
}

def randomVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
numPartitions: Int,
rng: DistributionGenerator,
seed: Long): RDD[Vector] = {
new RandomVectorRDD(sc, numRows, numColumns, numPartitions, rng, seed)
}

def randomVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
rng: DistributionGenerator,
seed: Long): RDD[Vector] = {
randomVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, rng, seed)
}

def randomVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
numPartitions: Int,
rng: DistributionGenerator): RDD[Vector] = {
randomVectorRDD(sc, numRows, numColumns, numPartitions, rng, Utils.random.nextLong)
}

def randomVectorRDD(sc: SparkContext,
numRows: Long,
numColumns: Int,
rng: DistributionGenerator): RDD[Vector] = {
randomVectorRDD(sc, numRows, numColumns, sc.defaultParallelism, rng, Utils.random.nextLong)
}
}
Loading

0 comments on commit d56cacb

Please sign in to comment.