move whole text file API to Spark core
yinxusen committed Apr 2, 2014
1 parent d792cee commit cc97dca
Showing 6 changed files with 64 additions and 170 deletions.
32 changes: 32 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -37,6 +37,7 @@ import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -371,6 +372,37 @@ class SparkContext(
minSplits).map(pair => pair._2.toString)
}

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file and the value is the content of each file.
*
* <p> For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `val rdd = sc.wholeTextFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*/
def wholeTextFiles(path: String): RDD[(String, String)] = {
newAPIHadoopFile(
path,
classOf[WholeTextFileInputFormat],
classOf[String],
classOf[String])
}
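
A minimal usage sketch of the new API follows; the SparkContext constructor arguments, the HDFS path, and the line-count logic are illustrative placeholders, not part of this commit:

import org.apache.spark.SparkContext

// Illustrative only: build a local context, read every file in the directory as a
// single (path, content) record, and compute a rough line count per file.
val sc = new SparkContext("local", "wholeTextFilesExample")
val files = sc.wholeTextFiles("hdfs://a-hdfs-path")
val lineCounts = files.map { case (path, content) =>
  (path, content.count(_ == '\n') + 1) // approximate: newline characters plus one
}
lineCounts.collect().foreach { case (path, n) => println(s"$path: $n lines") }
sc.stop()

Because each file is materialized as a single in-memory string, this pattern suits directories of many small files better than a few very large ones.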

/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -154,6 +154,31 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround
*/
def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file and the value is the content of each file.
*
* <p> For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `val rdd = sc.wholeTextFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*/
def wholeTextFiles(path: String): JavaRDD[(String, String)] = sc.wholeTextFiles(path)

/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.mllib.input
package org.apache.spark.input

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
* the value is the entire content of the file.
*/

private[mllib] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
override protected def isSplitable(context: JobContext, file: Path): Boolean = false

override def createRecordReader(
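
The body of createRecordReader is collapsed in this view. Below is a plausible sketch of the usual CombineFileInputFormat wiring, which hands each file of the combined split to a WholeTextFileRecordReader; it is a reconstruction for illustration, not the verbatim body from this commit, and it assumes the Hadoop InputSplit, TaskAttemptContext, RecordReader, CombineFileSplit, and CombineFileRecordReader types are imported as in the surrounding file:

// Sketch: delegate each per-file slot of the combined split to WholeTextFileRecordReader.
override def createRecordReader(
    split: InputSplit,
    context: TaskAttemptContext): RecordReader[String, String] = {
  new CombineFileRecordReader[String, String](
    split.asInstanceOf[CombineFileSplit],
    context,
    classOf[WholeTextFileRecordReader])
}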
core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.mllib.input
package org.apache.spark.input

import com.google.common.io.{ByteStreams, Closeables}

@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
* out in a key-value pair, where the key is the file path and the value is the entire content of
* the file.
*/
private[mllib] class WholeTextFileRecordReader(
private[spark] class WholeTextFileRecordReader(
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
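
The reader's body is also collapsed here. Conceptually it opens the one file assigned to this split index, reads it entirely into memory with the Guava ByteStreams helper imported above, and emits exactly one record. A rough, non-authoritative sketch; the field names (fs, path, key, value, processed) and the use of org.apache.hadoop.io.Text are assumptions for illustration:

// Sketch of a one-shot record reader: the whole file becomes a single value.
override def nextKeyValue(): Boolean = {
  if (!processed) {
    val fileIn = fs.open(path)                  // open the file backing this split slot
    val bytes = ByteStreams.toByteArray(fileIn) // slurp the entire file into memory
    key = path.toString
    value = new Text(bytes).toString            // decode as UTF-8 text
    Closeables.close(fileIn, false)
    processed = true
    true
  } else {
    false // each file yields exactly one key-value pair
  }
}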
core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.mllib.input
package org.apache.spark.input

import java.io.DataOutputStream
import java.io.File
@@ -31,11 +31,10 @@ import org.scalatest.FunSuite
import org.apache.hadoop.io.Text

import org.apache.spark.SparkContext
import org.apache.spark.mllib.MLContext._

/**
* Tests the correctness of
* [[org.apache.spark.mllib.input.WholeTextFileRecordReader WholeTextFileRecordReader]]. A temporary
* [[org.apache.spark.input.WholeTextFileRecordReader WholeTextFileRecordReader]]. A temporary
* directory is created as fake input. The temporary storage is deleted at the end.
*/
class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
@@ -74,7 +73,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
createNativeFile(dir, filename, contents)
}

val res = sc.wholeTextFile(dir.toString).collect()
val res = sc.wholeTextFiles(dir.toString).collect()

assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
"Number of files read out does not fit with the actual value.")
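
Beyond the size check, the rest of the test (collapsed above) would naturally verify that each file's contents round-trip. A hedged sketch; the files map from short file name to the bytes written by createNativeFile is an assumption made for illustration:

// Sketch: check that every returned (path, contents) pair matches what was written.
res.foreach { case (path, contents) =>
  val shortName = path.split('/').last
  assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
    s"Unexpected file $shortName was read back.")
  assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
    s"Contents of $shortName do not match what was written.")
}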
162 changes: 0 additions & 162 deletions mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala

This file was deleted.
