From 84035f13028553c595a46a0c0f60b5e448ae3db5 Mon Sep 17 00:00:00 2001
From: Kevin Mader
Date: Wed, 30 Jul 2014 13:26:24 +0200
Subject: [PATCH] adding binary and byte file support to Spark

---
 .../scala/org/apache/spark/SparkContext.scala |  22 +++-
 .../apache/spark/input/BinaryFileInput.scala  | 102 ++++++++++++++++++
 .../org/apache/spark/rdd/BinaryFileRDD.scala  |  48 +++++++++
 3 files changed, 171 insertions(+), 1 deletion(-)
 create mode 100644 core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala
 create mode 100644 core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3e6addeaf04a8..af9a1ce935821 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -40,7 +40,7 @@ import org.apache.mesos.MesosNativeLibrary
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.WholeTextFileInputFormat
+import org.apache.spark.input.{WholeTextFileInputFormat, ByteInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -510,6 +510,26 @@ class SparkContext(config: SparkConf) extends Logging {
       minPartitions).setName(path)
   }
 
+  /**
+   * Get an RDD for a Hadoop-readable dataset as a byte array per file (useful for binary data).
+   *
+   * @param minPartitions A suggested value for the minimum number of input splits.
+   *
+   * @note Small files are preferred; large files are also allowed, but may cause bad performance.
+   */
+  def byteFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[ByteInputFormat],
+      classOf[String],
+      classOf[Array[Byte]],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
    * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
diff --git a/core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala b/core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala
new file mode 100644
index 0000000000000..683784c8873b2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala
@@ -0,0 +1,102 @@
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+
+import com.google.common.io.{ByteStreams, Closeables}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+
+/**
+ * The new (Hadoop 2.0) InputFormat for whole binary files (not to be confused with the
+ * RecordReader itself).
+ */
+abstract class BinaryFileInputFormat[T]
+  extends CombineFileInputFormat[String, T] with Serializable {
+
+  override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+
+  /**
+   * Allow minPartitions set by the end-user in order to keep compatibility with the old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+    val maxSplitSize = Math.ceil(totalLen * 1.0 /
+      (if (minPartitions == 0) 1 else minPartitions)).toLong
+    super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T]
+}
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole binary
+ * file as a key-value pair, where the key is the file path and the value is the entire content
+ * of the file parsed into `T` by `parseByteArray` (keeping the file's size information).
+ */
+abstract class BinaryRecordReader[T](
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends RecordReader[String, T] with Serializable {
+
+  private val path = split.getPath(index)
+  private val fs = path.getFileSystem(context.getConfiguration)
+
+  // True means the current file has been processed and should be skipped.
+  private var processed = false
+
+  private val key = path.toString
+  private var value: T = null.asInstanceOf[T]
+
+  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+  override def nextKeyValue = {
+    if (!processed) {
+      val fileIn = fs.open(path)
+      val innerBuffer = ByteStreams.toByteArray(fileIn)
+      value = parseByteArray(innerBuffer)
+      Closeables.close(fileIn, false)
+      processed = true
+      true
+    } else {
+      false
+    }
+  }
+
+  def parseByteArray(inArray: Array[Byte]): T
+}
+
+/**
+ * A demo class for extracting just the byte array itself.
+ */
+class ByteInputFormat extends BinaryFileInputFormat[Array[Byte]] {
+  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) = {
+    new CombineFileRecordReader[String, Array[Byte]](
+      split.asInstanceOf[CombineFileSplit], taContext, classOf[ByteRecordReader])
+  }
+}
+
+class ByteRecordReader(
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends BinaryRecordReader[Array[Byte]](split, context, index) {
+
+  def parseByteArray(inArray: Array[Byte]) = inArray
+}
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
new file mode 100644
index 0000000000000..7fdc553659f67
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -0,0 +1,48 @@
+package org.apache.spark.rdd
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.input.BinaryFileInputFormat
+import org.apache.spark.{Partition, SparkContext}
+
+/**
+ * An RDD of whole binary files that allows better control of the partitioning than
+ * [[NewHadoopRDD]] by computing the maximum split size from the requested minimum
+ * number of partitions.
+ */
+private[spark] class BinaryFileRDD[T](
+    sc: SparkContext,
+    inputFormatClass: Class[_ <: BinaryFileInputFormat[T]],
+    keyClass: Class[String],
+    valueClass: Class[T],
+    @transient conf: Configuration,
+    minPartitions: Int)
+  extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMaxSplitSize(jobContext, minPartitions)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[Partition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+}
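
For reviewers, a minimal usage sketch of the new API from an application, assuming this patch is applied. The app name, input path, and partition count below are placeholder values, not part of the patch:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object ByteFilesExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("byteFilesExample").setMaster("local[2]"))

    // Each record is (file path, entire file contents as a byte array).
    // The path and minPartitions are placeholders.
    val files: RDD[(String, Array[Byte])] = sc.byteFiles("hdfs:///data/blobs", minPartitions = 8)

    // For example, report the size of each file without interpreting its contents.
    files.mapValues(_.length).collect().foreach { case (path, len) =>
      println(s"$path -> $len bytes")
    }

    sc.stop()
  }
}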
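
In the same spirit, a sketch of how a downstream job might plug a custom record type into BinaryFileInputFormat / BinaryRecordReader. The HeaderedBlob type and its 4-byte big-endian header are invented purely for illustration; only the two base classes come from this patch:

import java.nio.ByteBuffer

import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{CombineFileRecordReader, CombineFileSplit}

import org.apache.spark.input.{BinaryFileInputFormat, BinaryRecordReader}

// Hypothetical record: a 4-byte big-endian header followed by the raw payload.
case class HeaderedBlob(header: Int, payload: Array[Byte])

class HeaderedBlobInputFormat extends BinaryFileInputFormat[HeaderedBlob] {
  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)
      : RecordReader[String, HeaderedBlob] = {
    new CombineFileRecordReader[String, HeaderedBlob](
      split.asInstanceOf[CombineFileSplit], taContext, classOf[HeaderedBlobRecordReader])
  }
}

class HeaderedBlobRecordReader(
    split: CombineFileSplit,
    context: TaskAttemptContext,
    index: Integer)
  extends BinaryRecordReader[HeaderedBlob](split, context, index) {

  // The whole file arrives as one byte array; split it into header and payload.
  def parseByteArray(inArray: Array[Byte]): HeaderedBlob = {
    val header = ByteBuffer.wrap(inArray, 0, 4).getInt
    HeaderedBlob(header, inArray.drop(4))
  }
}

Such a format could then be exposed the same way byteFiles exposes ByteInputFormat, i.e. by constructing a BinaryFileRDD with the custom input format class.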