Adding binary and byte file support to Spark
kmader committed Jul 30, 2014
1 parent 81c5f12 commit 84035f1
Showing 3 changed files with 171 additions and 1 deletion.
22 changes: 21 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -40,7 +40,7 @@ import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.input.{WholeTextFileInputFormat,ByteInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -510,6 +510,26 @@ class SparkContext(config: SparkConf) extends Logging {
minPartitions).setName(path)
}

/**
* Get an RDD for a Hadoop-readable dataset as byte arrays, one record per file (useful for binary data).
*
* @param path Directory to the input data files (any Hadoop-readable path).
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*
* @note Small files are preferred; large files are also allowed, but may cause poor performance.
*/
def byteFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
new BinaryFileRDD(
this,
classOf[ByteInputFormat],
classOf[String],
classOf[Array[Byte]],
updateConf,
minPartitions).setName(path)
}

/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
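Before the new input classes themselves, a minimal usage sketch of the byteFiles API added above (the local master, app name, and the /tmp/binDemo input directory are illustrative assumptions, not part of this commit):

import org.apache.spark.{SparkConf, SparkContext}

object ByteFilesDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("byteFilesDemo").setMaster("local[2]"))

    // Each record is (file path, full file contents as a byte array).
    val binaryData = sc.byteFiles("/tmp/binDemo", minPartitions = 4)

    // Report per-file sizes without ever treating the bytes as text.
    binaryData
      .map { case (path, bytes) => (path, bytes.length) }
      .collect()
      .foreach { case (path, len) => println(s"$path -> $len bytes") }

    sc.stop()
  }
}

Because each value is the entire file, the API targets many small-to-medium files rather than a few multi-gigabyte ones, matching the @note on the method.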
102 changes: 102 additions & 0 deletions core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala
@@ -0,0 +1,102 @@
package org.apache.spark.input

import scala.collection.JavaConversions._
import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader


/**
* The new (Hadoop 2.0) InputFormat for whole binary files (not to be confused with the RecordReader itself)
*/
@serializable abstract class BinaryFileInputFormat[T]
extends CombineFileInputFormat[String,T] {
override protected def isSplitable(context: JobContext, file: Path): Boolean = false
/**
* Allow minPartitions to be set by the end user in order to keep compatibility with the old Hadoop API.
*/
def setMaxSplitSize(context: JobContext, minPartitions: Int) {
val files = listStatus(context)
val totalLen = files.map { file =>
if (file.isDir) 0L else file.getLen
}.sum
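// Worked example of the split sizing below (illustrative numbers, not from this commit):
// 40 files totalling 400 MB with minPartitions = 8 give maxSplitSize = ceil(400 MB / 8) = 50 MB,
// so CombineFileInputFormat packs the whole files into roughly 8 combined splits of ~50 MB each.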

// Size splits from the user-supplied minPartitions hint (guarding against zero).
val maxSplitSize = Math.ceil(totalLen * 1.0 /
  (if (minPartitions == 0) 1 else minPartitions)).toLong
super.setMaxSplitSize(maxSplitSize)
}

def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String,T]

}

/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole binary
* file as a key-value pair, where the key is the file path and the value is the entire content
* of the file, parsed into type T by the concrete subclass (preserving the size information).
*/
@serializable abstract class BinaryRecordReader[T](
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
extends RecordReader[String, T] {

private val path = split.getPath(index)
private val fs = path.getFileSystem(context.getConfiguration)

// True means the current file has been processed and should be skipped.
private var processed = false

private val key = path.toString
private var value: T = null.asInstanceOf[T]
override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
override def close() = {}

override def getProgress = if (processed) 1.0f else 0.0f

override def getCurrentKey = key

override def getCurrentValue = value

override def nextKeyValue = {
if (!processed) {
val fileIn = fs.open(path)
val innerBuffer = ByteStreams.toByteArray(fileIn)
value = parseByteArray(innerBuffer)
Closeables.close(fileIn, false)

processed = true
true
} else {
false
}
}
def parseByteArray(inArray: Array[Byte]): T
}

/**
* A concrete InputFormat that extracts just the raw byte array of each file, keyed by the file path
*/
@serializable class ByteInputFormat extends BinaryFileInputFormat[Array[Byte]] {
override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) = {
new CombineFileRecordReader[String, Array[Byte]](
  split.asInstanceOf[CombineFileSplit], taContext, classOf[ByteRecordReader])
}
}

@serializable class ByteRecordReader(
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
extends BinaryRecordReader[Array[Byte]](split, context, index) {

def parseByteArray(inArray: Array[Byte]) = inArray
}
48 changes: 48 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -0,0 +1,48 @@
package org.apache.spark.rdd

/** An RDD over whole binary files that allows better control of the partitioning. */
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.InterruptibleIterator
import org.apache.spark.Logging
import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}

import org.apache.spark.input.BinaryFileInputFormat

private[spark] class BinaryFileRDD[T](
sc : SparkContext,
inputFormatClass: Class[_ <: BinaryFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
@transient conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
configurable.setConf(conf)
case _ =>
}
val jobContext = newJobContext(conf, jobId)
inputFormat.setMaxSplitSize(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
result
}
}
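A quick end-to-end way to see the partitioning logic above in action (a sketch assuming a local directory of many small files at the hypothetical path /tmp/binDemo; counts are approximate because CombineFileInputFormat packs whole files by size):

import org.apache.spark.{SparkConf, SparkContext}

object BinaryPartitionsCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binaryPartitions").setMaster("local[4]"))

    // BinaryFileRDD.getPartitions asks the input format for splits of roughly
    // totalInputSize / minPartitions bytes, so the partition count should land near the hint.
    val rdd = sc.byteFiles("/tmp/binDemo", minPartitions = 4)
    println(s"files = ${rdd.count()}, partitions = ${rdd.partitions.length}")

    sc.stop()
  }
}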
