diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index f4f26cadcc738..ae9147e499710 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -71,10 +71,6 @@ class SparkEnv ( // All accesses should be manually synchronized val shuffleMemoryMap = mutable.HashMap[Long, Long]() - // A mapping of thread ID to amount of memory, in bytes, used for unrolling a block - // All accesses should be manually synchronized - val unrollMemoryMap = mutable.HashMap[Long, Long]() - private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() // A general, soft-reference map for metadata needed during HadoopRDD split computation diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 5e748c100d680..bd9d2cfb034be 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -266,11 +266,13 @@ private[spark] class Executor( } } } finally { - val threadId = Thread.currentThread().getId + // Release memory used by this thread for shuffles val shuffleMemoryMap = env.shuffleMemoryMap - val unrollMemoryMap = env.unrollMemoryMap - shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(threadId) } - unrollMemoryMap.synchronized { unrollMemoryMap.remove(threadId) } + shuffleMemoryMap.synchronized { + shuffleMemoryMap.remove(Thread.currentThread().getId) + } + // Release memory used by this thread for unrolling blocks + env.blockManager.memoryStore.releaseUnrollMemory() runningTasks.remove(taskId) } } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index f115b8e106c32..d9da1478be71c 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -20,9 +20,9 @@ package org.apache.spark.storage import java.nio.ByteBuffer import java.util.LinkedHashMap +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.SparkEnv import org.apache.spark.util.{SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector @@ -44,20 +44,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // blocks from the memory store. private val putLock = new Object() - /** - * Mapping from thread ID to memory used for unrolling blocks. - * - * To avoid potential deadlocks, all accesses of this map in MemoryStore are assumed to - * first synchronize on `putLock` and then on `unrollMemoryMap`, in that particular order. - * This is lazy because SparkEnv does not exist when we mock this class in tests. - */ - private lazy val unrollMemoryMap = SparkEnv.get.unrollMemoryMap + // A mapping from thread ID to amount of memory used for unrolling a block (in bytes) + // All accesses of this map are assumed to be manually synchronized on `putLock` + private val unrollMemoryMap = mutable.HashMap[Long, Long]() /** * The amount of space ensured for unrolling values in memory, shared across all cores. * This space is not reserved in advance, but allocated dynamically by dropping existing blocks. 
*/ - private val globalUnrollMemory = { + private val maxUnrollMemory: Long = { val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2) (maxMemory * unrollFraction).toLong } @@ -227,22 +222,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) var memoryThreshold = initialMemoryThreshold // Memory to request as a multiple of current vector size val memoryGrowthFactor = 1.5 - - val threadId = Thread.currentThread().getId + // Underlying vector for unrolling the block var vector = new SizeTrackingVector[Any] - // Request memory for our vector and return whether the request is granted. This involves - // synchronizing on putLock and unrollMemoryMap (in that order), which could be expensive. + // Request memory for our vector and return whether the request is granted + // This involves synchronizing across all threads, which is expensive if called frequently def requestMemory(memoryToRequest: Long): Boolean = { putLock.synchronized { - unrollMemoryMap.synchronized { - val previouslyOccupiedMemory = unrollMemoryMap.get(threadId).getOrElse(0L) - val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupiedMemory - val availableMemory = freeMemory - otherThreadsMemory - val granted = availableMemory > memoryToRequest - if (granted) { unrollMemoryMap(threadId) = memoryToRequest } - granted - } + val otherThreadsMemory = currentUnrollMemory - threadCurrentUnrollMemory + val availableMemory = freeMemory - otherThreadsMemory + val granted = availableMemory > memoryToRequest + if (granted) { reserveUnrollMemory(memoryToRequest) } + granted } } @@ -261,17 +252,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // Hold the put lock, in case another thread concurrently puts a block that takes // up the unrolling space we just ensured here putLock.synchronized { - unrollMemoryMap.synchronized { - if (!requestMemory(amountToRequest)) { - // If the first request is not granted, try again after 
ensuring free space - // If there is still not enough space, give up and drop the partition - val extraSpaceNeeded = globalUnrollMemory - unrollMemoryMap.values.sum - val result = ensureFreeSpace(blockId, extraSpaceNeeded) - droppedBlocks ++= result.droppedBlocks - keepUnrolling = requestMemory(amountToRequest) - } - memoryThreshold = amountToRequest + if (!requestMemory(amountToRequest)) { + // If the first request is not granted, try again after ensuring free space + // If there is still not enough space, give up and drop the partition + val extraSpaceNeeded = maxUnrollMemory - currentUnrollMemory + val result = ensureFreeSpace(blockId, extraSpaceNeeded) + droppedBlocks ++= result.droppedBlocks + keepUnrolling = requestMemory(amountToRequest) } + memoryThreshold = amountToRequest } } } @@ -292,9 +281,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // we release the memory claimed by this thread later on when the task finishes. if (keepUnrolling) { vector = null - unrollMemoryMap.synchronized { - unrollMemoryMap(threadId) = 0 - } + releaseUnrollMemory() } } } @@ -387,7 +374,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } // Take into account the amount of memory currently occupied by unrolling blocks - val freeSpace = unrollMemoryMap.synchronized { freeMemory - unrollMemoryMap.values.sum } + val freeSpace = freeMemory - currentUnrollMemory if (freeSpace < space) { val rddToAdd = getRddId(blockIdToAdd) @@ -439,6 +426,34 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def contains(blockId: BlockId): Boolean = { entries.synchronized { entries.containsKey(blockId) } } + + /** + * Set (not add to) the memory reserved by this thread for unrolling blocks. + */ + private def reserveUnrollMemory(memory: Long): Unit = putLock.synchronized { + unrollMemoryMap(Thread.currentThread().getId) = memory + } + + /** + * Release memory used by this thread for unrolling blocks. 
+ */ + private[spark] def releaseUnrollMemory(): Unit = putLock.synchronized { + unrollMemoryMap.remove(Thread.currentThread().getId) + } + + /** + * Return the amount of memory currently occupied for unrolling blocks across all threads. + */ + private def currentUnrollMemory: Long = putLock.synchronized { + unrollMemoryMap.values.sum + } + + /** + * Return the amount of memory currently occupied for unrolling blocks by this thread. + */ + private def threadCurrentUnrollMemory: Long = putLock.synchronized { + unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L) + } } private[spark] case class ResultWithDroppedBlocks(