Merge pull request #20 from harveyfeng/hadoop-config-cache
Allow users to pass broadcasted Configurations and cache InputFormats across Hadoop file reads.

Note: originally from mesos/spark#942

Currently motivated by Shark queries on Hive-partitioned tables, where there is a JobConf broadcast for every Hive partition (i.e., every subdirectory read). The only thing different about those JobConfs is the input path; the Hadoop Configuration that the JobConfs are constructed from remains the same.
This PR only modifies the old Hadoop API RDDs, but similar additions to the new API might reduce computation latencies a little for high-frequency FileInputDStreams (which currently use only the new API).

As a small bonus, this also adds InputFormat caching, to avoid a reflection call on every RDD#compute().

A few other notes:

- Added a general soft-reference hash map in SparkHadoopUtil because I wanted to avoid adding another class to SparkEnv.
- SparkContext's default hadoopConfiguration isn't cached: Configuration has no equals() method, so there is no good way to tell when configuration properties have changed.
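
For illustration, here is a minimal driver-side sketch (not part of this commit) of the reuse pattern described above, using the new `hadoopFile` overload that accepts a pre-broadcast Configuration (see the SparkContext diff below). The local master, the partition directories, and the object name are hypothetical, and TextInputFormat stands in for whatever InputFormat the table actually uses.

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

import org.apache.spark.{SerializableWritable, SparkContext}

object BroadcastConfExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "broadcast-conf-example")

    // Broadcast the shared Hadoop Configuration once; it can be ~10 KB of properties.
    val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))

    // Hypothetical Hive-style partition subdirectories that differ only in their path.
    val partitionDirs = Seq("/warehouse/events/ds=2013-10-01", "/warehouse/events/ds=2013-10-02")

    // Each RDD reuses the single broadcast Configuration; only the input path differs,
    // so a separate JobConf broadcast per partition is avoided.
    val rdds = partitionDirs.map { dir =>
      sc.hadoopFile(dir, confBroadcast, classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text], sc.defaultMinSplits)
    }

    println(rdds.map(_.count()).sum)
    sc.stop()
  }
}
```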
mateiz committed Oct 6, 2013
2 parents 8fc68d0 + 6a2bbec commit 4a25b11
Showing 4 changed files with 161 additions and 34 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -26,7 +26,9 @@ import org.apache.spark.rdd.RDD
sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
private val loading = new HashSet[String]

/** Keys of RDD splits that are being computed/loaded. */
private val loading = new HashSet[String]()

/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
39 changes: 31 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor

import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -83,9 +84,11 @@ class SparkContext(
val sparkHome: String = null,
val jars: Seq[String] = Nil,
val environment: Map[String, String] = Map(),
// This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
// This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())
// This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc)
// too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
// of data-local splits on host
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
scala.collection.immutable.Map())
extends Logging {

// Ensure logging is initialized before we spawn any threads
@@ -238,7 +241,8 @@ class SparkContext(
val env = SparkEnv.get
val conf = env.hadoop.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
@@ -337,6 +341,8 @@ class SparkContext(
valueClass: Class[V],
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkEnv.get.hadoop.addCredentials(conf)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}

@@ -347,10 +353,27 @@
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
) : RDD[(K, V)] = {
val conf = new JobConf(hadoopConfiguration)
FileInputFormat.setInputPaths(conf, path)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
}

/**
* Get an RDD for a Hadoop file with an arbitrary InputFormat. Accepts a Hadoop Configuration
* that has already been broadcast, assuming that it is safe to use it to construct a
* HadoopFileRDD (i.e., except for the file 'path', all other configuration properties can be reused).
*/
def hadoopFile[K, V](
path: String,
confBroadcast: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int
): RDD[(K, V)] = {
new HadoopFileRDD(
this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
}

/**
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -16,6 +16,9 @@
*/

package org.apache.spark.deploy

import com.google.common.collect.MapMaker

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

@@ -24,11 +27,16 @@ import org.apache.hadoop.mapred.JobConf
* Contains util methods to interact with Hadoop from spark.
*/
class SparkHadoopUtil {
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

// Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
// Return an appropriate (subclass) of Configuration. Creating a config can initialize some Hadoop
// subsystems
def newConfiguration(): Configuration = new Configuration()

// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
// Add any user credentials to the job conf which are necessary for running on a secure Hadoop
// cluster
def addCredentials(conf: JobConf) {}

def isYarnMode(): Boolean = { false }
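For context, the soft-reference map introduced above is just a Guava MapMaker concurrent map with softly-held values, so cached JobConfs and InputFormats can be reclaimed under memory pressure and rebuilt on the next lookup. Below is a small standalone sketch of that pattern (the SoftCacheSketch object and getOrPut helper are hypothetical, not Spark code); it mirrors the get-then-put logic that HadoopRDD.getJobConf() uses in the next file.

```scala
import java.util.concurrent.ConcurrentMap

import com.google.common.collect.MapMaker

object SoftCacheSketch {
  // Same construction as hadoopJobMetadata above: keys are held strongly, values softly,
  // so the garbage collector may discard entries when memory is tight.
  private val cache: ConcurrentMap[String, AnyRef] =
    new MapMaker().softValues().makeMap[String, AnyRef]()

  // Hypothetical helper mirroring the cache-hit / cache-miss pattern used by the RDD code.
  def getOrPut[T <: AnyRef](key: String)(create: => T): T = {
    val cached = cache.get(key)
    if (cached != null) {
      cached.asInstanceOf[T]
    } else {
      val value = create
      cache.put(key, value)
      value
    }
  }
}
```

A dropped entry is not an error: the next getJobConf() or getInputFormat() call simply recreates the value from the broadcast Configuration or via reflection and re-caches it.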
140 changes: 117 additions & 23 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,17 +19,55 @@ package org.apache.spark.rdd

import java.io.EOFException

import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}

/**
* An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
* system, or S3).
* This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
* across multiple reads; the 'path' is the only variable that is different across new JobConfs
* created from the Configuration.
*/
class HadoopFileRDD[K, V](
sc: SparkContext,
path: String,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {

override def getJobConf(): JobConf = {
if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
// getJobConf() has been called previously, so there is already a local cache of the JobConf
// needed by this RDD.
return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
// Create a new JobConf, set the input file/directory paths to read from, and cache the
// JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
// HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
// getJobConf() calls for this RDD in the local process.
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
val newJobConf = new JobConf(broadcastedConf.value.value)
FileInputFormat.setInputPaths(newJobConf, path)
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
return newJobConf
}
}
}

/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -45,52 +83,95 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
}

/**
* An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
* system, or S3, tables in HBase, etc).
* A base class that provides core functionality for reading data partitions stored in Hadoop.
*/
class HadoopRDD[K, V](
sc: SparkContext,
@transient conf: JobConf,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
extends RDD[(K, V)](sc, Nil) with Logging {

// A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
def this(
sc: SparkContext,
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
inputFormatClass,
keyClass,
valueClass,
minSplits)
}

protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)

protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
if (conf.isInstanceOf[JobConf]) {
// A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
return conf.asInstanceOf[JobConf]
} else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
// getJobConf() has been called previously, so there is already a local cache of the JobConf
// needed by this RDD.
return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
val newJobConf = new JobConf(broadcastedConf.value.value)
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
return newJobConf
}
}

protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
}
// Once an InputFormat for this RDD is created, cache it so that only one reflection call is
// done in each local process.
val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
.asInstanceOf[InputFormat[K, V]]
if (newInputFormat.isInstanceOf[Configurable]) {
newInputFormat.asInstanceOf[Configurable].setConf(conf)
}
HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
return newInputFormat
}

override def getPartitions: Array[Partition] = {
val env = SparkEnv.get
env.hadoop.addCredentials(conf)
val inputFormat = createInputFormat(conf)
val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
inputFormat.asInstanceOf[Configurable].setConf(jobConf)
}
val inputSplits = inputFormat.getSplits(conf, minSplits)
val inputSplits = inputFormat.getSplits(jobConf, minSplits)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
}

def createInputFormat(conf: JobConf): InputFormat[K, V] = {
ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
.asInstanceOf[InputFormat[K, V]]
}

override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null

val conf = confBroadcast.value.value
val fmt = createInputFormat(conf)
if (fmt.isInstanceOf[Configurable]) {
fmt.asInstanceOf[Configurable].setConf(conf)
}
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
@@ -127,5 +208,18 @@ class HadoopRDD[K, V](
// Do nothing. Hadoop RDD should not be checkpointed.
}

def getConf: Configuration = confBroadcast.value.value
def getConf: Configuration = getJobConf()
}

private[spark] object HadoopRDD {
/**
* The three methods below are helpers for accessing the local map, a property of the SparkEnv of
* the local process.
*/
def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)

def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)

def putCachedMetadata(key: String, value: Any) =
SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
}
