From ad9bdc2dc6a774d4f83a66366ae7a0f581c625f3 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:15:06 -0700 Subject: [PATCH] Use labeled points and predictOnValues in examples --- docs/mllib-clustering.md | 5 +++-- .../spark/examples/mllib/StreamingKMeans.scala | 13 +++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index d0aa1a8bb0678..2428d84ec5e03 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -180,6 +180,7 @@ First we import the neccessary classes. {% highlight scala %} import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.clustering.StreamingKMeans {% endhighlight %} @@ -189,7 +190,7 @@ Then we make an input stream of vectors for training, as well as one for testing {% highlight scala %} val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse) -val testData = ssc.textFileStream("/testing/data/dir").map(Vectors.parse) +val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse) {% endhighlight %} @@ -211,7 +212,7 @@ Now register the streams for training and testing and start the job, printing th {% highlight scala %} model.trainOn(trainingData) -model.predictOn(testData).print() +model.predictOnValues(testData).print() ssc.start() ssc.awaitTermination() diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala index 8dc410be8d86b..79416ae734c52 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala @@ -18,6 +18,7 @@ package org.apache.spark.examples.mllib import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.clustering.StreamingKMeans import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} @@ -27,9 +28,13 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} * on another stream, where the data streams arrive as text files * into two different directories. * - * The rows of the text files must be vector data in the form + * The rows of the training text files must be vector data in the form * `[x1,x2,x3,...,xn]` - * Where n is the number of dimensions. n must be the same for train and test. + * Where n is the number of dimensions. + * + * The rows of the test text files must be labeled data in the form + * `(y,[x1,x2,x3,...,xn])` + * Where y is some identifier. n must be the same for train and test. * * Usage: StreamingKmeans * @@ -57,7 +62,7 @@ object StreamingKMeans { val ssc = new StreamingContext(conf, Seconds(args(2).toLong)) val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse) - val testData = ssc.textFileStream(args(1)).map(Vectors.parse) + val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse) val model = new StreamingKMeans() .setK(args(3).toInt) @@ -65,7 +70,7 @@ object StreamingKMeans { .setRandomCenters(args(4).toInt) model.trainOn(trainingData) - model.predictOn(testData).print() + model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() ssc.start() ssc.awaitTermination()