diff --git a/mllib/src/main/java/org/apache/spark/mllib/input/BatchFilesRecordReader.java b/mllib/src/main/java/org/apache/spark/mllib/input/BatchFilesRecordReader.java
deleted file mode 100644
index 5ec8b7ad18319..0000000000000
--- a/mllib/src/main/java/org/apache/spark/mllib/input/BatchFilesRecordReader.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.input;
-
-import java.io.IOException;
-
-import com.google.common.io.Closeables;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Reads an entire file out in bytes format, in <fileName, content> format.
- */
-
-public class BatchFilesRecordReader extends RecordReader<String, Text> {
-  private long startOffset;
-  private int length;
-  private Path path;
-
-  private String key = null;
-  private Text value = null;
-
-  private boolean processed = false;
-
-  private FileSystem fs;
-
-  public BatchFilesRecordReader(
-      CombineFileSplit split,
-      TaskAttemptContext context,
-      Integer index)
-    throws IOException {
-    path = split.getPath(index);
-    startOffset = split.getOffset(index);
-    length = (int) split.getLength(index);
-    fs = path.getFileSystem(context.getConfiguration());
-  }
-
-  @Override
-  public void initialize(InputSplit arg0, TaskAttemptContext arg1)
-    throws IOException, InterruptedException {
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-
-  @Override
-  public float getProgress() throws IOException {
-    return processed ? 1.0f : 0.0f;
-  }
-
-  @Override
-  public String getCurrentKey() throws IOException, InterruptedException {
-    return key;
-  }
-
-  @Override
-  public Text getCurrentValue() throws IOException, InterruptedException{
-    return value;
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException {
-    if (!processed) {
-      if (key == null) {
-        key = path.getName();
-      }
-      if (value == null) {
-        value = new Text();
-      }
-
-      FSDataInputStream fileIn = null;
-      try {
-        fileIn = fs.open(path);
-        fileIn.seek(startOffset);
-        byte[] innerBuffer = new byte[length];
-        IOUtils.readFully(fileIn, innerBuffer, 0, length);
-        value.set(innerBuffer, 0, length);
-      } finally {
-        Closeables.close(fileIn, false);
-      }
-      processed = true;
-      return true;
-    }
-    return false;
-  }
-}
diff --git a/mllib/src/main/java/org/apache/spark/mllib/input/BatchFilesInputFormat.java b/mllib/src/main/java/org/apache/spark/mllib/input/WholeTextFileInputFormat.java
similarity index 70%
rename from mllib/src/main/java/org/apache/spark/mllib/input/BatchFilesInputFormat.java
rename to mllib/src/main/java/org/apache/spark/mllib/input/WholeTextFileInputFormat.java
index b25df870f2d57..2a8853dc7d4d8 100644
--- a/mllib/src/main/java/org/apache/spark/mllib/input/BatchFilesInputFormat.java
+++ b/mllib/src/main/java/org/apache/spark/mllib/input/WholeTextFileInputFormat.java
@@ -31,22 +31,22 @@
 /**
  * The specific InputFormat reads files in HDFS or local disk. It will be called by
- * HadoopRDD to generate new BatchFilesRecordReader.
+ * HadoopRDD to generate new WholeTextFileRecordReader.
  */
-public class BatchFilesInputFormat
-  extends CombineFileInputFormat<String, Text> {
+public class WholeTextFileInputFormat
+  extends CombineFileInputFormat<String, Text> {
-  @Override
-  protected boolean isSplitable(JobContext context, Path file) {
-    return false;
-  }
-  @Override
-  public RecordReader<String, Text> createRecordReader(
-    InputSplit split,
-    TaskAttemptContext context) throws IOException {
-    return new CombineFileRecordReader<String, Text>(
-      (CombineFileSplit)split,
-      context,
-      (Class) BatchFilesRecordReader.class);
-  }
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    return false;
+  }
+  @Override
+  public RecordReader<String, Text> createRecordReader(
+    InputSplit split,
+    TaskAttemptContext context) throws IOException {
+    return new CombineFileRecordReader<String, Text>(
+      (CombineFileSplit)split,
+      context,
+      (Class) WholeTextFileRecordReader.class);
+  }
 }
diff --git a/mllib/src/main/java/org/apache/spark/mllib/input/WholeTextFileRecordReader.java b/mllib/src/main/java/org/apache/spark/mllib/input/WholeTextFileRecordReader.java
new file mode 100644
index 0000000000000..e7758c5af87a7
--- /dev/null
+++ b/mllib/src/main/java/org/apache/spark/mllib/input/WholeTextFileRecordReader.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.input;
+
+import java.io.IOException;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Reads an entire file out in <fileName, content> format.
+ */
+
+public class WholeTextFileRecordReader extends RecordReader<String, Text> {
+  private Path path;
+
+  private String key = null;
+  private Text value = null;
+
+  private boolean processed = false;
+
+  private FileSystem fs;
+
+  public WholeTextFileRecordReader(
+      CombineFileSplit split,
+      TaskAttemptContext context,
+      Integer index)
+    throws IOException {
+    path = split.getPath(index);
+    fs = path.getFileSystem(context.getConfiguration());
+  }
+
+  @Override
+  public void initialize(InputSplit arg0, TaskAttemptContext arg1)
+    throws IOException, InterruptedException {
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return processed ? 1.0f : 0.0f;
+  }
+
+  @Override
+  public String getCurrentKey() throws IOException, InterruptedException {
+    return key;
+  }
+
+  @Override
+  public Text getCurrentValue() throws IOException, InterruptedException {
+    return value;
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    if (!processed) {
+      if (key == null) {
+        key = path.getName();
+      }
+      if (value == null) {
+        value = new Text();
+      }
+
+      FSDataInputStream fileIn = null;
+      try {
+        fileIn = fs.open(path);
+        byte[] innerBuffer = IOUtils.toByteArray(fileIn);
+        value.set(innerBuffer, 0, innerBuffer.length);
+      } finally {
+        Closeables.close(fileIn, false);
+      }
+      processed = true;
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 4bcae4b47aabe..962442d940b37 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -20,11 +20,11 @@ package org.apache.spark.mllib.util
 import org.apache.hadoop.io.Text
 import org.jblas.DoubleMatrix
 
-import org.apache.spark.SparkContext
+import org.apache.spark.mllib.input.WholeTextFileInputFormat
+import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.input.BatchFilesInputFormat
 
 /**
  * Helper methods to load, save and pre-process data used in ML Lib.
@@ -124,7 +124,7 @@ object MLUtils {
   }
 
   /**
-   * Reads a bunch of small files from HDFS, or a local file system (available on all nodes), or any
+   * Reads whole text files from HDFS, or a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return an RDD[(String, String)].
   *
   * @param path The directory you should specified, such as
   * @return RDD[(fileName: String, content: String)]
   * i.e. the first is the file name of a file, the second one is its content.
   */
-  def smallTextFiles(sc: SparkContext, path: String): RDD[(String, String)] = {
+  def wholeTextFile(sc: SparkContext, path: String): RDD[(String, String)] = {
     sc.newAPIHadoopFile(
       path,
-      classOf[BatchFilesInputFormat],
+      classOf[WholeTextFileInputFormat],
       classOf[String],
       classOf[Text]).mapValues(_.toString)
   }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/SmallTextFilesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/WholeTextFileSuite.scala
similarity index 68%
rename from mllib/src/test/scala/org/apache/spark/mllib/util/SmallTextFilesSuite.scala
rename to mllib/src/test/scala/org/apache/spark/mllib/util/WholeTextFileSuite.scala
index 05a9939d2f163..768110daa7362 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/SmallTextFilesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/WholeTextFileSuite.scala
@@ -18,38 +18,43 @@ package org.apache.spark.mllib.util
 
-import java.io.{InputStreamReader, BufferedReader, DataOutputStream, FileOutputStream}
+import java.io.{BufferedReader, DataOutputStream, FileOutputStream, InputStreamReader}
 import java.nio.file.Files
 import java.nio.file.{Path => JPath}
 import java.nio.file.{Paths => JPaths}
 
+import scala.collection.immutable.IndexedSeq
+
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 
-import org.apache.hadoop.hdfs.MiniDFSCluster
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.MiniDFSCluster
 import org.apache.hadoop.io.Text
 
 import org.apache.spark.mllib.util.MLUtils._
 import org.apache.spark.SparkContext
 
 /**
- * Tests HDFS IO and local disk IO of [[smallTextFiles]] in MLutils. HDFS tests create a mock DFS in
+ * Tests HDFS IO and local disk IO of [[wholeTextFile]] in MLUtils. HDFS tests create a mock DFS in
  * memory, while local disk test create a temp directory. All these temporal storages are deleted
  * in the end.
  */
-class SmallTextFilesSuite extends FunSuite with BeforeAndAfterAll {
+class WholeTextFileSuite extends FunSuite with BeforeAndAfterAll {
   private var sc: SparkContext = _
   private var dfs: MiniDFSCluster = _
 
   override def beforeAll() {
     sc = new SparkContext("local", "test")
-    sc.hadoopConfiguration.set("dfs.datanode.data.dir.perm", SmallTextFilesSuite.dirPermission())
-    dfs = new MiniDFSCluster(sc.hadoopConfiguration, 4, true,
-      Array("/rack0", "/rack0", "/rack1", "/rack1"),
-      Array("host0", "host1", "host2", "host3"))
+    sc.hadoopConfiguration.set("dfs.datanode.data.dir.perm", WholeTextFileSuite.dirPermission())
+    dfs = new MiniDFSCluster(
+      sc.hadoopConfiguration,
+      4,
+      true,
+      Array("/rack0", "/rack0", "/rack1", "/rack1"),
+      Array("host0", "host1", "host2", "host3"))
   }
 
   override def afterAll() {
@@ -72,16 +77,16 @@ class SmallTextFilesSuite extends FunSuite with BeforeAndAfterAll {
 
   /**
    * This code will test the behaviors on HDFS. There are three aspects to test:
-   * 1) is all files are read.
-   * 2) is the fileNames are read correctly.
-   * 3) is the contents must be the same.
+   * 1) all files are read;
+   * 2) file names are read correctly;
+   * 3) file contents match the originals.
   */
  test("Small file input || HDFS IO") {
    val fs: FileSystem = dfs.getFileSystem
    val dir = "/foo/"
    val inputDir: Path = new Path(dir)
 
-    SmallTextFilesSuite.fileNames.zip(SmallTextFilesSuite.filesContents).foreach {
+    WholeTextFileSuite.fileNames.zip(WholeTextFileSuite.filesContents).foreach {
      case (fname, contents) => createHDFSFile(fs, inputDir, fname, contents)
    }
@@ -93,15 +98,15 @@
      s"hdfs://${dfs.getNameNode.getNameNodeAddress.getHostName}:${dfs.getNameNodePort}$dir"
    println(s"HDFS address dir is $hdfsAddressDir")
 
-    val res = smallTextFiles(sc, hdfsAddressDir).collect()
+    val res = wholeTextFile(sc, hdfsAddressDir).collect()
 
-    assert(res.size === SmallTextFilesSuite.fileNames.size,
+    assert(res.size === WholeTextFileSuite.fileNames.size,
      "Number of files read out do not fit with the actual value")
 
    for ((fname, contents) <- res) {
-      assert(SmallTextFilesSuite.fileNames.contains(fname),
+      assert(WholeTextFileSuite.fileNames.contains(fname),
        s"Missing file name $fname.")
-      assert(contents.hashCode === SmallTextFilesSuite.hashCodeOfContents(fname),
+      assert(contents.hashCode === WholeTextFileSuite.hashCodeOfContents(fname),
        s"file $fname contents can not match")
    }
  }
@@ -115,35 +120,35 @@
 
  /**
   * This code will test the behaviors on native file system. There are three aspects:
-   * 1) is all files are read.
-   * 2) is the fileNames are read correctly.
-   * 3) is the contents must be the same.
+   * 1) all files are read;
+   * 2) file names are read correctly;
+   * 3) file contents match the originals.
  */
  test("Small file input || native disk IO") {
    sc.hadoopConfiguration.clear()
 
-    val dir = Files.createTempDirectory("smallfiles")
+    val dir = Files.createTempDirectory("wholefiles")
    println(s"native disk address is ${dir.toString}")
 
-    SmallTextFilesSuite.fileNames.zip(SmallTextFilesSuite.filesContents).foreach {
+    WholeTextFileSuite.fileNames.zip(WholeTextFileSuite.filesContents).foreach {
      case (fname, contents) => createNativeFile(dir, fname, contents)
    }
 
-    val res = smallTextFiles(sc, dir.toString).collect()
+    val res = wholeTextFile(sc, dir.toString).collect()
 
-    assert(res.size === SmallTextFilesSuite.fileNames.size,
+    assert(res.size === WholeTextFileSuite.fileNames.size,
      "Number of files read out do not fit with the actual value")
 
    for ((fname, contents) <- res) {
-      assert(SmallTextFilesSuite.fileNames.contains(fname),
+      assert(WholeTextFileSuite.fileNames.contains(fname),
        s"Missing file name $fname.")
-      assert(contents.hashCode === SmallTextFilesSuite.hashCodeOfContents(fname),
+      assert(contents.hashCode === WholeTextFileSuite.hashCodeOfContents(fname),
        s"file $fname contents can not match")
    }
 
-    SmallTextFilesSuite.fileNames.foreach { fname =>
+    WholeTextFileSuite.fileNames.foreach { fname =>
      Files.deleteIfExists(JPaths.get(s"${dir.toString}/$fname"))
    }
 
    Files.deleteIfExists(dir)
@@ -151,23 +156,18 @@
 }
 
 /**
- * Some final values are defined here. chineseWordsSpark is refer to the Chinese character version
- * of "Spark", we use UTF-8 to encode the bytes together, with a '\n' in the end. fileNames and
- * fileContents represent the test data that will be used later. hashCodeOfContents is a Map of
- * fileName to the hashcode of contents, which is used for the comparison of contents, i.e. the
- * "read in" contents should be same with the "read out" ones.
+ * Some final values are defined here. fileNames and fileContents represent the test data that will
+ * be used later. hashCodeOfContents is a Map of fileName to the hashcode of contents, which is used
+ * for the comparison of contents.
  */
-object SmallTextFilesSuite {
-  private val chineseWordsSpark = Array(
-    0xe7.toByte, 0x81.toByte, 0xab.toByte,
-    0xe8.toByte, 0x8a.toByte, 0xb1.toByte,
-    '\n'.toByte)
+object WholeTextFileSuite {
+  private val testWords: IndexedSeq[Byte] = "Spark is easy to use.\n".map(_.toByte)
 
   private val fileNames = Array("part-00000", "part-00001", "part-00002")
 
-  private val filesContents = Array(7, 70, 700).map { upperBound =>
-    Stream.continually(chineseWordsSpark.toList.toStream).flatten.take(upperBound).toArray
+  private val filesContents = Array(10, 100, 1000).map { upperBound =>
+    Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
   }
 
   private val hashCodeOfContents = fileNames.zip(filesContents).map { case (fname, contents) =>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 24d47d7e4631f..4dc666592fc2f 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -361,6 +361,7 @@ object SparkBuild extends Build {
        "com.sun.jersey" % "jersey-core" % "1.8" % "test",
        "org.mortbay.jetty" % "jetty" % "6.1.26" % "test",
        "org.apache.hadoop" % "hadoop-test" % hadoopVersion % "test",
+       "commons-io" % "commons-io" % "2.4",
        "org.jblas" % "jblas" % "1.2.3"
      )
    )
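
A minimal usage sketch of the renamed MLUtils.wholeTextFile API introduced by this patch, assuming a local SparkContext; the input directory path and the line-counting step are illustrative only, not part of the change:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.util.MLUtils

object WholeTextFileExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "wholeTextFileExample")

    // wholeTextFile returns an RDD[(fileName, content)] with one record per input file.
    val files = MLUtils.wholeTextFile(sc, "/tmp/whole-text-files")  // illustrative path

    // Example downstream use: count the lines in each file.
    files.mapValues(_.lines.size).collect().foreach { case (name, numLines) =>
      println(s"$name: $numLines lines")
    }

    sc.stop()
  }
}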