Prevent OOM if a single RDD partition is too large
The idea is to always use at least a fixed amount of memory (M bytes)
to unroll RDD partitions. This space is not reserved, but instead
allocated dynamically by dropping existing blocks when necessary.

We maintain this buffer as a global quantity shared across all cores.
The way we synchronize its usage is very similar to the way we share
memory across all threads for shuffle aggregation. In particular, each
thread periodically requests more memory, and if there is not enough
global memory to grant the request, the thread concedes and spills
(here, by giving up on caching the partition in memory and falling back
to disk if the storage level allows it). However, this relies on the
accuracy of size estimation, which is not guaranteed. Therefore, as in
the shuffle case, we need an equivalent spark.storage.safetyFraction in
case size estimation is slightly off.

We expose M to the user as spark.storage.bufferFraction, with a default
value of 0.2.
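
As a quick, hypothetical illustration of the knobs mentioned above (not part of the patch): the three spark.storage.* keys and their defaults come from this change or the surrounding code, while the master URL, application name, and job are made up.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver program; only the spark.storage.* keys come from this patch.
object UnrollBufferDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")                       // made-up master URL
      .setAppName("unroll-buffer-demo")            // made-up application name
      .set("spark.storage.memoryFraction", "0.6")  // fraction of the heap reserved for storage
      .set("spark.storage.safetyFraction", "0.9")  // guards against size-estimation error
      .set("spark.storage.bufferFraction", "0.2")  // M: share of storage memory for unrolling
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 1000000).cache().count()   // caching goes through CacheManager below
    sc.stop()
  }
}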
andrewor14 committed Jun 21, 2014
1 parent bbd3eea commit 776aec9
Showing 10 changed files with 125 additions and 21 deletions.
86 changes: 81 additions & 5 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -17,10 +17,12 @@

package org.apache.spark

import scala.collection.mutable.{ArrayBuffer, HashSet}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
import org.apache.spark.util.collection.SizeTrackingAppendOnlyBuffer

/**
* Spark class responsible for passing RDDs partition contents to the BlockManager and making
@@ -29,7 +31,14 @@ import org.apache.spark.storage._
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

/** Keys of RDD partitions that are being computed/loaded. */
private val loading = new HashSet[RDDBlockId]()
private val loading = new mutable.HashSet[RDDBlockId]

/**
* The amount of space ensured for unrolling partitions, shared across all cores.
* This space is not reserved in advance, but allocated dynamically by dropping existing blocks.
* It must be a lazy val in order to access a mocked BlockManager's conf in tests properly.
*/
private lazy val globalBufferMemory = BlockManager.getBufferMemory(blockManager.conf)

/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
@@ -142,9 +151,76 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
* to the BlockManager as an iterator and expect to read it back later. This is because
* we may end up dropping a partition from memory store before getting it back, e.g.
* when the entirety of the RDD does not fit in memory. */
val elements = values.toArray[Any]
updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]

var count = 0 // The number of elements unrolled so far
var dropPartition = false // Whether to drop the new partition from memory
var previousSize = 0L // Previous estimate of the size of our buffer
val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer

val threadId = Thread.currentThread().getId
val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
var buffer = new SizeTrackingAppendOnlyBuffer[Any]

/* While adding values to the in-memory buffer, periodically check whether the memory
* restrictions for unrolling partitions are still satisfied. If not, stop immediately,
* and persist the partition to disk if specified by the storage level. This check is
* a safeguard against the scenario when a single partition does not fit in memory. */
while (values.hasNext && !dropPartition) {
buffer += values.next()
count += 1
if (count % memoryRequestPeriod == 1) {
// Calculate the amount of memory to request from the global memory pool
val currentSize = buffer.estimateSize()
val delta = math.max(currentSize - previousSize, 0)
val memoryToRequest = currentSize + delta
previousSize = currentSize

// Atomically check whether there is sufficient memory in the global pool to continue
cacheMemoryMap.synchronized {
val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L)
val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory

// Request for memory for the local buffer, and return whether request is granted
def requestForMemory(): Boolean = {
val availableMemory = blockManager.memoryStore.freeMemory - otherThreadsMemory
val granted = availableMemory > memoryToRequest
if (granted) { cacheMemoryMap(threadId) = memoryToRequest }
granted
}

// If the first request is not granted, try again after ensuring free space
// If there is still not enough space, give up and drop the partition
if (!requestForMemory()) {
val result = blockManager.memoryStore.ensureFreeSpace(key, globalBufferMemory)
updatedBlocks ++= result.droppedBlocks
dropPartition = !requestForMemory()
}
}
}
}

if (!dropPartition) {
// We have successfully unrolled the entire partition, so cache it in memory
updatedBlocks ++= blockManager.put(key, buffer.array, storageLevel, tellMaster = true)
buffer.iterator.asInstanceOf[Iterator[T]]
} else {
// We have exceeded our collective quota. This partition will not be cached in memory.
val persistToDisk = storageLevel.useDisk
logWarning(s"Failed to cache $key in memory! There is not enough space to unroll the " +
s"entire partition. " + (if (persistToDisk) "Persisting to disk instead." else ""))
var newValues = (buffer.iterator ++ values).asInstanceOf[Iterator[T]]
if (persistToDisk) {
val newLevel = StorageLevel(storageLevel.useDisk, useMemory = false,
storageLevel.useOffHeap, storageLevel.deserialized, storageLevel.replication)
newValues = putInBlockManager[T](key, newValues, newLevel, updatedBlocks)
// Free up buffer for other threads
buffer = null
cacheMemoryMap.synchronized {
cacheMemoryMap(threadId) = 0
}
}
newValues
}
}
}

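An editor's aside on the hunk above (not part of the diff): each periodic check requests the current size estimate plus the growth observed since the last check, so a thread has headroom for roughly another period of growth before it must ask again. Below is a standalone sketch of the synchronized check-and-grant handshake, with a plain mutable.HashMap standing in for SparkEnv.cacheMemoryMap and a fixed number standing in for memoryStore.freeMemory.

import scala.collection.mutable

object UnrollMemorySketch {
  // Shared map of thread ID -> bytes granted for unrolling (stand-in for SparkEnv.cacheMemoryMap).
  val cacheMemoryMap = mutable.HashMap[Long, Long]()
  // Stand-in for blockManager.memoryStore.freeMemory.
  val freeMemory: Long = 512L * 1024 * 1024

  /** Grant `memoryToRequest` bytes to `threadId` if the other threads leave enough room. */
  def requestUnrollMemory(threadId: Long, memoryToRequest: Long): Boolean =
    cacheMemoryMap.synchronized {
      val previouslyOccupied = cacheMemoryMap.getOrElse(threadId, 0L)
      val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupied
      val granted = freeMemory - otherThreadsMemory > memoryToRequest
      if (granted) cacheMemoryMap(threadId) = memoryToRequest
      granted
    }

  def main(args: Array[String]): Unit = {
    val threadId = Thread.currentThread().getId
    println(requestUnrollMemory(threadId, 64L * 1024 * 1024))  // true: 64 MB fits in 512 MB
  }
}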
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -67,10 +67,14 @@ class SparkEnv (
val metricsSystem: MetricsSystem,
val conf: SparkConf) extends Logging {

// A mapping of thread ID to amount of memory used for shuffle in bytes
// A mapping of thread ID to amount of memory, in bytes, used for shuffle
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()

// A mapping of thread ID to amount of memory, in bytes, used for unrolling an RDD partition
// All accesses should be manually synchronized
val cacheMemoryMap = mutable.HashMap[Long, Long]()

private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

// A general, soft-reference map for metadata needed during HadoopRDD split computation
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -259,11 +259,11 @@ private[spark] class Executor(
}
}
} finally {
// TODO: Unregister shuffle memory only for ResultTask
val threadId = Thread.currentThread().getId
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
val cacheMemoryMap = env.cacheMemoryMap
shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(threadId) }
cacheMemoryMap.synchronized { cacheMemoryMap.remove(threadId) }
runningTasks.remove(taskId)
}
}
16 changes: 12 additions & 4 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -61,9 +61,9 @@ private[spark] class BlockManager(

// Actual storage of where blocks are kept
private var tachyonInitialized = false
private[storage] val memoryStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore = new DiskStore(this, diskBlockManager)
private[storage] lazy val tachyonStore: TachyonStore = {
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
private[spark] lazy val tachyonStore: TachyonStore = {
val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
val appFolderName = conf.get("spark.tachyonStore.folderName")
val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
@@ -1049,9 +1049,17 @@
private[spark] object BlockManager extends Logging {
private val ID_GENERATOR = new IdGenerator

/** Return the total amount of storage memory available. */
private def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}

/** Return the amount of storage memory used for unrolling RDD partitions. */
def getBufferMemory(conf: SparkConf): Long = {
val bufferFraction = conf.getDouble("spark.storage.bufferFraction", 0.2)
(getMaxMemory(conf) * bufferFraction).toLong
}

def getHeartBeatFrequency(conf: SparkConf): Long =
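To make the two helpers above concrete, here is a back-of-the-envelope worked example under the default fractions, assuming a hypothetical 4 GiB executor heap (the real figure comes from Runtime.getRuntime.maxMemory):

// Rough check of the defaults above; the 4 GiB heap is an assumption.
object BufferBudgetExample extends App {
  val heapBytes = 4L * 1024 * 1024 * 1024
  val maxStorageMemory   = (heapBytes * 0.6 * 0.9).toLong  // ~2.3 GB of storage memory
  val globalBufferMemory = (maxStorageMemory * 0.2).toLong // ~460 MB of shared unroll space
  println(s"storage = $maxStorageMemory bytes, unroll buffer = $globalBufferMemory bytes")
}

With the defaults, all threads on an executor collectively unroll into roughly a fifth of the storage memory; ensureFreeSpace may drop existing blocks to make that room available.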
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
/**
* Stores BlockManager blocks on disk.
*/
private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
extends BlockStore(blockManager) with Logging {

val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -30,7 +30,7 @@ private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
* Stores blocks in memory, either as Arrays of deserialized Java objects or as
* serialized ByteBuffers.
*/
private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) {

private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
@@ -212,7 +212,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
*
* Return whether there is enough free space, along with the blocks dropped in the process.
*/
private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = {
private[spark] def ensureFreeSpace(
blockIdToAdd: BlockId,
space: Long): ResultWithDroppedBlocks = {
logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory")

val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -274,6 +276,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}

private case class ResultWithDroppedBlocks(
private[spark] case class ResultWithDroppedBlocks(
success: Boolean,
droppedBlocks: Seq[(BlockId, BlockStatus)])
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
/**
* Stores BlockManager blocks on Tachyon.
*/
private class TachyonStore(
private[spark] class TachyonStore(
blockManager: BlockManager,
tachyonManager: TachyonBlockManager)
extends BlockStore(blockManager: BlockManager) with Logging {
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -130,9 +130,9 @@ class ExternalAppendOnlyMap[K, V, C](
// this map to grow and, if possible, allocate the required amount
shuffleMemoryMap.synchronized {
val threadId = Thread.currentThread().getId
val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId).getOrElse(0L)
val availableMemory = maxMemoryThreshold -
(shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
(shuffleMemoryMap.values.sum - previouslyOccupiedMemory)

// Assume map growth factor is 2x
shouldSpill = availableMemory < mapSize * 2
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@ -50,6 +50,16 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize:

def size: Int = _numElements

def iterator: Iterator[V] = new Iterator[V] {
var index = 0
override def hasNext: Boolean = index < _numElements
override def next(): V = {
val value = _array(index)
index += 1
value
}
}

/** Gets the underlying array backing this vector. */
def array: Array[V] = _array

4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyBuffer.scala
@@ -36,4 +36,8 @@ private[spark] class SizeTrackingAppendOnlyBuffer[T: ClassTag]
resetSamples()
this
}

override def array: Array[T] = {
super.iterator.toArray
}
}
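
A brief note on the two collection changes above: PrimitiveVector.array exposes the raw backing array, which can be longer than the number of elements appended, so SizeTrackingAppendOnlyBuffer overrides array to rebuild a correctly sized array from the new iterator before CacheManager hands it to blockManager.put. A hypothetical illustration follows (PrimitiveVector is private[spark], so this would only compile inside Spark itself; the capacity of 8 is an assumption about the backing array not shrinking).

// Hypothetical illustration of why SizeTrackingAppendOnlyBuffer overrides `array`.
val v = new org.apache.spark.util.collection.PrimitiveVector[Int](initialSize = 8)
v += 1
v += 2
println(v.size)                    // 2: elements actually appended
println(v.array.length)            // 8: raw backing array, padded with unused capacity
println(v.iterator.toArray.length) // 2: what the buffer's overridden `array` effectively returns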
