diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala
deleted file mode 100644
index b90adca8b24b5..0000000000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.util
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
-import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.StreamingContext
-
-/**
- * Helper methods to load streaming data for MLLib applications.
- */
-@Experimental
-object MLStreamingUtils {
-
-  /**
-   * Loads streaming labeled points from a stream of text files
-   * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`.
-   *
-   * @param ssc Streaming context
-   * @param path Directory path in any Hadoop-supported file system URI
-   * @return Labeled points stored as a DStream[LabeledPoint]
-   */
-  def loadLabeledPointsFromText(ssc: StreamingContext, path: String): DStream[LabeledPoint] =
-    ssc.textFileStream(path).map(LabeledPointParser.parse)
-
-}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index aaf92a1a8869a..58074b71e08b9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -30,6 +30,8 @@ import org.apache.spark.util.random.BernoulliSampler
 import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream
 
 /**
  * Helper methods to load, save and pre-process data used in ML Lib.
@@ -212,6 +214,17 @@ object MLUtils {
   def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] =
     loadLabeledPoints(sc, dir, sc.defaultMinPartitions)
 
+  /**
+   * Loads streaming labeled points from a stream of text files
+   * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`.
+   *
+   * @param ssc Streaming context
+   * @param dir Directory path in any Hadoop-supported file system URI
+   * @return Labeled points stored as a DStream[LabeledPoint]
+   */
+  def loadStreamingLabeledPoints(ssc: StreamingContext, dir: String): DStream[LabeledPoint] =
+    ssc.textFileStream(dir).map(LabeledPointParser.parse)
+
   /**
    * Load labeled data from a file. The data format used here is
    * <L>, <f1> <f2> ...
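For reference, a minimal usage sketch of the relocated method, MLUtils.loadStreamingLabeledPoints, introduced by this patch. The example is not part of the diff; the local master, batch interval, and input directory are placeholder assumptions.

import org.apache.spark.SparkConf
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingLabeledPointsExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup; the master, app name, and batch interval are placeholders.
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingLabeledPointsExample")
    val ssc = new StreamingContext(conf, Seconds(10))

    // New files dropped into this (placeholder) directory are parsed line by line into
    // LabeledPoints, using the same text format produced by RDD[LabeledPoint].saveAsTextFile.
    val points = MLUtils.loadStreamingLabeledPoints(ssc, "/tmp/labeled-points")

    // Print a few parsed points from each batch as a sanity check.
    points.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Calling the loader through MLUtils keeps the streaming and batch entry points for labeled-point text data in one place, which is the motivation for folding MLStreamingUtils into MLUtils here.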