From 05a113946c09f4e61c4f16b80ae3ae217e471e9f Mon Sep 17 00:00:00 2001
From: freeman
Date: Tue, 19 Aug 2014 20:23:44 -0400
Subject: [PATCH 1/2] Added documentation and example for StreamingLR

---
 docs/mllib-linear-methods.md | 74 ++++++++++++++++++++++++++++++++++++
 1 file changed, 74 insertions(+)

diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md
index e504cd7f0f578..69ac3a62bbf10 100644
--- a/docs/mllib-linear-methods.md
+++ b/docs/mllib-linear-methods.md
@@ -518,6 +518,80 @@ print("Mean Squared Error = " + str(MSE))
+## Streaming linear regression
+
+When data arrive in a streaming fashion, it is useful to fit regression models online,
+updating the parameters of the model as new data arrive. MLlib currently supports
+streaming linear regression using ordinary least squares. The fitting is similar
+to that performed offline, except fitting occurs on each batch of data, so that
+the model continually updates to reflect the data from the stream.
+
+### Examples
+
+The following example demonstrates how to load training and testing data from two different
+input streams of text files, parse the streams as labeled points, fit a linear regression model
+online to the first stream, and make predictions on the second stream.
+
+
+
+
+
+First, we import the necessary classes for parsing our input data and creating the model.
+
+{% highlight scala %}
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
+
+{% endhighlight %}
+
+Then we make input streams for training and testing data. We assume a Streaming Context `ssc`
+has already been created, see [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing)
+for more info. For this example, we use labeled points in training and testing streams,
+but in practice you will likely want to use unlabeled Vectors for test data.
+
+{% highlight scala %}
+
+val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse)
+val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
+
+{% endhighlight %}
+
+We create our model by initializing the weights to 0
+
+{% highlight scala %}
+
+val model = new StreamingLinearRegressionWithSGD()
+    .setInitialWeights(Vectors.zeros(3))
+
+{% endhighlight %}
+
+Now we register the streams for training and testing and start the job.
+Printing predictions alongside true labels lets us easily see the result.
+
+{% highlight scala %}
+
+model.trainOn(trainingData)
+model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
+
+ssc.start()
+ssc.awaitTermination()
+
+{% endhighlight %}
+
+We can now save text files with data to the training or testing folders.
+Each line should be a data point formatted as `(y,[x1,x2,x3])` where `y` is the label
+and `x1,x2,x3` are the features. Anytime a text file is placed in `/training/data/dir`
+the model will update. Anytime a text file is placed in `/testing/data/dir` you will see predictions.
+As you feed more data to the training directory, the predictions
+will get better!
+
+
+
+
+
+
 ## Implementation (developer)
 
 Behind the scene, MLlib implements a simple distributed version of stochastic gradient descent

From 568d250ebf47017e79f6112390c0af81ff50ab63 Mon Sep 17 00:00:00 2001
From: freeman
Date: Tue, 19 Aug 2014 20:45:50 -0400
Subject: [PATCH 2/2] Tweaks to wording / formatting

---
 docs/mllib-linear-methods.md | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md
index 69ac3a62bbf10..9137f9dc1b692 100644
--- a/docs/mllib-linear-methods.md
+++ b/docs/mllib-linear-methods.md
@@ -521,7 +521,7 @@ print("Mean Squared Error = " + str(MSE))
 ## Streaming linear regression
 
 When data arrive in a streaming fashion, it is useful to fit regression models online,
-updating the parameters of the model as new data arrive. MLlib currently supports
+updating the parameters of the model as new data arrives. MLlib currently supports
 streaming linear regression using ordinary least squares. The fitting is similar
 to that performed offline, except fitting occurs on each batch of data, so that
 the model continually updates to reflect the data from the stream.
@@ -546,10 +546,10 @@ import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
 
 {% endhighlight %}
 
-Then we make input streams for training and testing data. We assume a Streaming Context `ssc`
+Then we make input streams for training and testing data. We assume a StreamingContext `ssc`
 has already been created, see [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing)
 for more info. For this example, we use labeled points in training and testing streams,
-but in practice you will likely want to use unlabeled Vectors for test data.
+but in practice you will likely want to use unlabeled vectors for test data.
 
 {% highlight scala %}
 
@@ -562,8 +562,9 @@ We create our model by initializing the weights to 0
 
 {% highlight scala %}
 
+val numFeatures = 3
 val model = new StreamingLinearRegressionWithSGD()
-    .setInitialWeights(Vectors.zeros(3))
+    .setInitialWeights(Vectors.zeros(numFeatures))
 
 {% endhighlight %}
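
Note on the "fitting occurs on each batch of data" wording in the new section: the idea can be pictured with a small Spark-free sketch. The code below is only an illustration under stated assumptions, not MLlib's implementation. Every name in it (`OnlineLeastSquaresSketch`, `updateOnBatch`, `nextBatch`, `trueWeights`) is hypothetical, the data is synthetic and noise-free, and each batch triggers a single plain gradient step on half the mean squared error with a fixed step size.

```scala
// Illustrative only: a Spark-free sketch of online least squares.
// All names here are hypothetical and are not part of MLlib.
object OnlineLeastSquaresSketch {
  // A labeled point: (label y, features x).
  type Point = (Double, Array[Double])

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  // One gradient step on half the mean squared error of a single batch.
  // An empty batch leaves the weights unchanged.
  def updateOnBatch(weights: Array[Double], batch: Seq[Point], stepSize: Double): Array[Double] =
    if (batch.isEmpty) weights
    else {
      val grad = new Array[Double](weights.length)
      for ((label, features) <- batch) {
        val error = dot(weights, features) - label
        for (j <- weights.indices) grad(j) += error * features(j)
      }
      Array.tabulate(weights.length)(j => weights(j) - stepSize * grad(j) / batch.size)
    }

  def main(args: Array[String]): Unit = {
    val numFeatures = 3
    val trueWeights = Array(1.0, -2.0, 0.5) // assumed ground truth for the toy data
    val rng = new scala.util.Random(42)

    // Each synthetic "batch" stands in for one interval of the training stream.
    def nextBatch(n: Int): Seq[Point] = Seq.fill(n) {
      val x = Array.fill(numFeatures)(rng.nextGaussian())
      (dot(trueWeights, x), x)
    }

    // Start from zero weights, as the docs example does, and update once per batch.
    var weights = new Array[Double](numFeatures)
    for (_ <- 1 to 200) weights = updateOnBatch(weights, nextBatch(20), stepSize = 0.1)
    println(weights.mkString("[", ", ", "]"))
  }
}
```

Running `main` prints the weights after 200 batches. On this toy data they should end up close to `trueWeights`, which mirrors the claim that predictions improve as more training data arrives. The real streaming model is configured through its own setters and runs on DStreams, so the sketch is a mental model and not a substitute for the documented example.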