Skip to content

Commit

Permalink
More thread -> task changes
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 28, 2015
1 parent fa78ee8 commit f57f3f2
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// All accesses of this map are assumed to have manually synchronized on `accountingLock`
private val unrollMemoryMap = mutable.HashMap[Long, Long]()
// Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
// Pending unroll memory refers to the intermediate memory occupied by a thread
// Pending unroll memory refers to the intermediate memory occupied by a task
// after the unroll but before the actual putting of the block in the cache.
// This chunk of memory is expected to be released *as soon as* we finish
// caching the corresponding block as opposed to until after the task finishes.
Expand Down Expand Up @@ -251,15 +251,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
var elementsUnrolled = 0
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing.
// Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing.
val initialMemoryThreshold = unrollMemoryThreshold
// How often to check whether we need to request more memory
val memoryCheckPeriod = 16
// Memory currently reserved by this thread for this particular unrolling operation
// Memory currently reserved by this task for this particular unrolling operation
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
val memoryGrowthFactor = 1.5
// Previous unroll memory held by this thread, for releasing later (only at the very end)
// Previous unroll memory held by this task, for releasing later (only at the very end)
val previousMemoryReserved = currentUnrollMemoryForThisTask
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[Any]
Expand Down Expand Up @@ -428,9 +428,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)

// Take into account the amount of memory currently occupied by unrolling blocks
// and minus the pending unroll memory for that block on current thread.
val threadId = Thread.currentThread().getId
val taskAttemptId = currentTaskAttemptId()
val actualFreeMemory = freeMemory - currentUnrollMemory +
pendingUnrollMemoryMap.getOrElse(threadId, 0L)
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L)

if (actualFreeMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
Expand All @@ -456,7 +456,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
// This should never be null as only one thread should be dropping
// This should never be null as only one task should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
Expand Down Expand Up @@ -503,7 +503,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}

/**
* Release memory used by this thread for unrolling blocks.
* Release memory used by this task for unrolling blocks.
* If the amount is not specified, remove the current task's allocation altogether.
*/
def releaseUnrollMemoryForThisTask(memory: Long = -1L): Unit = {
Expand All @@ -513,7 +513,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
unrollMemoryMap.remove(taskAttemptId)
} else {
unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, memory) - memory
// If this thread claims no more unroll memory, release it completely
// If this task claims no more unroll memory, release it completely
if (unrollMemoryMap(taskAttemptId) <= 0) {
unrollMemoryMap.remove(taskAttemptId)
}
Expand Down Expand Up @@ -554,7 +554,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Return the amount of memory currently occupied for unrolling blocks by this task.
*/
def currentUnrollMemoryForThisTask: Long = accountingLock.synchronized {
unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L)
unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
}

/**
Expand Down

0 comments on commit f57f3f2

Please sign in to comment.