From b9a6eee0a7fd5acdf3af647797416b337ef4fa11 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 24 Jul 2014 15:37:41 -0700 Subject: [PATCH] Simplify locking behavior on unrollMemoryMap This gets rid of the need to synchronize on unrollMemoryMap in addition to putLock. Now we require all accesses on unrollMemoryMap to synchronize on putLock, including when the executor releases the unroll memory after a task ends. --- .../scala/org/apache/spark/SparkEnv.scala | 4 - .../org/apache/spark/executor/Executor.scala | 10 ++- .../apache/spark/storage/MemoryStore.scala | 87 +++++++++++-------- 3 files changed, 57 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index f4f26cadcc738..ae9147e499710 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -71,10 +71,6 @@ class SparkEnv ( // All accesses should be manually synchronized val shuffleMemoryMap = mutable.HashMap[Long, Long]() - // A mapping of thread ID to amount of memory, in bytes, used for unrolling a block - // All accesses should be manually synchronized - val unrollMemoryMap = mutable.HashMap[Long, Long]() - private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() // A general, soft-reference map for metadata needed during HadoopRDD split computation diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 5e748c100d680..bd9d2cfb034be 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -266,11 +266,13 @@ private[spark] class Executor( } } } finally { - val threadId = Thread.currentThread().getId + // Release memory used by this thread for shuffles val shuffleMemoryMap = env.shuffleMemoryMap - val unrollMemoryMap = env.unrollMemoryMap - shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(threadId) } - unrollMemoryMap.synchronized { unrollMemoryMap.remove(threadId) } + shuffleMemoryMap.synchronized { + shuffleMemoryMap.remove(Thread.currentThread().getId) + } + // Release memory used by this thread for unrolling blocks + env.blockManager.memoryStore.releaseUnrollMemory() runningTasks.remove(taskId) } } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index f115b8e106c32..d9da1478be71c 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -20,9 +20,9 @@ package org.apache.spark.storage import java.nio.ByteBuffer import java.util.LinkedHashMap +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.SparkEnv import org.apache.spark.util.{SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector @@ -44,20 +44,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // blocks from the memory store. private val putLock = new Object() - /** - * Mapping from thread ID to memory used for unrolling blocks. - * - * To avoid potential deadlocks, all accesses of this map in MemoryStore are assumed to - * first synchronize on `putLock` and then on `unrollMemoryMap`, in that particular order. - * This is lazy because SparkEnv does not exist when we mock this class in tests. - */ - private lazy val unrollMemoryMap = SparkEnv.get.unrollMemoryMap + // A mapping from thread ID to amount of memory used for unrolling a block (in bytes) + // All accesses of this map are assumed to have manually synchronized on `putLock` + private val unrollMemoryMap = mutable.HashMap[Long, Long]() /** * The amount of space ensured for unrolling values in memory, shared across all cores. * This space is not reserved in advance, but allocated dynamically by dropping existing blocks. */ - private val globalUnrollMemory = { + private val maxUnrollMemory: Long = { val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2) (maxMemory * unrollFraction).toLong } @@ -227,22 +222,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) var memoryThreshold = initialMemoryThreshold // Memory to request as a multiple of current vector size val memoryGrowthFactor = 1.5 - - val threadId = Thread.currentThread().getId + // Underlying vector for unrolling the block var vector = new SizeTrackingVector[Any] - // Request memory for our vector and return whether the request is granted. This involves - // synchronizing on putLock and unrollMemoryMap (in that order), which could be expensive. + // Request memory for our vector and return whether the request is granted + // This involves synchronizing across all threads, which is expensive if called frequently def requestMemory(memoryToRequest: Long): Boolean = { putLock.synchronized { - unrollMemoryMap.synchronized { - val previouslyOccupiedMemory = unrollMemoryMap.get(threadId).getOrElse(0L) - val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupiedMemory - val availableMemory = freeMemory - otherThreadsMemory - val granted = availableMemory > memoryToRequest - if (granted) { unrollMemoryMap(threadId) = memoryToRequest } - granted - } + val otherThreadsMemory = currentUnrollMemory - threadCurrentUnrollMemory + val availableMemory = freeMemory - otherThreadsMemory + val granted = availableMemory > memoryToRequest + if (granted) { reserveUnrollMemory(memoryToRequest) } + granted } } @@ -261,17 +252,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // Hold the put lock, in case another thread concurrently puts a block that takes // up the unrolling space we just ensured here putLock.synchronized { - unrollMemoryMap.synchronized { - if (!requestMemory(amountToRequest)) { - // If the first request is not granted, try again after ensuring free space - // If there is still not enough space, give up and drop the partition - val extraSpaceNeeded = globalUnrollMemory - unrollMemoryMap.values.sum - val result = ensureFreeSpace(blockId, extraSpaceNeeded) - droppedBlocks ++= result.droppedBlocks - keepUnrolling = requestMemory(amountToRequest) - } - memoryThreshold = amountToRequest + if (!requestMemory(amountToRequest)) { + // If the first request is not granted, try again after ensuring free space + // If there is still not enough space, give up and drop the partition + val extraSpaceNeeded = maxUnrollMemory - currentUnrollMemory + val result = ensureFreeSpace(blockId, extraSpaceNeeded) + droppedBlocks ++= result.droppedBlocks + keepUnrolling = requestMemory(amountToRequest) } + memoryThreshold = amountToRequest } } } @@ -292,9 +281,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // we release the memory claimed by this thread later on when the task finishes. if (keepUnrolling) { vector = null - unrollMemoryMap.synchronized { - unrollMemoryMap(threadId) = 0 - } + releaseUnrollMemory() } } } @@ -387,7 +374,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } // Take into account the amount of memory currently occupied by unrolling blocks - val freeSpace = unrollMemoryMap.synchronized { freeMemory - unrollMemoryMap.values.sum } + val freeSpace = freeMemory - currentUnrollMemory if (freeSpace < space) { val rddToAdd = getRddId(blockIdToAdd) @@ -439,6 +426,34 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def contains(blockId: BlockId): Boolean = { entries.synchronized { entries.containsKey(blockId) } } + + /** + * Reserve memory for unrolling blocks used by this thread. + */ + private def reserveUnrollMemory(memory: Long): Unit = putLock.synchronized { + unrollMemoryMap(Thread.currentThread().getId) = memory + } + + /** + * Release memory used by this thread for unrolling blocks. + */ + private[spark] def releaseUnrollMemory(): Unit = putLock.synchronized { + unrollMemoryMap.remove(Thread.currentThread().getId) + } + + /** + * Return the amount of memory currently occupied for unrolling blocks across all threads. + */ + private def currentUnrollMemory: Long = putLock.synchronized { + unrollMemoryMap.values.sum + } + + /** + * Return the amount of memory currently occupied for unrolling blocks by this thread. + */ + private def threadCurrentUnrollMemory: Long = putLock.synchronized { + unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L) + } } private[spark] case class ResultWithDroppedBlocks(