Skip to content

Commit

Permalink
Simplify locking behavior on unrollMemoryMap
Browse files Browse the repository at this point in the history
This gets rid of the need to synchronize on unrollMemoryMap in
addition to putLock. Now we require all accesses on unrollMemoryMap
to synchronize on putLock, including when the executor releases the
unroll memory after a task ends.
  • Loading branch information
andrewor14 committed Jul 24, 2014
1 parent ed6cda4 commit b9a6eee
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 44 deletions.
4 changes: 0 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ class SparkEnv (
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()

// A mapping of thread ID to amount of memory, in bytes, used for unrolling a block
// All accesses should be manually synchronized
val unrollMemoryMap = mutable.HashMap[Long, Long]()

private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

// A general, soft-reference map for metadata needed during HadoopRDD split computation
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,13 @@ private[spark] class Executor(
}
}
} finally {
val threadId = Thread.currentThread().getId
// Release memory used by this thread for shuffles
val shuffleMemoryMap = env.shuffleMemoryMap
val unrollMemoryMap = env.unrollMemoryMap
shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(threadId) }
unrollMemoryMap.synchronized { unrollMemoryMap.remove(threadId) }
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
// Release memory used by this thread for unrolling blocks
env.blockManager.memoryStore.releaseUnrollMemory()
runningTasks.remove(taskId)
}
}
Expand Down
87 changes: 51 additions & 36 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ package org.apache.spark.storage
import java.nio.ByteBuffer
import java.util.LinkedHashMap

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkEnv
import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector

Expand All @@ -44,20 +44,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// blocks from the memory store.
private val putLock = new Object()

/**
* Mapping from thread ID to memory used for unrolling blocks.
*
* To avoid potential deadlocks, all accesses of this map in MemoryStore are assumed to
* first synchronize on `putLock` and then on `unrollMemoryMap`, in that particular order.
* This is lazy because SparkEnv does not exist when we mock this class in tests.
*/
private lazy val unrollMemoryMap = SparkEnv.get.unrollMemoryMap
// Maps a thread ID to the amount of memory (in bytes) that thread has reserved for unrolling a block.
// Every access to this map must be made while holding `putLock`.
private val unrollMemoryMap = mutable.HashMap[Long, Long]()

/**
* The amount of space ensured for unrolling values in memory, shared across all cores.
* This space is not reserved in advance, but allocated dynamically by dropping existing blocks.
*/
private val globalUnrollMemory = {
private val maxUnrollMemory: Long = {
val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2)
(maxMemory * unrollFraction).toLong
}
Expand Down Expand Up @@ -227,22 +222,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
val memoryGrowthFactor = 1.5

val threadId = Thread.currentThread().getId
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[Any]

// Request memory for our vector and return whether the request is granted. This involves
// synchronizing on putLock and unrollMemoryMap (in that order), which could be expensive.
// Request memory for our vector and return whether the request is granted
// This involves synchronizing across all threads, which is expensive if called frequently
def requestMemory(memoryToRequest: Long): Boolean = {
putLock.synchronized {
unrollMemoryMap.synchronized {
val previouslyOccupiedMemory = unrollMemoryMap.get(threadId).getOrElse(0L)
val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupiedMemory
val availableMemory = freeMemory - otherThreadsMemory
val granted = availableMemory > memoryToRequest
if (granted) { unrollMemoryMap(threadId) = memoryToRequest }
granted
}
val otherThreadsMemory = currentUnrollMemory - threadCurrentUnrollMemory
val availableMemory = freeMemory - otherThreadsMemory
val granted = availableMemory > memoryToRequest
if (granted) { reserveUnrollMemory(memoryToRequest) }
granted
}
}

Expand All @@ -261,17 +252,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// Hold the put lock, in case another thread concurrently puts a block that takes
// up the unrolling space we just ensured here
putLock.synchronized {
unrollMemoryMap.synchronized {
if (!requestMemory(amountToRequest)) {
// If the first request is not granted, try again after ensuring free space
// If there is still not enough space, give up and drop the partition
val extraSpaceNeeded = globalUnrollMemory - unrollMemoryMap.values.sum
val result = ensureFreeSpace(blockId, extraSpaceNeeded)
droppedBlocks ++= result.droppedBlocks
keepUnrolling = requestMemory(amountToRequest)
}
memoryThreshold = amountToRequest
if (!requestMemory(amountToRequest)) {
// If the first request is not granted, try again after ensuring free space
// If there is still not enough space, give up and drop the partition
val extraSpaceNeeded = maxUnrollMemory - currentUnrollMemory
val result = ensureFreeSpace(blockId, extraSpaceNeeded)
droppedBlocks ++= result.droppedBlocks
keepUnrolling = requestMemory(amountToRequest)
}
memoryThreshold = amountToRequest
}
}
}
Expand All @@ -292,9 +281,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// we release the memory claimed by this thread later on when the task finishes.
if (keepUnrolling) {
vector = null
unrollMemoryMap.synchronized {
unrollMemoryMap(threadId) = 0
}
releaseUnrollMemory()
}
}
}
Expand Down Expand Up @@ -387,7 +374,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}

// Take into account the amount of memory currently occupied by unrolling blocks
val freeSpace = unrollMemoryMap.synchronized { freeMemory - unrollMemoryMap.values.sum }
val freeSpace = freeMemory - currentUnrollMemory

if (freeSpace < space) {
val rddToAdd = getRddId(blockIdToAdd)
Expand Down Expand Up @@ -439,6 +426,34 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// Report whether `entries` currently holds `blockId`; the lookup runs under the `entries` monitor.
override def contains(blockId: BlockId): Boolean =
  entries.synchronized(entries.containsKey(blockId))

/**
 * Record `memory` bytes as the unroll reservation of the calling thread.
 *
 * The value replaces any entry previously stored for this thread; it is not added to it.
 * The map is updated while holding `putLock`.
 */
private def reserveUnrollMemory(memory: Long): Unit = {
  putLock.synchronized {
    val callerId = Thread.currentThread().getId
    unrollMemoryMap.update(callerId, memory)
  }
}

/**
 * Drop the calling thread's entry from the unroll memory map, if it has one.
 *
 * The removal happens while holding `putLock`. Calling this for a thread with no
 * entry is a no-op.
 */
private[spark] def releaseUnrollMemory(): Unit = {
  putLock.synchronized {
    val callerId = Thread.currentThread().getId
    unrollMemoryMap -= callerId
  }
}

/**
 * Total number of bytes currently recorded for unrolling blocks, summed over every
 * thread in the map. Read while holding `putLock`; yields 0 when the map is empty.
 */
private def currentUnrollMemory: Long = {
  putLock.synchronized {
    unrollMemoryMap.valuesIterator.foldLeft(0L)(_ + _)
  }
}

/**
 * Number of bytes currently recorded for unrolling blocks by the calling thread,
 * or 0 when the thread has no entry. Read while holding `putLock`.
 */
private def threadCurrentUnrollMemory: Long = {
  putLock.synchronized {
    val callerId = Thread.currentThread().getId
    unrollMemoryMap.get(callerId).getOrElse(0L)
  }
}
}

private[spark] case class ResultWithDroppedBlocks(
Expand Down

0 comments on commit b9a6eee

Please sign in to comment.