From 18c3a494c4ccca1843748023f21cda34c5ecd9a8 Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Mon, 20 Apr 2015 09:43:42 +0800
Subject: [PATCH 1/4] add FactorizationMachine.scala

---
 .../regression/FactorizationMachine.scala     | 454 ++++++++++++++++++
 1 file changed, 454 insertions(+)
 create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala
new file mode 100644
index 0000000000000..ea72305302d2c
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala
@@ -0,0 +1,454 @@
+package org.apache.spark.mllib.regression
+
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import scala.util.Random
+
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.optimization.{GradientDescent, Updater, Gradient}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.util.Loader._
+import org.apache.spark.mllib.util.{Loader, Saveable}
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * Created by zrf on 4/13/15.
+ */
+
+object FMWithSGD {
+
+  def train(input: RDD[LabeledPoint],
+            numIterations: Int = 100,
+            stepSize: Double = 0.01,
+            miniBatchFraction: Double = 0.1,
+            dim: (Boolean, Boolean, Int) = (true, true, 8),
+            regParam: (Double, Double, Double) = (1, 1, 1),
+            initStd: Double = 0.01): FMModel = {
+
+    new FMWithSGD(stepSize, numIterations, dim, regParam, miniBatchFraction)
+      .setInitStd(initStd)
+      .run(input)
+  }
+}
+
+
+class FMWithSGD(private var stepSize: Double,
+                private var numIterations: Int,
+                private var dim: (Boolean, Boolean, Int),
+                private var regParam: (Double, Double, Double),
+                private var miniBatchFraction: Double)
+  extends Serializable with Logging {
+
+  private var k0: Boolean = dim._1
+  private var k1: Boolean = dim._2
+  private var k2: Int = dim._3
+
+  private var r0: Double = regParam._1
+  private var r1: Double = regParam._2
+  private var r2: Double = regParam._3
+
+  private var initMean: Double = 0
+  private var initStd: Double = 0.01
+
+  private var numFeatures: Int = -1
+  private var minLabel: Double = Double.MaxValue
+  private var maxLabel: Double = Double.MinValue
+
+  /**
+   *
+   */
+  def setDim(dim: (Boolean, Boolean, Int)): this.type = {
+    require(dim._3 > 0)
+    this.k0 = dim._1
+    this.k1 = dim._2
+    this.k2 = dim._3
+    this
+  }
+
+  /**
+   *
+   * @param addIntercept determines if the global bias term w0 should be used
+   * @param add1Way determines if one-way interactions (bias terms for each variable) should be used
+   * @param numFactors the number of factors that are used for pairwise interactions
+   * @return
+   */
+  def setDim(addIntercept: Boolean = true, add1Way: Boolean = true, numFactors: Int = 8): this.type = {
+    setDim((addIntercept, add1Way, numFactors))
+  }
+
+  def setRegParam(reg: (Double, Double, Double)): this.type = {
+    require(reg._1 >= 0 && reg._2 >= 0 && reg._3 >= 0)
+    this.r0 = reg._1
+    this.r1 = reg._2
+    this.r2 = reg._3
+    this
+  }
+
+  /**
+   *
+   * @param regIntercept intercept regularization
+   * @param reg1Way one-way regularization
+   * @param reg2Way two-way regularization
+   * @return
+   */
+  def setRegParam(regIntercept: Double = 0, reg1Way: Double = 0, reg2Way: Double = 0): this.type = {
+    setRegParam((regIntercept, reg1Way, 
reg2Way)) + } + + + def setInitStd(initStd: Double): this.type = { + require(initStd > 0) + this.initStd = initStd + this + } + + /** + * Set fraction of data to be used for each SGD iteration. Default 0.1. + */ + def setMiniBatchFraction(miniBatchFraction: Double): this.type = { + require(miniBatchFraction > 0 && miniBatchFraction <= 1) + this.miniBatchFraction = miniBatchFraction + this + } + + /** + * Set the number of iterations for SGD. Default 100. + */ + def setNumIterations(numIterations: Int): this.type = { + require(numIterations > 0) + this.numIterations = numIterations + this + } + + /** + * Set the initial step size of SGD for the first step. Default 0.01. + * In subsequent steps, the step size will decrease with stepSize/sqrt(t) + */ + def setStepSize(stepSize: Double): this.type = { + require(stepSize >= 0) + this.stepSize = stepSize + this + } + + + /** + * v : numFeatures * numFactors + w : numFeatures + w0 : 1 + * @return + */ + private def genInitWeights(): Vector = { + (k0, k1) match { + case (true, true) => + Vectors.dense(Array.fill(numFeatures * k2)(Random.nextGaussian() * initStd + initMean) ++ + Array.fill(numFeatures + 1)(0.0)) + + case (true, false) => + Vectors.dense(Array.fill(numFeatures * k2)(Random.nextGaussian() * initStd + initMean) ++ + Array(0.0)) + + case (false, true) => + Vectors.dense(Array.fill(numFeatures * k2)(Random.nextGaussian() * initStd + initMean) ++ + Array.fill(numFeatures)(0.0)) + + case (false, false) => + Vectors.dense(Array.fill(numFeatures * k2)(Random.nextGaussian() * initStd + initMean)) + } + } + + private def createModel(weights: Vector): FMModel = { + + val values = weights.toArray + + val v = new DenseMatrix(k2, numFeatures, values.slice(0, numFeatures * k2)) + + val w = if (k1) + Some(Vectors.dense(values.slice(numFeatures * k2, numFeatures * k2 + numFeatures))) + else None + + val w0 = if (k0) values.last else 0.0 + + new FMModel(v, w, w0) + } + + def run(input: RDD[LabeledPoint]): FMModel = { + + if (input.getStorageLevel == StorageLevel.NONE) { + logWarning("The input data is not directly cached, which may hurt performance if its" + + " parent RDDs are also uncached.") + } + + this.numFeatures = input.first().features.size + require(numFeatures > 0) + + val (minT, maxT) = input.map(_.label).aggregate[(Double, Double)]((Double.MaxValue, Double.MinValue))({ + case ((min, max), v) => + (Math.min(min, v), Math.max(max, v)) + }, { + case ((min1, max1), (min2, max2)) => + (Math.min(min1, min2), Math.max(max1, max2)) + }) + + this.minLabel = minT + this.maxLabel = maxT + + val gradient = new FMSGDGradient(k0, k1, k2, numFeatures, minLabel, maxLabel) + + val updater = new FMSGDUpdater(k0, k1, k2, r0, r1, r2, numFeatures) + + val optimizer = new GradientDescent(gradient, updater) + .setStepSize(stepSize) + .setNumIterations(numIterations) + .setMiniBatchFraction(miniBatchFraction) + + val data = input.map(l => (l.label, l.features)).cache() + + val initWeights = genInitWeights() + + val weights = optimizer.optimize(data, initWeights) + + createModel(weights) + } +} + + +class FMSGDGradient(val k0: Boolean, val k1: Boolean, val k2: Int, + val numFeatures: Int, val min: Double, val max: Double) extends Gradient { + + private def predict(data: Vector, weights: Vector): (Double, Array[Double]) = { + var pred = if (k0) + weights(weights.size - 1) + else 0.0 + + if (k1) { + val pos = numFeatures * k2 + data.foreachActive { + case (i, v) => + pred += weights(pos + i) * v + } + } + + val sum = Array.fill(k2)(0.0) + for (f <- 0 until k2) { + 
var sumSqr = 0.0 + data.foreachActive { + case (i, v) => + val d = weights(i * k2 + f) * v + sum(f) += d + sumSqr += d * d + } + pred += (sum(f) * sum(f) - sumSqr) / 2 + } + + pred = Math.max(pred, min) + pred = Math.min(pred, max) + + (pred, sum) + } + + + private def cumulateGradient(data: Vector, weights: Vector, + diff: Double, sum: Array[Double], cumGrad: Vector): Unit = { + cumGrad match { + case vec: DenseVector => + val cumValues = vec.values + + if (k0) + cumValues(cumValues.length - 1) += diff + + if (k1) { + val pos = numFeatures * k2 + data.foreachActive { + case (i, v) => + cumValues(pos + i) += v * diff + } + } + + data.foreachActive { + case (i, v) => + val pos = i * k2 + for (f <- 0 until k2) { + cumValues(pos + f) += (sum(f) * v - weights(pos + f) * v * v) * diff + } + } + + case _ => + throw new IllegalArgumentException( + s"cumulateGradient only supports adding to a dense vector but got type ${cumGrad.getClass}.") + } + } + + + override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = { + val cumGradient = Vectors.dense(Array.fill(weights.size)(0.0)) + val loss = compute(data, label, weights, cumGradient) + (cumGradient, loss) + } + + override def compute(data: Vector, label: Double, weights: Vector, cumGradient: Vector): Double = { + require(data.size == numFeatures) + val (pred, sum) = predict(data, weights) + val diff = pred - label + cumulateGradient(data, weights, diff, sum, cumGradient) + diff * diff / 2 + } +} + + +class FMSGDUpdater(val k0: Boolean, val k1: Boolean, val k2: Int, + val r0: Double, val r1: Double, val r2: Double, + val numFeatures: Int) extends Updater { + + override def compute(weightsOld: Vector, gradient: Vector, + stepSize: Double, iter: Int, regParam: Double): (Vector, Double) = { + val thisIterStepSize = stepSize / math.sqrt(iter) + // val thisIterStepSize = stepSize + val len = weightsOld.size + + val weightsNew = Array.fill(len)(0.0) + var regVal = 0.0 + + if (k0) { + weightsNew(len - 1) = weightsOld(len - 1) - thisIterStepSize * (gradient(len - 1) + r0 * weightsOld(len - 1)) + regVal += r0 * weightsNew(len - 1) * weightsNew(len - 1) + } + + if (k1) { + for (i <- numFeatures * k2 until numFeatures * k2 + numFeatures) { + weightsNew(i) = weightsOld(i) - thisIterStepSize * (gradient(i) + r1 * weightsOld(i)) + regVal += r1 * weightsNew(i) * weightsNew(i) + } + } + + for (i <- 0 until numFeatures * k2) { + weightsNew(i) = weightsOld(i) - thisIterStepSize * (gradient(i) + r2 * weightsOld(i)) + regVal += r2 * weightsNew(i) * weightsNew(i) + } + + (Vectors.dense(weightsNew), regVal / 2) + } +} + + +class FMModel(val factorMatrix: Matrix, + val weightVector: Option[Vector], + val intercept: Double) extends Serializable with Saveable { + + val numFeatures = factorMatrix.numCols + val numFactors = factorMatrix.numRows + + def predict(testData: Vector): Double = { + require(testData.size == numFeatures) + + var pred = intercept + if (weightVector.isDefined) { + testData.foreachActive { + case (i, v) => + pred += weightVector.get(i) * v + } + } + + for (f <- 0 until numFactors) { + var sum = 0.0 + var sumSqr = 0.0 + testData.foreachActive { + case (i, v) => + val d = factorMatrix(f, i) * v + sum += d + sumSqr += d * d + } + pred += (sum * sum - sumSqr) / 2 + } + + pred + } + + def predict(testData: RDD[Vector]): RDD[Double] = { + testData.mapPartitions { + _.map { + vec => + predict(vec) + } + } + } + + override protected def formatVersion: String = "1.0" + + override def save(sc: SparkContext, path: String): Unit = { + val 
data = FMModel.SaveLoadV1_0.Data(factorMatrix, weightVector, intercept) + FMModel.SaveLoadV1_0.save(sc, path, data) + } +} + +object FMModel extends Loader[FMModel] { + + private object SaveLoadV1_0 { + + def thisFormatVersion = "1.0" + + def thisClassName = this.getClass.getName + + /** Model data for model import/export */ + case class Data(factorMatrix: Matrix, weightVector: Option[Vector], intercept: Double) + + def save(sc: SparkContext, path: String, data: Data): Unit = { + val sqlContext = new SQLContext(sc) + import sqlContext.implicits._ + // Create JSON metadata. + val metadata = compact(render( + ("class" -> this.getClass.getName) ~ ("version" -> thisFormatVersion) ~ + ("numFeatures" -> data.factorMatrix.numCols) ~ ("numFactors" -> data.factorMatrix.numRows))) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) + + // Create Parquet data. + val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF() + dataRDD.saveAsParquetFile(dataPath(path)) + } + + def load(sc: SparkContext, path: String): FMModel = { + val sqlContext = new SQLContext(sc) + // Load Parquet data. + val dataRDD = sqlContext.parquetFile(dataPath(path)) + // Check schema explicitly since erasure makes it hard to use match-case for checking. + checkSchema[Data](dataRDD.schema) + val dataArray = dataRDD.select("factorMatrix", "weightVector", "intercept").take(1) + assert(dataArray.length == 1, s"Unable to load FMModel data from: ${dataPath(path)}") + val data = dataArray(0) + val factorMatrix = data.getAs[Matrix](0) + val weightVector = data.getAs[Option[Vector]](1) + val intercept = data.getDouble(2) + new FMModel(factorMatrix, weightVector, intercept) + } + } + + override def load(sc: SparkContext, path: String): FMModel = { + implicit val formats = DefaultFormats + + val (loadedClassName, version, metadata) = loadMetadata(sc, path) + val classNameV1_0 = SaveLoadV1_0.thisClassName + + (loadedClassName, version) match { + case (className, "1.0") if className == classNameV1_0 => + val numFeatures = (metadata \ "numFeatures").extract[Int] + val numFactors = (metadata \ "numFactors").extract[Int] + val model = SaveLoadV1_0.load(sc, path) + assert(model.factorMatrix.numCols == numFeatures, + s"FMModel.load expected $numFeatures features," + + s" but factorMatrix had columns of size:" + + s" ${model.factorMatrix.numCols}") + assert(model.factorMatrix.numRows == numFactors, + s"FMModel.load expected $numFactors factors," + + s" but factorMatrix had rows of size:" + + s" ${model.factorMatrix.numRows}") + model + + case _ => throw new Exception( + s"FMModel.load did not recognize model with (className, format version):" + + s"($loadedClassName, $version). 
Supported:\n" + + s" ($classNameV1_0, 1.0)") + } + } +} + From 2b49d74805e02a21cf0e5a0aa022493f19868a4b Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Mon, 20 Apr 2015 17:13:04 +0800 Subject: [PATCH 2/4] Add Test --- .../regression/FactorizationMachine.scala | 67 +++-- .../FactorizationMachineSuite.scala | 244 ++++++++++++++++++ 2 files changed, 295 insertions(+), 16 deletions(-) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/regression/FactorizationMachineSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala index ea72305302d2c..eca2f1b08c9de 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala @@ -22,17 +22,45 @@ import org.apache.spark.sql.{DataFrame, SQLContext} object FMWithSGD { def train(input: RDD[LabeledPoint], - numIterations: Int = 100, - stepSize: Double = 0.01, - miniBatchFraction: Double = 0.1, - dim: (Boolean, Boolean, Int) = (true, true, 8), - regParam: (Double, Double, Double) = (1, 1, 1), - initStd: Double = 0.01): FMModel = { - + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double, + dim: (Boolean, Boolean, Int), + regParam: (Double, Double, Double), + initStd: Double): FMModel = { new FMWithSGD(stepSize, numIterations, dim, regParam, miniBatchFraction) .setInitStd(initStd) .run(input) } + + def train(input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double, + regParam: (Double, Double, Double), + dim: (Boolean, Boolean, Int)): FMModel = { + new FMWithSGD(stepSize, numIterations, dim, regParam, miniBatchFraction) + .setInitStd(0.01) + .run(input) + } + + def train(input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double, + dim: (Boolean, Boolean, Int)): FMModel = { + new FMWithSGD(stepSize, numIterations, dim, (0, 0.01, 0.01), miniBatchFraction) + .setInitStd(0.01) + .run(input) + } + + def train(input: RDD[LabeledPoint], + numIterations: Int, + dim: (Boolean, Boolean, Int)): FMModel = { + new FMWithSGD(1.0, numIterations, dim, (0, 0.01, 0.01), 1.0) + .setInitStd(0.01) + .run(input) + } } @@ -43,6 +71,13 @@ class FMWithSGD(private var stepSize: Double, private var miniBatchFraction: Double) extends Serializable with Logging { + + /** + * Construct a Lasso object with default parameters: {stepSize: 1.0, numIterations: 100, + * dim: (true, true, 8), regParam: (0, 0.01, 0.01), miniBatchFraction: 1.0}. 
+ */ + def this() = this(1.0, 100, (true, true, 8), (0, 0.01, 0.01), 1.0) + private var k0: Boolean = dim._1 private var k1: Boolean = dim._2 private var k2: Int = dim._3 @@ -139,7 +174,7 @@ class FMWithSGD(private var stepSize: Double, * v : numFeatures * numFactors + w : numFeatures + w0 : 1 * @return */ - private def genInitWeights(): Vector = { + private def generateInitWeights(): Vector = { (k0, k1) match { case (true, true) => Vectors.dense(Array.fill(numFeatures * k2)(Random.nextGaussian() * initStd + initMean) ++ @@ -194,9 +229,9 @@ class FMWithSGD(private var stepSize: Double, this.minLabel = minT this.maxLabel = maxT - val gradient = new FMSGDGradient(k0, k1, k2, numFeatures, minLabel, maxLabel) + val gradient = new FMGradient(k0, k1, k2, numFeatures, minLabel, maxLabel) - val updater = new FMSGDUpdater(k0, k1, k2, r0, r1, r2, numFeatures) + val updater = new FMUpdater(k0, k1, k2, r0, r1, r2, numFeatures) val optimizer = new GradientDescent(gradient, updater) .setStepSize(stepSize) @@ -205,7 +240,7 @@ class FMWithSGD(private var stepSize: Double, val data = input.map(l => (l.label, l.features)).cache() - val initWeights = genInitWeights() + val initWeights = generateInitWeights() val weights = optimizer.optimize(data, initWeights) @@ -214,8 +249,8 @@ class FMWithSGD(private var stepSize: Double, } -class FMSGDGradient(val k0: Boolean, val k1: Boolean, val k2: Int, - val numFeatures: Int, val min: Double, val max: Double) extends Gradient { +class FMGradient(val k0: Boolean, val k1: Boolean, val k2: Int, + val numFeatures: Int, val min: Double, val max: Double) extends Gradient { private def predict(data: Vector, weights: Vector): (Double, Array[Double]) = { var pred = if (k0) @@ -297,9 +332,9 @@ class FMSGDGradient(val k0: Boolean, val k1: Boolean, val k2: Int, } -class FMSGDUpdater(val k0: Boolean, val k1: Boolean, val k2: Int, - val r0: Double, val r1: Double, val r2: Double, - val numFeatures: Int) extends Updater { +class FMUpdater(val k0: Boolean, val k1: Boolean, val k2: Int, + val r0: Double, val r1: Double, val r2: Double, + val numFeatures: Int) extends Updater { override def compute(weightsOld: Vector, gradient: Vector, stepSize: Double, iter: Int, regParam: Double): (Vector, Double) = { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/FactorizationMachineSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/FactorizationMachineSuite.scala new file mode 100644 index 0000000000000..7901a9a16bb3e --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/FactorizationMachineSuite.scala @@ -0,0 +1,244 @@ +package org.apache.spark.mllib.regression + +import com.github.fommil.netlib.BLAS._ +import com.github.fommil.netlib.BLAS.{getInstance => blas} + +import org.scalatest.FunSuite +import scala.util.Random + +import org.apache.spark.mllib.linalg.{Matrix, DenseMatrix, Vectors} +import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext} +import org.apache.spark.util.Utils + +/** + * Created by zrf on 4/20/15. 
+ */
+
+private object FactorizationMachineSuite {
+
+  /**
+   * Model with numFactors = 8 and numFeatures = 10.
+   */
+  val model = new FMModel(new DenseMatrix(8, 10, Array.fill(80)(Random.nextGaussian())),
+    Some(Vectors.dense(Array.fill(10)(Random.nextDouble()))), 1.0)
+}
+
+class FactorizationMachineSuite extends FunSuite with MLlibTestSparkContext {
+
+  def predictionError(predictions: Seq[Double], input: Seq[LabeledPoint]): Double = {
+    predictions.zip(input).map { case (prediction, expected) =>
+      (prediction - expected.label) * (prediction - expected.label)
+    }.reduceLeft(_ + _) / predictions.size
+  }
+
+
+  def generateMultiLinearInput(intercept: Double,
+                               weights: Array[Double],
+                               factors: Matrix,
+                               nPoints: Int,
+                               seed: Int,
+                               eps: Double = 0.1): Seq[LabeledPoint] = {
+    require(weights.length == factors.numCols)
+
+    val rnd = new Random(seed)
+    val x = Array.fill[Array[Double]](nPoints)(
+      Array.fill[Double](weights.length)(2 * rnd.nextDouble - 1.0))
+    val y = x.map { xi =>
+      var l = blas.ddot(weights.length, xi, 1, weights, 1) + intercept + eps * rnd.nextGaussian()
+
+      for (i <- 0 until weights.length; j <- i + 1 until weights.length) {
+        var d = 0.0
+        for (f <- 0 until factors.numRows) {
+          d += factors(f, i) * factors(f, j)
+        }
+        l += xi(i) * xi(j) * d
+      }
+      l
+    }
+
+    y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))
+  }
+
+
+  def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+    val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
+      // A prediction is off if the prediction is more than 0.5 away from expected value.
+      math.abs(prediction - expected.label) > 0.5
+    }
+    // At least 80% of the predictions should be on.
+    assert(numOffPredictions < input.length / 5)
+  }
+
+
+  // Test if we can correctly learn Y = 0.1 + 1.2*X1 - 1.3*X2 + 20*X1*X2
+  test("regression with weights and intercept") {
+
+    val testRDD = sc.parallelize(generateMultiLinearInput(0.1, Array(1.2, -1.3),
+      new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 20)), 1000, 42), 2).cache()
+
+    val model = FMWithSGD.train(testRDD, numIterations = 100, stepSize = 1,
+      miniBatchFraction = 0.1, dim = (true, true, 2), regParam = (0, 0.01, 0.01))
+
+    val w0 = model.intercept
+    assert(w0 >= 0.05 && w0 <= 0.15, w0 + " not in [0.05, 0.15]")
+
+    val w = model.weightVector
+    assert(w.isDefined && w.get.size == 2)
+    val w1 = w.get(0)
+    assert(w1 >= 1.1 && w1 <= 1.3, w1 + " not in [1.1, 1.3]")
+    val w2 = w.get(1)
+    assert(w2 >= -1.4 && w2 <= -1.2, w2 + " not in [-1.4, -1.2]")
+
+    val v = model.factorMatrix
+    val v12 = v(0, 0) * v(0, 1) + v(1, 0) * v(1, 1)
+    assert(v12 >= 18 && v12 <= 22, v12 + " not in [18, 22]")
+
+    val validationData = generateMultiLinearInput(0.1, Array(1.2, -1.3),
+      new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 20)), 1000, 17)
+
+    val validationRDD = sc.parallelize(validationData, 2)
+
+    // Test prediction on RDD.
+    validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+    // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + + // Test if we can correctly learn Y = 1.2*X1 - 1.3*X2 + 20*X1*X2 + test("regression with weights but without intercept") { + + val testRDD = sc.parallelize(generateMultiLinearInput(0.0, Array(1.2, -1.3), + new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 20)), 1000, 42), 2).cache() + + val model = FMWithSGD.train(testRDD, numIterations = 100, stepSize = 1, + miniBatchFraction = 0.1, dim = (false, true, 2), regParam = (0, 0.01, 0.01)) + + val w0 = model.intercept + assert(w0 == 0.0, w0 + " not equal 0") + + val w = model.weightVector + assert(w.isDefined && w.get.size == 2) + val w1 = w.get(0) + assert(w1 >= 1.1 && w1 <= 1.3, w1 + " not in [1.1, 1.3]") + val w2 = w.get(1) + assert(w2 >= -1.4 && w2 <= -1.2, w2 + " not in [-1.4, -1.2]") + + val v = model.factorMatrix + val v12 = v(0, 0) * v(0, 1) + v(1, 0) * v(1, 1) + assert(v12 >= 18 && v12 <= 22, v12 + " not in [18, 22]") + + val validationData = generateMultiLinearInput(0.0, Array(1.2, -1.3), + new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 20)), 1000, 17) + + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + + + // Test if we can correctly learn Y = 0.1 + 20*X1*X2 + test("regression with intercept but without weights") { + + val testRDD = sc.parallelize(generateMultiLinearInput(0.1, Array(0.0, 0.0), + new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 20)), 1000, 42), 2).cache() + + val model = FMWithSGD.train(testRDD, numIterations = 100, stepSize = 1, + miniBatchFraction = 0.1, dim = (true, false, 2), regParam = (0, 0.01, 0.01)) + + val w0 = model.intercept + assert(w0 >= 0.05 && w0 <= 0.15, w0 + " not in [0.05, 0.15]") + + val w = model.weightVector + assert(w == None) + + val v = model.factorMatrix + val v12 = v(0, 0) * v(0, 1) + v(1, 0) * v(1, 1) + assert(v12 >= 18 && v12 <= 22, v12 + " not in [18, 22]") + + val validationData = generateMultiLinearInput(0.1, Array(0.0, 0.0), + new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 20)), 1000, 17) + + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + + // Test if we can correctly learn Y = 20*X1*X2 + test("regression without weights or intercept") { + + val testRDD = sc.parallelize(generateMultiLinearInput(0.0, Array(0.0, 0.0), + new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 20)), 1000, 42), 2).cache() + + val model = FMWithSGD.train(testRDD, numIterations = 100, stepSize = 1, + miniBatchFraction = 0.1, dim = (false, false, 2), regParam = (0, 0.01, 0.01)) + + val w0 = model.intercept + assert(w0 == 0.0, w0 + " not equal 0") + + val w = model.weightVector + assert(w == None) + + val v = model.factorMatrix + val v12 = v(0, 0) * v(0, 1) + v(1, 0) * v(1, 1) + assert(v12 >= 18 && v12 <= 22, v12 + " not in [18, 22]") + + val validationData = generateMultiLinearInput(0.0, Array(0.0, 0.0), + new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 20)), 1000, 17) + + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. 
+    validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+    // Test prediction on Array.
+    validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+  }
+
+
+  test("model save/load") {
+    val model = FactorizationMachineSuite.model
+
+    val tempDir = Utils.createTempDir()
+    val path = tempDir.toURI.toString
+
+    // Save model, load it back, and compare.
+    try {
+      model.save(sc, path)
+      val sameModel = FMModel.load(sc, path)
+      assert(model.factorMatrix == sameModel.factorMatrix)
+      assert(model.weightVector == sameModel.weightVector)
+      assert(model.intercept == sameModel.intercept)
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+}
+
+
+class FactorizationMachineSuiteClusterSuite extends FunSuite with LocalClusterSparkContext {
+
+  test("task size should be small in both training and prediction") {
+    val m = 4
+    val n = 200000
+    val points = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) =>
+      val random = new Random(idx)
+      iter.map(i => LabeledPoint(1.0, Vectors.dense(Array.fill(n)(random.nextDouble()))))
+    }.cache()
+    // If we serialize data directly in the task closure, the size of the serialized task would be
+    // greater than 1MB and hence Spark would throw an error.
+    val model = FMWithSGD.train(points, numIterations = 2, dim = (true, true, 8))
+    val predictions = model.predict(points.map(_.features))
+  }
+}
\ No newline at end of file

From dfd0c3b54ec016b8cbc233d44432c9e38ed9570e Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Mon, 20 Apr 2015 18:46:35 +0800
Subject: [PATCH 3/4] Add comments

---
 .../regression/FactorizationMachine.scala     | 91 +++++++++++++++----
 .../FactorizationMachineSuite.scala           | 21 +++--
 2 files changed, 82 insertions(+), 30 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala
index eca2f1b08c9de..a7b270fdef971 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala
@@ -21,6 +21,24 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
 
 object FMWithSGD {
 
+  /**
+   * Train a Factorization Machine regression model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using the specified step size. Each iteration uses
+   * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The factorization matrix
+   * used in gradient descent is initialized with Gaussian entries of standard deviation `initStd`,
+   * while the linear weights start at zero.
+   *
+   * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
+   *              matrix A as well as the corresponding right-hand side label y.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of gradient descent.
+   * @param miniBatchFraction Fraction of data to be used per iteration.
+   * @param dim A (Boolean, Boolean, Int) 3-tuple specifying whether the global bias term should be used, whether the
+   *            one-way interactions should be used, and the number of factors that are used for pairwise
+   *            interactions, respectively.
+   * @param regParam A (Double, Double, Double) 3-tuple specifying the regularization parameters of the intercept,
+   *                 the one-way interactions and the pairwise interactions, respectively.
+   * @param initStd Standard deviation used for factorization matrix initialization.
+   */
   def train(input: RDD[LabeledPoint],
             numIterations: Int,
             stepSize: Double,
@@ -73,7 +91,7 @@ class FMWithSGD(private var stepSize: Double,
 
 
   /**
-   * Construct a Lasso object with default parameters: {stepSize: 1.0, numIterations: 100,
+   * Construct an object with default parameters: {stepSize: 1.0, numIterations: 100,
    * dim: (true, true, 8), regParam: (0, 0.01, 0.01), miniBatchFraction: 1.0}.
    */
   def this() = this(1.0, 100, (true, true, 8), (0, 0.01, 0.01), 1.0)
@@ -94,7 +112,8 @@ class FMWithSGD(private var stepSize: Double,
   private var maxLabel: Double = Double.MinValue
 
   /**
-   *
+   * A (Boolean, Boolean, Int) 3-tuple specifying whether the global bias term should be used, whether the one-way
+   * interactions should be used, and the number of factors that are used for pairwise interactions, respectively.
    */
   def setDim(dim: (Boolean, Boolean, Int)): this.type = {
     require(dim._3 > 0)
@@ -109,32 +128,37 @@ class FMWithSGD(private var stepSize: Double,
    * @param addIntercept determines if the global bias term w0 should be used
    * @param add1Way determines if one-way interactions (bias terms for each variable) should be used
    * @param numFactors the number of factors that are used for pairwise interactions
-   * @return
    */
   def setDim(addIntercept: Boolean = true, add1Way: Boolean = true, numFactors: Int = 8): this.type = {
     setDim((addIntercept, add1Way, numFactors))
   }
 
-  def setRegParam(reg: (Double, Double, Double)): this.type = {
-    require(reg._1 >= 0 && reg._2 >= 0 && reg._3 >= 0)
-    this.r0 = reg._1
-    this.r1 = reg._2
-    this.r2 = reg._3
+
+  /**
+   * @param regParams A (Double, Double, Double) 3-tuple specifying the regularization parameters of the intercept,
+   *                  the one-way interactions and the pairwise interactions, respectively.
+   */
+  def setRegParam(regParams: (Double, Double, Double)): this.type = {
+    require(regParams._1 >= 0 && regParams._2 >= 0 && regParams._3 >= 0)
+    this.r0 = regParams._1
+    this.r1 = regParams._2
+    this.r2 = regParams._3
     this
   }
 
   /**
-   *
    * @param regIntercept intercept regularization
-   * @param reg1Way one-way regularization
-   * @param reg2Way two-way regularization
-   * @return
+   * @param reg1Way one-way interactions regularization
+   * @param reg2Way pairwise interactions regularization
    */
   def setRegParam(regIntercept: Double = 0, reg1Way: Double = 0, reg2Way: Double = 0): this.type = {
     setRegParam((regIntercept, reg1Way,
       reg2Way))
   }
 
+
+  /**
+   * @param initStd Standard deviation used for factorization matrix initialization.
+   */
   def setInitStd(initStd: Double): this.type = {
     require(initStd > 0)
     this.initStd = initStd
@@ -142,7 +166,7 @@ class FMWithSGD(private var stepSize: Double,
   }
 
   /**
-   * Set fraction of data to be used for each SGD iteration. Default 0.1.
+   * Set fraction of data to be used for each SGD iteration.
    */
   def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
     require(miniBatchFraction > 0 && miniBatchFraction <= 1)
@@ -151,7 +175,7 @@ class FMWithSGD(private var stepSize: Double,
   }
 
   /**
-   * Set the number of iterations for SGD. Default 100.
+   * Set the number of iterations for SGD.
   */
   def setNumIterations(numIterations: Int): this.type = {
     require(numIterations > 0)
@@ -160,7 +184,7 @@ class FMWithSGD(private var stepSize: Double,
   }
 
   /**
-   * Set the initial step size of SGD for the first step. Default 0.01.
+   * Set the initial step size of SGD for the first step.
   * In subsequent steps, the step size will decrease with stepSize/sqrt(t)
    */
   def setStepSize(stepSize: Double): this.type = {
@@ -171,8 +195,11 @@ class FMWithSGD(private var stepSize: Double,
 
 
   /**
-   * v : numFeatures * numFactors + w : numFeatures + w0 : 1
-   * @return
+   * Encode the FMModel as a dense vector, with its first numFeatures * numFactors elements representing the
+   * factorization matrix v, the next numFeatures elements representing the one-way interaction weights w if k1 is
+   * set to true, and the last element representing the intercept w0 if k0 is set to true.
+   * The factorization matrix v is initialized by Gaussian(0, initStd).
+   * v : numFeatures * numFactors + w : [numFeatures] + w0 : [1]
    */
   private def generateInitWeights(): Vector = {
     (k0, k1) match {
@@ -193,6 +220,10 @@ class FMWithSGD(private var stepSize: Double,
     }
   }
 
+
+  /**
+   * Create an FMModel from a vector.
+   */
   private def createModel(weights: Vector): FMModel = {
 
     val values = weights.toArray
@@ -208,6 +239,11 @@ class FMWithSGD(private var stepSize: Double,
     new FMModel(v, w, w0)
   }
 
+
+  /**
+   * Run the algorithm with the configured parameters on an input RDD
+   * of LabeledPoint entries.
+   */
   def run(input: RDD[LabeledPoint]): FMModel = {
 
     if (input.getStorageLevel == StorageLevel.NONE) {
@@ -249,6 +285,12 @@ class FMWithSGD(private var stepSize: Double,
 }
 
+/**
+ * :: DeveloperApi ::
+ * Compute gradient and loss for a least-squares loss function, as used in linear regression.
+ * For the detailed mathematical derivation, see the reference at
+ * http://doi.acm.org/10.1145/2168752.2168771
+ */
 class FMGradient(val k0: Boolean, val k1: Boolean, val k2: Int,
                  val numFeatures: Int, val min: Double, val max: Double) extends Gradient {
@@ -290,8 +332,9 @@ class FMGradient(val k0: Boolean, val k1: Boolean, val k2: Int,
       case vec: DenseVector =>
         val cumValues = vec.values
 
-        if (k0)
+        if (k0) {
           cumValues(cumValues.length - 1) += diff
+        }
 
         if (k1) {
           val pos = numFeatures * k2
@@ -331,7 +374,12 @@ class FMGradient(val k0: Boolean, val k1: Boolean, val k2: Int,
   }
 }
 
-
+/**
+ * :: DeveloperApi ::
+ * Updater for L2 regularized problems.
+ *          R(w) = 1/2 ||w||^2
+ * Uses a step-size decreasing with the square root of the number of iterations.
+ */
 class FMUpdater(val k0: Boolean, val k1: Boolean, val k2: Int,
                 val r0: Double, val r1: Double, val r2: Double,
                 val numFeatures: Int) extends Updater {
@@ -367,6 +415,9 @@ class FMUpdater(val k0: Boolean, val k1: Boolean, val k2: Int,
 }
 
+/**
+ * Factorization Machine model.
+ */
 class FMModel(val factorMatrix: Matrix,
               val weightVector: Option[Vector],
               val intercept: Double) extends Serializable with Saveable {
@@ -423,7 +474,7 @@ object FMModel extends Loader[FMModel] {
 
     def thisFormatVersion = "1.0"
 
-    def thisClassName = this.getClass.getName
+    def thisClassName = "org.apache.spark.mllib.regression.FMModel"
 
     /** Model data for model import/export */
     case class Data(factorMatrix: Matrix, weightVector: Option[Vector], intercept: Double)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/FactorizationMachineSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/FactorizationMachineSuite.scala
index 7901a9a16bb3e..0b3e426805c54 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/FactorizationMachineSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/FactorizationMachineSuite.scala
@@ -43,17 +43,18 @@ class FactorizationMachineSuite extends FunSuite with MLlibTestSparkContext {
     val rnd = new Random(seed)
     val x = Array.fill[Array[Double]](nPoints)(
       Array.fill[Double](weights.length)(2 * rnd.nextDouble - 1.0))
-    val y = x.map { xi =>
-      var l = blas.ddot(weights.length, xi, 1, weights, 1) + intercept + eps * rnd.nextGaussian()
-
-      for (i <- 0 until weights.length; j <- i + 1 until weights.length) {
-        var d = 0.0
-        for (f <- 0 until factors.numRows) {
-          d += factors(f, i) * factors(f, j)
+    val y = x.map {
+      xi =>
+        var l = blas.ddot(weights.length, xi, 1, weights, 1) + intercept + eps * rnd.nextGaussian()
+
+        for (i <- 0 until weights.length; j <- i + 1 until weights.length) {
+          var d = 0.0
+          for (f <- 0 until factors.numRows) {
+            d += factors(f, i) * factors(f, j)
+          }
+          l += xi(i) * xi(j) * d
         }
-        l += xi(i) * xi(j) * d
-      }
-      l
+        l
     }
 
     y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))

From ba33af8fa67c568b5130392e7c3a38456405cfc0 Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Mon, 20 Apr 2015 18:53:42 +0800
Subject: [PATCH 4/4] some comments

---
 .../apache/spark/mllib/regression/FactorizationMachine.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala
index a7b270fdef971..2408a07714762 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/FactorizationMachine.scala
@@ -222,7 +222,7 @@ class FMWithSGD(private var stepSize: Double,
 
 
   /**
-   * Create an FMModel from a vector.
+   * Create an FMModel from an encoded vector.
    */
   private def createModel(weights: Vector): FMModel = {
 
@@ -377,7 +377,6 @@ class FMGradient(val k0: Boolean, val k1: Boolean, val k2: Int,
 /**
  * :: DeveloperApi ::
  * Updater for L2 regularized problems.
- *          R(w) = 1/2 ||w||^2
  * Uses a step-size decreasing with the square root of the number of iterations.
  */
 class FMUpdater(val k0: Boolean, val k1: Boolean, val k2: Int,
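
Usage sketch (not part of the patches above): the snippet below is a minimal, illustrative example of driving the API these commits introduce -- the FMWithSGD.train(input, numIterations, dim) overload added in PATCH 2 and the two FMModel.predict variants. It assumes an existing SparkContext named `sc`, and the toy dataset is invented here for illustration only.

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.{FMModel, FMWithSGD, LabeledPoint}

    // Toy data drawn from y = 20 * x1 * x2 (two features, pairwise interaction only).
    // Assumes an existing SparkContext `sc`.
    val training = sc.parallelize(Seq(
      LabeledPoint(20.0, Vectors.dense(1.0, 1.0)),
      LabeledPoint(-20.0, Vectors.dense(1.0, -1.0)),
      LabeledPoint(5.0, Vectors.dense(0.5, 0.5))
    )).cache()

    // dim = (use global bias w0, use one-way weights w, number of pairwise factors).
    val model: FMModel = FMWithSGD.train(training, numIterations = 100, dim = (true, true, 2))

    // Predict a single feature vector, or an RDD of vectors.
    val one = model.predict(Vectors.dense(1.0, 1.0))
    val many = model.predict(training.map(_.features)).collect()

Note that predictions are clamped to the [minLabel, maxLabel] range observed during training, since FMGradient applies Math.max/Math.min with the label bounds computed in run().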