From f57f3f2248050dbad4516b773cf225b10bc40d80 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 28 Jul 2015 11:22:56 -0700 Subject: [PATCH] More thread -> task changes --- .../apache/spark/storage/MemoryStore.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 18f728c0484c0..a008428da0ff1 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -48,7 +48,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // All accesses of this map are assumed to have manually synchronized on `accountingLock` private val unrollMemoryMap = mutable.HashMap[Long, Long]() // Same as `unrollMemoryMap`, but for pending unroll memory as defined below. - // Pending unroll memory refers to the intermediate memory occupied by a thread + // Pending unroll memory refers to the intermediate memory occupied by a task // after the unroll but before the actual putting of the block in the cache. // This chunk of memory is expected to be released *as soon as* we finish // caching the corresponding block as opposed to until after the task finishes. @@ -251,15 +251,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) var elementsUnrolled = 0 // Whether there is still enough memory for us to continue unrolling this block var keepUnrolling = true - // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing. + // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing. val initialMemoryThreshold = unrollMemoryThreshold // How often to check whether we need to request more memory val memoryCheckPeriod = 16 - // Memory currently reserved by this thread for this particular unrolling operation + // Memory currently reserved by this task for this particular unrolling operation var memoryThreshold = initialMemoryThreshold // Memory to request as a multiple of current vector size val memoryGrowthFactor = 1.5 - // Previous unroll memory held by this thread, for releasing later (only at the very end) + // Previous unroll memory held by this task, for releasing later (only at the very end) val previousMemoryReserved = currentUnrollMemoryForThisTask // Underlying vector for unrolling the block var vector = new SizeTrackingVector[Any] @@ -428,9 +428,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // Take into account the amount of memory currently occupied by unrolling blocks // and minus the pending unroll memory for that block on current thread. - val threadId = Thread.currentThread().getId + val taskAttemptId = currentTaskAttemptId() val actualFreeMemory = freeMemory - currentUnrollMemory + - pendingUnrollMemoryMap.getOrElse(threadId, 0L) + pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) if (actualFreeMemory < space) { val rddToAdd = getRddId(blockIdToAdd) @@ -456,7 +456,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) logInfo(s"${selectedBlocks.size} blocks selected for dropping") for (blockId <- selectedBlocks) { val entry = entries.synchronized { entries.get(blockId) } - // This should never be null as only one thread should be dropping + // This should never be null as only one task should be dropping // blocks and removing entries. However the check is still here for // future safety. if (entry != null) { @@ -503,7 +503,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** - * Release memory used by this thread for unrolling blocks. + * Release memory used by this task for unrolling blocks. * If the amount is not specified, remove the current task's allocation altogether. */ def releaseUnrollMemoryForThisTask(memory: Long = -1L): Unit = { @@ -513,7 +513,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) unrollMemoryMap.remove(taskAttemptId) } else { unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, memory) - memory - // If this thread claims no more unroll memory, release it completely + // If this task claims no more unroll memory, release it completely if (unrollMemoryMap(taskAttemptId) <= 0) { unrollMemoryMap.remove(taskAttemptId) } @@ -554,7 +554,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * Return the amount of memory currently occupied for unrolling blocks by this task. */ def currentUnrollMemoryForThisTask: Long = accountingLock.synchronized { - unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L) + unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) } /**