diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 8e5b787e34c90..f30a28b2cfeed 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -17,10 +17,12 @@
 package org.apache.spark
 
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
+import org.apache.spark.util.collection.SizeTrackingAppendOnlyBuffer
 
 /**
  * Spark class responsible for passing RDDs partition contents to the BlockManager and making
@@ -29,7 +31,14 @@ import org.apache.spark.storage._
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
 
   /** Keys of RDD partitions that are being computed/loaded. */
-  private val loading = new HashSet[RDDBlockId]()
+  private val loading = new mutable.HashSet[RDDBlockId]
+
+  /**
+   * The amount of space ensured for unrolling partitions, shared across all cores.
+   * This space is not reserved in advance, but allocated dynamically by dropping existing blocks.
+   * It must be a lazy val in order to access a mocked BlockManager's conf in tests properly.
+   */
+  private lazy val globalBufferMemory = BlockManager.getBufferMemory(blockManager.conf)
 
   /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](
@@ -142,9 +151,76 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
        * to the BlockManager as an iterator and expect to read it back later. This is because
        * we may end up dropping a partition from memory store before getting it back, e.g.
        * when the entirety of the RDD does not fit in memory. */
-      val elements = values.toArray[Any]
-      updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
-      elements.iterator.asInstanceOf[Iterator[T]]
+
+      var count = 0                   // The number of elements unrolled so far
+      var dropPartition = false       // Whether to drop the new partition from memory
+      var previousSize = 0L           // Previous estimate of the size of our buffer
+      val memoryRequestPeriod = 1000  // How frequently we request more memory for our buffer
+
+      val threadId = Thread.currentThread().getId
+      val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
+      var buffer = new SizeTrackingAppendOnlyBuffer[Any]
+
+      /* While adding values to the in-memory buffer, periodically check whether the memory
+       * restrictions for unrolling partitions are still satisfied. If not, stop immediately,
+       * and persist the partition to disk if specified by the storage level. This check is
+       * a safeguard against the case in which a single partition does not fit in memory. */
+      while (values.hasNext && !dropPartition) {
+        buffer += values.next()
+        count += 1
+        if (count % memoryRequestPeriod == 1) {
+          // Calculate the amount of memory to request from the global memory pool
+          val currentSize = buffer.estimateSize()
+          val delta = math.max(currentSize - previousSize, 0)
+          val memoryToRequest = currentSize + delta
+          previousSize = currentSize
+
+          // Atomically check whether there is sufficient memory in the global pool to continue
+          cacheMemoryMap.synchronized {
+            val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L)
+            val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory
+
+            // Request memory for the local buffer, and return whether the request is granted
+            def requestForMemory(): Boolean = {
+              val availableMemory = blockManager.memoryStore.freeMemory - otherThreadsMemory
+              val granted = availableMemory > memoryToRequest
+              if (granted) { cacheMemoryMap(threadId) = memoryToRequest }
+              granted
+            }
+
+            // If the first request is not granted, try again after ensuring free space
+            // If there is still not enough space, give up and drop the partition
+            if (!requestForMemory()) {
+              val result = blockManager.memoryStore.ensureFreeSpace(key, globalBufferMemory)
+              updatedBlocks ++= result.droppedBlocks
+              dropPartition = !requestForMemory()
+            }
+          }
+        }
+      }
+
+      if (!dropPartition) {
+        // We have successfully unrolled the entire partition, so cache it in memory
+        updatedBlocks ++= blockManager.put(key, buffer.array, storageLevel, tellMaster = true)
+        buffer.iterator.asInstanceOf[Iterator[T]]
+      } else {
+        // We have exceeded our collective quota. This partition will not be cached in memory.
+        val persistToDisk = storageLevel.useDisk
+        logWarning(s"Failed to cache $key in memory! There is not enough space to unroll the " +
+          s"entire partition. " + (if (persistToDisk) "Persisting to disk instead." else ""))
+        var newValues = (buffer.iterator ++ values).asInstanceOf[Iterator[T]]
+        if (persistToDisk) {
+          val newLevel = StorageLevel(storageLevel.useDisk, useMemory = false,
+            storageLevel.useOffHeap, storageLevel.deserialized, storageLevel.replication)
+          newValues = putInBlockManager[T](key, newValues, newLevel, updatedBlocks)
+          // Free up buffer for other threads
+          buffer = null
+          cacheMemoryMap.synchronized {
+            cacheMemoryMap(threadId) = 0
+          }
+        }
+        newValues
+      }
     }
   }
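The CacheManager change above is the core of the patch: rather than materializing a partition eagerly with values.toArray, putInBlockManager now unrolls the iterator into a SizeTrackingAppendOnlyBuffer and, every memoryRequestPeriod elements, re-estimates the buffer's size and asks the shared, thread-keyed cacheMemoryMap for permission to keep going, dropping existing blocks via ensureFreeSpace before giving up. The standalone sketch below reproduces that bookkeeping pattern in miniature; UnrollSketch, memoryPool, maxPoolBytes and the caller-supplied estimateSize are illustrative stand-ins, not Spark APIs.

import scala.collection.mutable

// Minimal sketch of the unrolling pattern used in putInBlockManager above: append elements
// one at a time, periodically re-estimate the buffer's size, and ask a shared, thread-keyed
// pool for room before continuing.
object UnrollSketch {
  private val memoryPool = mutable.HashMap[Long, Long]()   // threadId -> bytes reserved
  private val maxPoolBytes = 64L * 1024 * 1024             // stand-in for the store's free memory
  private val checkPeriod = 1000                           // like memoryRequestPeriod above

  /** Returns (whether the whole iterator fit within the pool, the elements unrolled so far). */
  def unroll[T](values: Iterator[T], estimateSize: Seq[T] => Long): (Boolean, Seq[T]) = {
    val threadId = Thread.currentThread().getId
    val buffer = mutable.ArrayBuffer[T]()
    var previousSize = 0L
    var granted = true
    while (values.hasNext && granted) {
      buffer += values.next()
      if (buffer.length % checkPeriod == 1) {
        val currentSize = estimateSize(buffer)
        // Reserve the current size plus the last growth delta, so the reservation
        // stays ahead of the buffer until the next check.
        val request = currentSize + math.max(currentSize - previousSize, 0L)
        previousSize = currentSize
        memoryPool.synchronized {
          val otherThreads = memoryPool.values.sum - memoryPool.getOrElse(threadId, 0L)
          granted = maxPoolBytes - otherThreads > request
          if (granted) memoryPool(threadId) = request
        }
      }
    }
    memoryPool.synchronized { memoryPool.remove(threadId) }  // release the reservation
    (granted, buffer)
  }
}

A caller that gets (false, partial) back still holds partial.iterator ++ values, which is essentially what the else branch above hands to the disk store as newValues. Note that the patch itself does not clear a successful thread's cacheMemoryMap entry here; that happens in Executor's finally block, shown next.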
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 8dfa8cc4b5b3f..0d6bce064525b 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -67,10 +67,14 @@ class SparkEnv (
     val metricsSystem: MetricsSystem,
     val conf: SparkConf) extends Logging {
 
-  // A mapping of thread ID to amount of memory used for shuffle in bytes
+  // A mapping of thread ID to amount of memory, in bytes, used for shuffle
   // All accesses should be manually synchronized
   val shuffleMemoryMap = mutable.HashMap[Long, Long]()
 
+  // A mapping of thread ID to amount of memory, in bytes, used for unrolling an RDD partition
+  // All accesses should be manually synchronized
+  val cacheMemoryMap = mutable.HashMap[Long, Long]()
+
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index baee7a216a7c3..1c95f4d9ba136 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -259,11 +259,11 @@ private[spark] class Executor(
           }
         }
       } finally {
-        // TODO: Unregister shuffle memory only for ResultTask
+        val threadId = Thread.currentThread().getId
         val shuffleMemoryMap = env.shuffleMemoryMap
-        shuffleMemoryMap.synchronized {
-          shuffleMemoryMap.remove(Thread.currentThread().getId)
-        }
+        val cacheMemoryMap = env.cacheMemoryMap
+        shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(threadId) }
+        cacheMemoryMap.synchronized { cacheMemoryMap.remove(threadId) }
        runningTasks.remove(taskId)
      }
    }
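SparkEnv now carries a second thread-keyed ledger, cacheMemoryMap, next to shuffleMemoryMap, and Executor clears the current thread's entry in both maps in the finally block of every task. The sketch below shows the general shape of this bookkeeping and why the finally matters: without it, a finished task's reservation would keep counting against every later task on the executor. ThreadLedgerExample and its method names are invented for illustration; they are not Spark APIs.

import scala.collection.mutable

// Per-thread memory ledger in the style of shuffleMemoryMap / cacheMemoryMap:
// a plain mutable map keyed by thread ID, with all accesses manually synchronized.
object ThreadLedgerExample {
  private val reserved = mutable.HashMap[Long, Long]()   // threadId -> bytes reserved

  def reserve(bytes: Long): Unit = reserved.synchronized {
    reserved(Thread.currentThread().getId) = bytes
  }

  def reservedByOthers: Long = reserved.synchronized {
    reserved.values.sum - reserved.getOrElse(Thread.currentThread().getId, 0L)
  }

  // Mirrors the cleanup added to Executor.TaskRunner above: release in `finally`,
  // whether the task body completed or threw.
  def runAndRelease[T](taskBody: => T): T = {
    try {
      taskBody
    } finally {
      reserved.synchronized { reserved.remove(Thread.currentThread().getId) }
    }
  }
}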
else "")) + var newValues = (buffer.iterator ++ values).asInstanceOf[Iterator[T]] + if (persistToDisk) { + val newLevel = StorageLevel(storageLevel.useDisk, useMemory = false, + storageLevel.useOffHeap, storageLevel.deserialized, storageLevel.replication) + newValues = putInBlockManager[T](key, newValues, newLevel, updatedBlocks) + // Free up buffer for other threads + buffer = null + cacheMemoryMap.synchronized { + cacheMemoryMap(threadId) = 0 + } + } + newValues + } } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 8dfa8cc4b5b3f..0d6bce064525b 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -67,10 +67,14 @@ class SparkEnv ( val metricsSystem: MetricsSystem, val conf: SparkConf) extends Logging { - // A mapping of thread ID to amount of memory used for shuffle in bytes + // A mapping of thread ID to amount of memory, in bytes, used for shuffle // All accesses should be manually synchronized val shuffleMemoryMap = mutable.HashMap[Long, Long]() + // A mapping of thread ID to amount of memory, in bytes, used for unrolling an RDD partition + // All accesses should be manually synchronized + val cacheMemoryMap = mutable.HashMap[Long, Long]() + private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() // A general, soft-reference map for metadata needed during HadoopRDD split computation diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index baee7a216a7c3..1c95f4d9ba136 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -259,11 +259,11 @@ private[spark] class Executor( } } } finally { - // TODO: Unregister shuffle memory only for ResultTask + val threadId = Thread.currentThread().getId val shuffleMemoryMap = env.shuffleMemoryMap - shuffleMemoryMap.synchronized { - shuffleMemoryMap.remove(Thread.currentThread().getId) - } + val cacheMemoryMap = env.cacheMemoryMap + shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(threadId) } + cacheMemoryMap.synchronized { cacheMemoryMap.remove(threadId) } runningTasks.remove(taskId) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 0444ffa346638..002f03ff622e4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -61,9 +61,9 @@ private[spark] class BlockManager( // Actual storage of where blocks are kept private var tachyonInitialized = false - private[storage] val memoryStore = new MemoryStore(this, maxMemory) - private[storage] val diskStore = new DiskStore(this, diskBlockManager) - private[storage] lazy val tachyonStore: TachyonStore = { + private[spark] val memoryStore = new MemoryStore(this, maxMemory) + private[spark] val diskStore = new DiskStore(this, diskBlockManager) + private[spark] lazy val tachyonStore: TachyonStore = { val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon") val appFolderName = conf.get("spark.tachyonStore.folderName") val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}" @@ -1049,9 +1049,17 @@ private[spark] class BlockManager( private[spark] object BlockManager extends Logging { private val ID_GENERATOR = new IdGenerator 
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 4787e034f5f83..422179b44b189 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
 /**
  * Stores BlockManager blocks on disk.
  */
-private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
+private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
   extends BlockStore(blockManager) with Logging {
 
   val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index c12a9d7021223..b4363b312bc64 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -30,7 +30,7 @@ private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
  * Stores blocks in memory, either as Arrays of deserialized Java objects or as
  * serialized ByteBuffers.
  */
-private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
+private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   extends BlockStore(blockManager) {
 
   private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
@@ -212,7 +212,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    *
    * Return whether there is enough free space, along with the blocks dropped in the process.
    */
-  private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = {
+  private[spark] def ensureFreeSpace(
+      blockIdToAdd: BlockId,
+      space: Long): ResultWithDroppedBlocks = {
     logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory")
 
     val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -274,6 +276,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 }
 
-private case class ResultWithDroppedBlocks(
+private[spark] case class ResultWithDroppedBlocks(
     success: Boolean,
     droppedBlocks: Seq[(BlockId, BlockStatus)])
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index 99faf4c52f552..c843a50b70304 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
 /**
  * Stores BlockManager blocks on Tachyon.
 */
-private class TachyonStore(
+private[spark] class TachyonStore(
     blockManager: BlockManager,
     tachyonManager: TachyonBlockManager)
   extends BlockStore(blockManager: BlockManager) with Logging {
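The storage classes above are widened from private or private[storage] to private[spark] so that CacheManager, which lives outside the storage package, can call blockManager.memoryStore.ensureFreeSpace directly and fold the returned ResultWithDroppedBlocks into updatedBlocks. The toy store below illustrates that contract on a much smaller scale: evict least-recently-used entries until the requested space fits, and report both whether that succeeded and what was dropped along the way. It is a simplified stand-in, not MemoryStore's actual eviction logic; ToyStore and its members are hypothetical.

import java.util.LinkedHashMap
import scala.collection.mutable.ArrayBuffer

// Toy LRU store illustrating the ensureFreeSpace contract, using the same access-ordered
// LinkedHashMap trick as MemoryStore's `entries`, with evictions reported to the caller.
class ToyStore(maxBytes: Long) {
  private val entries = new LinkedHashMap[String, Long](32, 0.75f, true)  // id -> size, LRU order
  private var currentBytes = 0L

  def put(id: String, size: Long): Unit = entries.synchronized {
    entries.put(id, size)          // ignores re-puts of the same id, for brevity
    currentBytes += size
  }

  /** Returns (whether `space` bytes now fit, the entries dropped in the process). */
  def ensureFreeSpace(space: Long): (Boolean, Seq[(String, Long)]) = entries.synchronized {
    val dropped = new ArrayBuffer[(String, Long)]
    if (space <= maxBytes) {
      val it = entries.entrySet().iterator()        // least recently used entries come first
      while (maxBytes - currentBytes < space && it.hasNext) {
        val entry = it.next()
        dropped += entry.getKey -> entry.getValue
        currentBytes -= entry.getValue
        it.remove()
      }
    }
    (maxBytes - currentBytes >= space, dropped)
  }
}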
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 288badd3160f8..40e7521838f2b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -130,9 +130,9 @@ class ExternalAppendOnlyMap[K, V, C](
         // this map to grow and, if possible, allocate the required amount
         shuffleMemoryMap.synchronized {
           val threadId = Thread.currentThread().getId
-          val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
+          val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId).getOrElse(0L)
           val availableMemory = maxMemoryThreshold -
-            (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
+            (shuffleMemoryMap.values.sum - previouslyOccupiedMemory)
 
           // Assume map growth factor is 2x
           shouldSpill = availableMemory < mapSize * 2
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
index 3a22d998d1612..fff201f3b518e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@ -50,6 +50,16 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize:
 
   def size: Int = _numElements
 
+  def iterator: Iterator[V] = new Iterator[V] {
+    var index = 0
+    override def hasNext: Boolean = index < _numElements
+    override def next(): V = {
+      val value = _array(index)
+      index += 1
+      value
+    }
+  }
+
   /** Gets the underlying array backing this vector. */
   def array: Array[V] = _array
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyBuffer.scala
index 574c454557ec8..2ea4b1d996fe4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyBuffer.scala
@@ -36,4 +36,8 @@ private[spark] class SizeTrackingAppendOnlyBuffer[T: ClassTag]
     resetSamples()
     this
   }
+
+  override def array: Array[T] = {
+    super.iterator.toArray
+  }
 }
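The collection changes round out the picture: ExternalAppendOnlyMap resolves the Option once with getOrElse(0L) instead of at the subtraction, PrimitiveVector gains an iterator that stops at _numElements instead of exposing callers to the raw backing array, and SizeTrackingAppendOnlyBuffer gains an array method built from its iterator. The size-tracking internals of that buffer are not part of this patch, so the sketch below is only a guess at the general technique its estimateSize and resetSamples calls hint at: measure rarely, at widely spaced points, and extrapolate a bytes-per-element rate in between. SizeTrackingBuffer and its measure parameter are hypothetical, with measure standing in for something like Spark's SizeEstimator.

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

// Hypothetical sketch of a size-tracking append-only buffer. Measuring the collection on
// every append would be far too slow, so real measurements are taken only at exponentially
// spaced element counts and a bytes-per-element rate is extrapolated in between.
class SizeTrackingBuffer[T: ClassTag](measure: Seq[T] => Long) {
  private val buffer = new ArrayBuffer[T]
  private var bytesPerElement = 0.0
  private var nextSampleAt = 1

  def +=(value: T): this.type = {
    buffer += value
    if (buffer.length >= nextSampleAt) {
      bytesPerElement = measure(buffer).toDouble / buffer.length
      nextSampleAt = math.max(nextSampleAt * 2, buffer.length + 1)   // sample at 1, 2, 4, 8, ...
    }
    this
  }

  def estimateSize(): Long = (bytesPerElement * buffer.length).toLong

  def iterator: Iterator[T] = buffer.iterator

  def array: Array[T] = buffer.toArray   // same shape as the array override added above
}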