Merge pull request alteryx#32 from mridulm/master
Address review comments, move to incubator spark

Also includes a small fix to speculative execution.

Continued from mesos/spark#914

(cherry picked from commit d6ead47)
Signed-off-by: Reynold Xin <[email protected]>
mateiz authored and rxin committed Oct 15, 2013
1 parent 7084217 commit d615b14
Showing 12 changed files with 93 additions and 29 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -32,7 +32,7 @@ import akka.util.Duration

import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap}


private[spark] sealed trait MapOutputTrackerMessage
@@ -71,7 +71,7 @@ private[spark] class MapOutputTracker extends Logging {
var cacheEpoch = epoch
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]

val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup)

// Send a message to the trackerActor and get its result within a default timeout, or
// throw a SparkException if this fails.
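
The change above is repeated throughout this commit: every MetadataCleaner is now constructed with a MetadataCleanerType enumeration value rather than a free-form string. A minimal, self-contained sketch of the pattern with stand-in classes (not the actual Spark sources):

// Stand-ins that mirror the new constructor shape; the real classes live in
// org.apache.spark.util and carry the full set of enumeration values.
object CleanerNamingSketch {
  object MetadataCleanerType extends Enumeration {
    type MetadataCleanerType = Value
    val MAP_OUTPUT_TRACKER, SPARK_CONTEXT = Value
  }

  class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType,
                        cleanupFunc: Long => Unit) {
    val name = cleanerType.toString
  }

  def main(args: Array[String]): Unit = {
    // Before: new MetadataCleaner("MapOutputTracker", cleanup)
    // After:  the identifier is a typed enumeration value.
    val cleaner = new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, _ => ())
    println(cleaner.name)  // "MAP_OUTPUT_TRACKER" in this sketch
  }
}
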
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -62,7 +62,11 @@ import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util._
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage.StorageStatus
import scala.Some
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage.StorageStatus
@@ -119,7 +123,7 @@ class SparkContext(

// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup)

// Initalize the Spark UI
private[spark] val ui = new SparkUI(this)
@@ -26,7 +26,7 @@ import scala.collection.mutable.{ListBuffer, Map, Set}
import scala.math

import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.{BlockManager, StorageLevel}
import org.apache.spark.util.Utils

private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
@@ -36,7 +36,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:

def value = value_

def blockId: String = "broadcast_" + id
def blockId: String = BlockManager.toBroadcastId(id)

MultiTracker.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
@@ -25,16 +25,16 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream

import org.apache.spark.{HttpServer, Logging, SparkEnv}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashSet}
import org.apache.spark.storage.{BlockManager, StorageLevel}
import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashSet}


private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {

def value = value_

def blockId: String = "broadcast_" + id
def blockId: String = BlockManager.toBroadcastId(id)

HttpBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
@@ -82,7 +82,7 @@ private object HttpBroadcast extends Logging {
private var server: HttpServer = null

private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)

private lazy val compressionCodec = CompressionCodec.createCodec()

@@ -25,15 +25,15 @@ import scala.collection.mutable.{ListBuffer, Map, Set}
import scala.math

import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.{BlockManager, StorageLevel}
import org.apache.spark.util.Utils

private[spark] class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {

def value = value_

def blockId = "broadcast_" + id
def blockId = BlockManager.toBroadcastId(id)

MultiTracker.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
@@ -21,6 +21,7 @@ import java.io.File

import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.storage.ShuffleBlockManager


private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
@@ -54,7 +55,7 @@ private[spark] object ShuffleSender {

val pResovler = new PathResolver {
override def getAbsolutePath(blockId: String): String = {
if (!blockId.startsWith("shuffle_")) {
if (!ShuffleBlockManager.isShuffle(blockId)) {
throw new Exception("Block " + blockId + " is not a shuffle block")
}
// Figure out which local directory it hashes to, and which subdirectory in that
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.storage.{BlockManager, BlockManagerMaster}
import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}

/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -138,7 +138,7 @@ class DAGScheduler(
val activeJobs = new HashSet[ActiveJob]
val resultStageToJob = new HashMap[Stage, ActiveJob]

val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)

// Start a thread to run the DAGScheduler event loop
def start() {
@@ -23,7 +23,7 @@ import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDDCheckpointData
import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}

private[spark] object ResultTask {

@@ -32,7 +32,7 @@ private[spark] object ResultTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]

val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues)
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues)

def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
synchronized {
@@ -25,7 +25,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.storage._
import org.apache.spark.util.{TimeStampedHashMap, MetadataCleaner}
import org.apache.spark.util.{MetadataCleanerType, TimeStampedHashMap, MetadataCleaner}
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDDCheckpointData

@@ -37,7 +37,7 @@ private[spark] object ShuffleMapTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]

val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues)
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_MAP_TASK, serializedInfoCache.clearOldValues)

def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
synchronized {
@@ -630,11 +630,11 @@ private[spark] class ClusterTaskSetManager(
var foundTasks = false
val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
if (tasksSuccessful >= minFinishedForSpeculation) {
if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
val time = clock.getTime()
val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
Arrays.sort(durations)
val medianDuration = durations(min((0.5 * numTasks).round.toInt, durations.size - 1))
val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))
val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
// TODO: Threshold should also look at standard deviation of task durations and have a lower
// bound based on that.
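
This hunk is the small speculative-execution fix called out in the commit message: the guard now also requires at least one successful task, and the median index is computed over the tasks that actually finished rather than over numTasks. A standalone sketch of the corrected threshold computation, assuming the defaults of the time (SPECULATION_QUANTILE = 0.75, SPECULATION_MULTIPLIER = 1.5):

object SpeculationThresholdSketch {
  val SPECULATION_QUANTILE = 0.75
  val SPECULATION_MULTIPLIER = 1.5

  // Returns Some(threshold in ms) once enough tasks have finished, else None.
  def speculationThreshold(numTasks: Int, successfulDurations: Seq[Long]): Option[Double] = {
    val tasksSuccessful = successfulDurations.size
    val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
      val durations = successfulDurations.sorted.toArray
      // Median over the tasks that actually finished, not over numTasks.
      val medianDuration = durations(math.min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
      Some(math.max(SPECULATION_MULTIPLIER * medianDuration, 100))
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    // 10 tasks, 8 finished: the median of the 8 observed durations drives the threshold.
    println(speculationThreshold(10, Seq(100L, 110L, 120L, 130L, 140L, 150L, 160L, 400L)))  // Some(210.0)
    // Nothing finished yet: the new tasksSuccessful > 0 guard bails out instead of
    // indexing into an empty array.
    println(speculationThreshold(1, Seq.empty))  // None
  }
}
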
39 changes: 34 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -154,7 +154,8 @@ private[spark] class BlockManager(

var heartBeatTask: Cancellable = null

val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks)
private val broadcastCleaner = new MetadataCleaner(MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks)
initialize()

// The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -921,13 +922,36 @@
}
}

def dropOldBlocks(cleanupTime: Long) {
logInfo("Dropping blocks older than " + cleanupTime)
private def dropOldNonBroadcastBlocks(cleanupTime: Long) {
logInfo("Dropping non broadcast blocks older than " + cleanupTime)
val iterator = blockInfo.internalMap.entrySet().iterator()
while (iterator.hasNext) {
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
if (time < cleanupTime) {
if (time < cleanupTime && ! BlockManager.isBroadcastBlock(id) ) {
info.synchronized {
val level = info.level
if (level.useMemory) {
memoryStore.remove(id)
}
if (level.useDisk) {
diskStore.remove(id)
}
iterator.remove()
logInfo("Dropped block " + id)
}
reportBlockStatus(id, info)
}
}
}

private def dropOldBroadcastBlocks(cleanupTime: Long) {
logInfo("Dropping broadcast blocks older than " + cleanupTime)
val iterator = blockInfo.internalMap.entrySet().iterator()
while (iterator.hasNext) {
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
if (time < cleanupTime && BlockManager.isBroadcastBlock(id) ) {
info.synchronized {
val level = info.level
if (level.useMemory) {
@@ -947,7 +971,7 @@
def shouldCompress(blockId: String): Boolean = {
if (ShuffleBlockManager.isShuffle(blockId)) {
compressShuffle
} else if (blockId.startsWith("broadcast_")) {
} else if (BlockManager.isBroadcastBlock(blockId)) {
compressBroadcast
} else if (blockId.startsWith("rdd_")) {
compressRdds
@@ -1004,6 +1028,7 @@
memoryStore.clear()
diskStore.clear()
metadataCleaner.cancel()
broadcastCleaner.cancel()
logInfo("BlockManager stopped")
}
}
@@ -1077,5 +1102,9 @@
{
blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host))
}

def isBroadcastBlock(blockId: String): Boolean = null != blockId && blockId.startsWith("broadcast_")

def toBroadcastId(id: Long): String = "broadcast_" + id
}
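
The two helpers added at the bottom of the BlockManager companion object centralize the "broadcast_" naming that HttpBroadcast, TreeBroadcast and BitTorrentBroadcast previously rebuilt by hand, and the new broadcast/non-broadcast split of the block cleaners keys on the same predicate. A small self-contained sketch (not the Spark source) of how the pair behaves:

object BroadcastBlockIdSketch {
  // Mirrors the helpers added to the BlockManager object above.
  def toBroadcastId(id: Long): String = "broadcast_" + id
  def isBroadcastBlock(blockId: String): Boolean =
    blockId != null && blockId.startsWith("broadcast_")

  def main(args: Array[String]): Unit = {
    val blockId = toBroadcastId(7)
    println(blockId)                       // broadcast_7
    println(isBroadcastBlock(blockId))     // true
    println(isBroadcastBlock("rdd_3_0"))   // false
    println(isBroadcastBlock(null))        // false, thanks to the null guard
  }
}
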

36 changes: 33 additions & 3 deletions core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -17,19 +17,21 @@

package org.apache.spark.util

import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
import java.util.{TimerTask, Timer}
import org.apache.spark.Logging


/**
* Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
*/
class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit) extends Logging {
val name = cleanerType.toString

private val delaySeconds = MetadataCleaner.getDelaySeconds
private val periodSeconds = math.max(10, delaySeconds / 10)
private val timer = new Timer(name + " cleanup timer", true)


private val task = new TimerTask {
override def run() {
try {
@@ -53,9 +55,37 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
}
}

object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext", "HttpBroadcast", "DagScheduler", "ResultTask",
"ShuffleMapTask", "BlockManager", "BroadcastVars") {

val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, SHUFFLE_MAP_TASK, BLOCK_MANAGER, BROADCAST_VARS = Value

type MetadataCleanerType = Value

def systemProperty(which: MetadataCleanerType.MetadataCleanerType) = "spark.cleaner.ttl." + which.toString
}

object MetadataCleaner {

// using only sys props for now : so that workers can also get to it while preserving earlier behavior.
def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt
def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.ttl", delay.toString) }

def getDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
System.getProperty(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds.toString).toInt
}

def setDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType, delay: Int) {
System.setProperty(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
}

def setDelaySeconds(delay: Int, resetAll: Boolean = true) {
// override for all ?
System.setProperty("spark.cleaner.ttl", delay.toString)
if (resetAll) {
for (cleanerType <- MetadataCleanerType.values) {
System.clearProperty(MetadataCleanerType.systemProperty(cleanerType))
}
}
}
}
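
With the enumeration in place, each cleaner reads its own TTL from a per-type system property and falls back to the global spark.cleaner.ttl. A usage sketch, assuming the property names produced by systemProperty above (the enumeration's display names, e.g. "BroadcastVars" for BROADCAST_VARS):

object CleanerTtlSketch {
  // Same fallback rule as MetadataCleaner.getDelaySeconds(cleanerType) above:
  // per-cleaner property first, global spark.cleaner.ttl second, -1 otherwise.
  def ttlFor(cleanerName: String): Int =
    System.getProperty("spark.cleaner.ttl." + cleanerName,
      System.getProperty("spark.cleaner.ttl", "-1")).toInt

  def main(args: Array[String]): Unit = {
    System.setProperty("spark.cleaner.ttl", "3600")                // global: one hour
    System.setProperty("spark.cleaner.ttl.BroadcastVars", "600")   // broadcast blocks: ten minutes

    println(ttlFor("BroadcastVars"))  // 600
    println(ttlFor("BlockManager"))   // 3600, falls back to the global value
  }
}
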
