From 776aec9e20f4a0c4beffe45ca07511bcd3fcba32 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 20 Jun 2014 15:58:46 -0700 Subject: [PATCH] Prevent OOM if a single RDD partition is too large The idea is to always use at least a fixed amount of memory (M bytes) to unroll RDD partitions. This space is not reserved, but instead allocated dynamically by dropping existing blocks when necessary. We maintain this buffer as a global quantity shared across all cores. The way we synchronize the usage of this buffer is very similar to the way we share memory across all threads for shuffle aggregation. In particular, each thread cautiously requests for more memory periodically, and if there is not enough global memory to grant the request, the thread concedes and spills. However, this relies on the accuracy of size estimation, which is not guaranteed. Therefore, as in the shuffle case, we need an equivalent spark.storage.safetyFraction in case size estimation is slightly off. We expose M to the user as spark.storage.bufferFraction, with a default value of 0.2. --- .../scala/org/apache/spark/CacheManager.scala | 86 +++++++++++++++++-- .../scala/org/apache/spark/SparkEnv.scala | 6 +- .../org/apache/spark/executor/Executor.scala | 8 +- .../apache/spark/storage/BlockManager.scala | 16 +++- .../org/apache/spark/storage/DiskStore.scala | 2 +- .../apache/spark/storage/MemoryStore.scala | 8 +- .../apache/spark/storage/TachyonStore.scala | 2 +- .../collection/ExternalAppendOnlyMap.scala | 4 +- .../util/collection/PrimitiveVector.scala | 10 +++ .../SizeTrackingAppendOnlyBuffer.scala | 4 + 10 files changed, 125 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 8e5b787e34c90..f30a28b2cfeed 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -17,10 +17,12 @@ package org.apache.spark -import scala.collection.mutable.{ArrayBuffer, HashSet} +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import org.apache.spark.rdd.RDD import org.apache.spark.storage._ +import org.apache.spark.util.collection.SizeTrackingAppendOnlyBuffer /** * Spark class responsible for passing RDDs partition contents to the BlockManager and making @@ -29,7 +31,14 @@ import org.apache.spark.storage._ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { /** Keys of RDD partitions that are being computed/loaded. */ - private val loading = new HashSet[RDDBlockId]() + private val loading = new mutable.HashSet[RDDBlockId] + + /** + * The amount of space ensured for unrolling partitions, shared across all cores. + * This space is not reserved in advance, but allocated dynamically by dropping existing blocks. + * It must be a lazy val in order to access a mocked BlockManager's conf in tests properly. + */ + private lazy val globalBufferMemory = BlockManager.getBufferMemory(blockManager.conf) /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */ def getOrCompute[T]( @@ -142,9 +151,76 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { * to the BlockManager as an iterator and expect to read it back later. This is because * we may end up dropping a partition from memory store before getting it back, e.g. * when the entirety of the RDD does not fit in memory. */ - val elements = values.toArray[Any] - updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true) - elements.iterator.asInstanceOf[Iterator[T]] + + var count = 0 // The number of elements unrolled so far + var dropPartition = false // Whether to drop the new partition from memory + var previousSize = 0L // Previous estimate of the size of our buffer + val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer + + val threadId = Thread.currentThread().getId + val cacheMemoryMap = SparkEnv.get.cacheMemoryMap + var buffer = new SizeTrackingAppendOnlyBuffer[Any] + + /* While adding values to the in-memory buffer, periodically check whether the memory + * restrictions for unrolling partitions are still satisfied. If not, stop immediately, + * and persist the partition to disk if specified by the storage level. This check is + * a safeguard against the scenario when a single partition does not fit in memory. */ + while (values.hasNext && !dropPartition) { + buffer += values.next() + count += 1 + if (count % memoryRequestPeriod == 1) { + // Calculate the amount of memory to request from the global memory pool + val currentSize = buffer.estimateSize() + val delta = math.max(currentSize - previousSize, 0) + val memoryToRequest = currentSize + delta + previousSize = currentSize + + // Atomically check whether there is sufficient memory in the global pool to continue + cacheMemoryMap.synchronized { + val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L) + val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory + + // Request for memory for the local buffer, and return whether request is granted + def requestForMemory(): Boolean = { + val availableMemory = blockManager.memoryStore.freeMemory - otherThreadsMemory + val granted = availableMemory > memoryToRequest + if (granted) { cacheMemoryMap(threadId) = memoryToRequest } + granted + } + + // If the first request is not granted, try again after ensuring free space + // If there is still not enough space, give up and drop the partition + if (!requestForMemory()) { + val result = blockManager.memoryStore.ensureFreeSpace(key, globalBufferMemory) + updatedBlocks ++= result.droppedBlocks + dropPartition = !requestForMemory() + } + } + } + } + + if (!dropPartition) { + // We have successfully unrolled the entire partition, so cache it in memory + updatedBlocks ++= blockManager.put(key, buffer.array, storageLevel, tellMaster = true) + buffer.iterator.asInstanceOf[Iterator[T]] + } else { + // We have exceeded our collective quota. This partition will not be cached in memory. + val persistToDisk = storageLevel.useDisk + logWarning(s"Failed to cache $key in memory! There is not enough space to unroll the " + + s"entire partition. " + (if (persistToDisk) "Persisting to disk instead." else "")) + var newValues = (buffer.iterator ++ values).asInstanceOf[Iterator[T]] + if (persistToDisk) { + val newLevel = StorageLevel(storageLevel.useDisk, useMemory = false, + storageLevel.useOffHeap, storageLevel.deserialized, storageLevel.replication) + newValues = putInBlockManager[T](key, newValues, newLevel, updatedBlocks) + // Free up buffer for other threads + buffer = null + cacheMemoryMap.synchronized { + cacheMemoryMap(threadId) = 0 + } + } + newValues + } } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 8dfa8cc4b5b3f..0d6bce064525b 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -67,10 +67,14 @@ class SparkEnv ( val metricsSystem: MetricsSystem, val conf: SparkConf) extends Logging { - // A mapping of thread ID to amount of memory used for shuffle in bytes + // A mapping of thread ID to amount of memory, in bytes, used for shuffle // All accesses should be manually synchronized val shuffleMemoryMap = mutable.HashMap[Long, Long]() + // A mapping of thread ID to amount of memory, in bytes, used for unrolling an RDD partition + // All accesses should be manually synchronized + val cacheMemoryMap = mutable.HashMap[Long, Long]() + private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() // A general, soft-reference map for metadata needed during HadoopRDD split computation diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index baee7a216a7c3..1c95f4d9ba136 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -259,11 +259,11 @@ private[spark] class Executor( } } } finally { - // TODO: Unregister shuffle memory only for ResultTask + val threadId = Thread.currentThread().getId val shuffleMemoryMap = env.shuffleMemoryMap - shuffleMemoryMap.synchronized { - shuffleMemoryMap.remove(Thread.currentThread().getId) - } + val cacheMemoryMap = env.cacheMemoryMap + shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(threadId) } + cacheMemoryMap.synchronized { cacheMemoryMap.remove(threadId) } runningTasks.remove(taskId) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 0444ffa346638..002f03ff622e4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -61,9 +61,9 @@ private[spark] class BlockManager( // Actual storage of where blocks are kept private var tachyonInitialized = false - private[storage] val memoryStore = new MemoryStore(this, maxMemory) - private[storage] val diskStore = new DiskStore(this, diskBlockManager) - private[storage] lazy val tachyonStore: TachyonStore = { + private[spark] val memoryStore = new MemoryStore(this, maxMemory) + private[spark] val diskStore = new DiskStore(this, diskBlockManager) + private[spark] lazy val tachyonStore: TachyonStore = { val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon") val appFolderName = conf.get("spark.tachyonStore.folderName") val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}" @@ -1049,9 +1049,17 @@ private[spark] class BlockManager( private[spark] object BlockManager extends Logging { private val ID_GENERATOR = new IdGenerator + /** Return the total amount of storage memory available. */ private def getMaxMemory(conf: SparkConf): Long = { val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) - (Runtime.getRuntime.maxMemory * memoryFraction).toLong + val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) + (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong + } + + /** Return the amount of storage memory used for unrolling RDD partitions. */ + def getBufferMemory(conf: SparkConf): Long = { + val bufferFraction = conf.getDouble("spark.storage.bufferFraction", 0.2) + (getMaxMemory(conf) * bufferFraction).toLong } def getHeartBeatFrequency(conf: SparkConf): Long = diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 4787e034f5f83..422179b44b189 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -28,7 +28,7 @@ import org.apache.spark.util.Utils /** * Stores BlockManager blocks on disk. */ -private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager) +private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager) extends BlockStore(blockManager) with Logging { val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index c12a9d7021223..b4363b312bc64 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -30,7 +30,7 @@ private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean) * Stores blocks in memory, either as Arrays of deserialized Java objects or as * serialized ByteBuffers. */ -private class MemoryStore(blockManager: BlockManager, maxMemory: Long) +private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) extends BlockStore(blockManager) { private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true) @@ -212,7 +212,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * * Return whether there is enough free space, along with the blocks dropped in the process. */ - private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = { + private[spark] def ensureFreeSpace( + blockIdToAdd: BlockId, + space: Long): ResultWithDroppedBlocks = { logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory") val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] @@ -274,6 +276,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } } -private case class ResultWithDroppedBlocks( +private[spark] case class ResultWithDroppedBlocks( success: Boolean, droppedBlocks: Seq[(BlockId, BlockStatus)]) diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala index 99faf4c52f552..c843a50b70304 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala @@ -28,7 +28,7 @@ import org.apache.spark.util.Utils /** * Stores BlockManager blocks on Tachyon. */ -private class TachyonStore( +private[spark] class TachyonStore( blockManager: BlockManager, tachyonManager: TachyonBlockManager) extends BlockStore(blockManager: BlockManager) with Logging { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 288badd3160f8..40e7521838f2b 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -130,9 +130,9 @@ class ExternalAppendOnlyMap[K, V, C]( // this map to grow and, if possible, allocate the required amount shuffleMemoryMap.synchronized { val threadId = Thread.currentThread().getId - val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId) + val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId).getOrElse(0L) val availableMemory = maxMemoryThreshold - - (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L)) + (shuffleMemoryMap.values.sum - previouslyOccupiedMemory) // Assume map growth factor is 2x shouldSpill = availableMemory < mapSize * 2 diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala index 3a22d998d1612..fff201f3b518e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala @@ -50,6 +50,16 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: def size: Int = _numElements + def iterator: Iterator[V] = new Iterator[V] { + var index = 0 + override def hasNext: Boolean = index < _numElements + override def next(): V = { + val value = _array(index) + index += 1 + value + } + } + /** Gets the underlying array backing this vector. */ def array: Array[V] = _array diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyBuffer.scala index 574c454557ec8..2ea4b1d996fe4 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyBuffer.scala @@ -36,4 +36,8 @@ private[spark] class SizeTrackingAppendOnlyBuffer[T: ClassTag] resetSamples() this } + + override def array: Array[T] = { + super.iterator.toArray + } }