Skip to content

Commit

Permalink
update naive Bayes to use lowercase model type strings
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed May 20, 2015
1 parent 60336e3 commit 17bba53
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,20 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{Logging, SparkContext, SparkException}
import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, DenseVector, SparseVector, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}


/**
* Model for Naive Bayes Classifiers.
*
* @param labels list of labels
* @param pi log of class priors, whose dimension is C, number of labels
* @param theta log of class conditional probabilities, whose dimension is C-by-D,
* where D is number of features
* @param modelType The type of NB model to fit can be "Multinomial" or "Bernoulli"
* @param modelType The type of NB model to fit can be "multinomial" or "bernoulli"
*/
class NaiveBayesModel private[mllib] (
val labels: Array[Double],
Expand All @@ -48,11 +47,13 @@ class NaiveBayesModel private[mllib] (
val modelType: String)
extends ClassificationModel with Serializable with Saveable {

import NaiveBayes.{Bernoulli, Multinomial, supportedModelTypes}

private val piVector = new DenseVector(pi)
private val thetaMatrix = new DenseMatrix(labels.size, theta(0).size, theta.flatten, true)
private val thetaMatrix = new DenseMatrix(labels.length, theta(0).length, theta.flatten, true)

private[mllib] def this(labels: Array[Double], pi: Array[Double], theta: Array[Array[Double]]) =
this(labels, pi, theta, "Multinomial")
this(labels, pi, theta, NaiveBayes.Multinomial)

/** A Java-friendly constructor that takes three Iterable parameters. */
private[mllib] def this(
Expand All @@ -61,21 +62,20 @@ class NaiveBayesModel private[mllib] (
theta: JIterable[JIterable[Double]]) =
this(labels.asScala.toArray, pi.asScala.toArray, theta.asScala.toArray.map(_.asScala.toArray))

require(supportedModelTypes.contains(modelType), s"Invalid model type $modelType.")

// Bernoulli scoring requires log(condprob) if 1, log(1-condprob) if 0.
// This precomputes log(1.0 - exp(theta)) and its sum which are used for the linear algebra
// application of this condition (in predict function).
private val (thetaMinusNegTheta, negThetaSum) = modelType match {
case "Multinomial" => (None, None)
case "Bernoulli" =>
case Multinomial => (None, None)
case Bernoulli =>
val negTheta = thetaMatrix.map(value => math.log(1.0 - math.exp(value)))
val ones = new DenseVector(Array.fill(thetaMatrix.numCols){1.0})
val thetaMinusNegTheta = thetaMatrix.map { value =>
value - math.log(1.0 - math.exp(value))
}
(Option(thetaMinusNegTheta), Option(negTheta.multiply(ones)))
case _ =>
// This should never happen.
throw new UnknownError(s"NaiveBayesModel was created with an unknown ModelType: $modelType")
}

override def predict(testData: RDD[Vector]): RDD[Double] = {
Expand All @@ -88,24 +88,21 @@ class NaiveBayesModel private[mllib] (

override def predict(testData: Vector): Double = {
modelType match {
case "Multinomial" =>
case Multinomial =>
val prob = thetaMatrix.multiply(testData)
BLAS.axpy(1.0, piVector, prob)
labels(prob.argmax)
case "Bernoulli" =>
case Bernoulli =>
testData.foreachActive { (index, value) =>
if (value != 0.0 && value != 1.0) {
throw new SparkException(
s"Bernoulli Naive Bayes requires 0 or 1 feature values but found $testData.")
s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
}
}
val prob = thetaMinusNegTheta.get.multiply(testData)
BLAS.axpy(1.0, piVector, prob)
BLAS.axpy(1.0, negThetaSum.get, prob)
labels(prob.argmax)
case _ =>
// This should never happen.
throw new UnknownError(s"NaiveBayesModel was created with an unknown ModelType: $modelType")
}
}

Expand Down Expand Up @@ -230,16 +227,16 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
s"($loadedClassName, $version). Supported:\n" +
s" ($classNameV1_0, 1.0)")
}
assert(model.pi.size == numClasses,
assert(model.pi.length == numClasses,
s"NaiveBayesModel.load expected $numClasses classes," +
s" but class priors vector pi had ${model.pi.size} elements")
assert(model.theta.size == numClasses,
s" but class priors vector pi had ${model.pi.length} elements")
assert(model.theta.length == numClasses,
s"NaiveBayesModel.load expected $numClasses classes," +
s" but class conditionals array theta had ${model.theta.size} elements")
assert(model.theta.forall(_.size == numFeatures),
s" but class conditionals array theta had ${model.theta.length} elements")
assert(model.theta.forall(_.length == numFeatures),
s"NaiveBayesModel.load expected $numFeatures features," +
s" but class conditionals array theta had elements of size:" +
s" ${model.theta.map(_.size).mkString(",")}")
s" ${model.theta.map(_.length).mkString(",")}")
model
}
}
Expand All @@ -257,9 +254,11 @@ class NaiveBayes private (
private var lambda: Double,
private var modelType: String) extends Serializable with Logging {

def this(lambda: Double) = this(lambda, "Multinomial")
import NaiveBayes.{Bernoulli, Multinomial}

def this(lambda: Double) = this(lambda, NaiveBayes.Multinomial)

def this() = this(1.0, "Multinomial")
def this() = this(1.0, NaiveBayes.Multinomial)

/** Set the smoothing parameter. Default: 1.0. */
def setLambda(lambda: Double): NaiveBayes = {
Expand All @@ -272,12 +271,11 @@ class NaiveBayes private (

/**
* Set the model type using a string (case-sensitive).
* Supported options: "Multinomial" and "Bernoulli".
* (default: Multinomial)
* Supported options: "multinomial" (default) and "bernoulli".
*/
def setModelType(modelType:String): NaiveBayes = {
def setModelType(modelType: String): NaiveBayes = {
require(NaiveBayes.supportedModelTypes.contains(modelType),
s"NaiveBayes was created with an unknown ModelType: $modelType")
s"NaiveBayes was created with an unknown ModelType: $modelType.")
this.modelType = modelType
this
}
Expand Down Expand Up @@ -308,7 +306,7 @@ class NaiveBayes private (
}
if (!values.forall(v => v == 0.0 || v == 1.0)) {
throw new SparkException(
s"Bernoulli Naive Bayes requires 0 or 1 feature values but found $v.")
s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
}
}

Expand All @@ -317,7 +315,7 @@ class NaiveBayes private (
// TODO: similar to reduceByKeyLocally to save one stage.
val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](
createCombiner = (v: Vector) => {
if (modelType == "Bernoulli") {
if (modelType == Bernoulli) {
requireZeroOneBernoulliValues(v)
} else {
requireNonnegativeValues(v)
Expand Down Expand Up @@ -352,11 +350,8 @@ class NaiveBayes private (
labels(i) = label
pi(i) = math.log(n + lambda) - piLogDenom
val thetaLogDenom = modelType match {
case "Multinomial" => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
case "Bernoulli" => math.log(n + 2.0 * lambda)
case _ =>
// This should never happen.
throw new UnknownError(s"NaiveBayes was created with an unknown ModelType: $modelType")
case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
case Bernoulli => math.log(n + 2.0 * lambda)
}
var j = 0
while (j < numFeatures) {
Expand All @@ -375,8 +370,14 @@ class NaiveBayes private (
*/
object NaiveBayes {

/** String name for multinomial model type. */
private[classification] val Multinomial: String = "multinomial"

/** String name for Bernoulli model type. */
private[classification] val Bernoulli: String = "bernoulli"

/* Set of modelTypes that NaiveBayes supports */
private[mllib] val supportedModelTypes = Set("Multinomial", "Bernoulli")
private[classification] val supportedModelTypes = Set(Multinomial, Bernoulli)

/**
* Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
Expand Down Expand Up @@ -406,7 +407,7 @@ object NaiveBayes {
* @param lambda The smoothing parameter
*/
def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {
new NaiveBayes(lambda, "Multinomial").run(input)
new NaiveBayes(lambda, Multinomial).run(input)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package org.apache.spark.mllib.classification

import scala.util.Random

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum, Axis}
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}
import breeze.stats.distributions.{Multinomial => BrzMultinomial}

import org.scalatest.FunSuite

import org.apache.spark.SparkException
Expand All @@ -30,9 +29,10 @@ import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
import org.apache.spark.util.Utils


object NaiveBayesSuite {

import NaiveBayes.{Multinomial, Bernoulli}

private def calcLabel(p: Double, pi: Array[Double]): Int = {
var sum = 0.0
for (j <- 0 until pi.length) {
Expand All @@ -48,7 +48,7 @@ object NaiveBayesSuite {
theta: Array[Array[Double]], // CXD
nPoints: Int,
seed: Int,
modelType: String = "Multinomial",
modelType: String = Multinomial,
sample: Int = 10): Seq[LabeledPoint] = {
val D = theta(0).length
val rnd = new Random(seed)
Expand All @@ -58,19 +58,16 @@ object NaiveBayesSuite {
for (i <- 0 until nPoints) yield {
val y = calcLabel(rnd.nextDouble(), _pi)
val xi = modelType match {
case "Bernoulli" => Array.tabulate[Double] (D) { j =>
case Bernoulli => Array.tabulate[Double] (D) { j =>
if (rnd.nextDouble () < _theta(y)(j) ) 1 else 0
}
case "Multinomial" =>
case Multinomial =>
val mult = BrzMultinomial(BDV(_theta(y)))
val emptyMap = (0 until D).map(x => (x, 0.0)).toMap
val counts = emptyMap ++ mult.sample(sample).groupBy(x => x).map {
case (index, reps) => (index, reps.size.toDouble)
}
counts.toArray.sortBy(_._1).map(_._2)
case _ =>
// This should never happen.
throw new UnknownError(s"NaiveBayesSuite found unknown ModelType: $modelType")
}

LabeledPoint(y, Vectors.dense(xi))
Expand All @@ -79,17 +76,17 @@ object NaiveBayesSuite {

/** Bernoulli NaiveBayes with binary labels, 3 features */
private val binaryBernoulliModel = new NaiveBayesModel(labels = Array(0.0, 1.0),
pi = Array(0.2, 0.8), theta = Array(Array(0.1, 0.3, 0.6), Array(0.2, 0.4, 0.4)),
"Bernoulli")
pi = Array(0.2, 0.8), theta = Array(Array(0.1, 0.3, 0.6), Array(0.2, 0.4, 0.4)), Bernoulli)

/** Multinomial NaiveBayes with binary labels, 3 features */
private val binaryMultinomialModel = new NaiveBayesModel(labels = Array(0.0, 1.0),
pi = Array(0.2, 0.8), theta = Array(Array(0.1, 0.3, 0.6), Array(0.2, 0.4, 0.4)),
"Multinomial")
pi = Array(0.2, 0.8), theta = Array(Array(0.1, 0.3, 0.6), Array(0.2, 0.4, 0.4)), Multinomial)
}

class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext {

import NaiveBayes.{Multinomial, Bernoulli}

def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
val numOfPredictions = predictions.zip(input).count {
case (prediction, expected) =>
Expand Down Expand Up @@ -117,6 +114,11 @@ class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext {
}
}

// Checks that the model-type constants imported from NaiveBayes are the
// lowercase strings "multinomial" and "bernoulli".
test("model types") {
assert(Multinomial === "multinomial")
assert(Bernoulli === "bernoulli")
}

test("get, set params") {
val nb = new NaiveBayes()
nb.setLambda(2.0)
Expand All @@ -134,16 +136,15 @@ class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext {
Array(0.10, 0.10, 0.70, 0.10) // label 2
).map(_.map(math.log))

val testData = NaiveBayesSuite.generateNaiveBayesInput(
pi, theta, nPoints, 42, "Multinomial")
val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42, Multinomial)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()

val model = NaiveBayes.train(testRDD, 1.0, "Multinomial")
val model = NaiveBayes.train(testRDD, 1.0, Multinomial)
validateModelFit(pi, theta, model)

val validationData = NaiveBayesSuite.generateNaiveBayesInput(
pi, theta, nPoints, 17, "Multinomial")
pi, theta, nPoints, 17, Multinomial)
val validationRDD = sc.parallelize(validationData, 2)

// Test prediction on RDD.
Expand All @@ -163,15 +164,15 @@ class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext {
).map(_.map(math.log))

val testData = NaiveBayesSuite.generateNaiveBayesInput(
pi, theta, nPoints, 45, "Bernoulli")
pi, theta, nPoints, 45, Bernoulli)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()

val model = NaiveBayes.train(testRDD, 1.0, "Bernoulli")
val model = NaiveBayes.train(testRDD, 1.0, Bernoulli)
validateModelFit(pi, theta, model)

val validationData = NaiveBayesSuite.generateNaiveBayesInput(
pi, theta, nPoints, 20, "Bernoulli")
pi, theta, nPoints, 20, Bernoulli)
val validationRDD = sc.parallelize(validationData, 2)

// Test prediction on RDD.
Expand Down Expand Up @@ -216,7 +217,7 @@ class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext {
LabeledPoint(1.0, Vectors.dense(0.0)))

intercept[SparkException] {
NaiveBayes.train(sc.makeRDD(badTrain, 2), 1.0, "Bernoulli")
NaiveBayes.train(sc.makeRDD(badTrain, 2), 1.0, Bernoulli)
}

val okTrain = Seq(
Expand All @@ -235,7 +236,7 @@ class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext {
Vectors.dense(1.0),
Vectors.dense(0.0))

val model = NaiveBayes.train(sc.makeRDD(okTrain, 2), 1.0, "Bernoulli")
val model = NaiveBayes.train(sc.makeRDD(okTrain, 2), 1.0, Bernoulli)
intercept[SparkException] {
model.predict(sc.makeRDD(badPredict, 2)).collect()
}
Expand Down Expand Up @@ -275,7 +276,7 @@ class NaiveBayesSuite extends FunSuite with MLlibTestSparkContext {
assert(model.labels === sameModel.labels)
assert(model.pi === sameModel.pi)
assert(model.theta === sameModel.theta)
assert(model.modelType === "Multinomial")
assert(model.modelType === Multinomial)
} finally {
Utils.deleteRecursively(tempDir)
}
Expand Down

0 comments on commit 17bba53

Please sign in to comment.