Skip to content

Commit

Permalink
cacheMemoryMap -> unrollMemoryMap
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Jul 18, 2014
1 parent 198e374 commit e9c3cb0
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 14 deletions.
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ package org.apache.spark
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
import org.apache.spark.util.collection.SizeTrackingAppendOnlyBuffer

/**
* Spark class responsible for passing RDDs partition contents to the BlockManager and making
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ class SparkEnv (
val metricsSystem: MetricsSystem,
val conf: SparkConf) extends Logging {

// A mapping of thread ID to amount of memory, in bytes, used for shuffle
// A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()

// A mapping of thread ID to amount of memory, in bytes, used for unrolling an RDD partition
// A mapping of thread ID to amount of memory, in bytes, used for unrolling a block
// All accesses should be manually synchronized
val cacheMemoryMap = mutable.HashMap[Long, Long]()
val unrollMemoryMap = mutable.HashMap[Long, Long]()

private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ private[spark] class Executor(
} finally {
val threadId = Thread.currentThread().getId
val shuffleMemoryMap = env.shuffleMemoryMap
val cacheMemoryMap = env.cacheMemoryMap
val unrollMemoryMap = env.unrollMemoryMap
shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(threadId) }
cacheMemoryMap.synchronized { cacheMemoryMap.remove(threadId) }
unrollMemoryMap.synchronized { unrollMemoryMap.remove(threadId) }
runningTasks.remove(taskId)
}
}
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer

val threadId = Thread.currentThread().getId
val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
val unrollMemoryMap = SparkEnv.get.unrollMemoryMap
var buffer = new SizeTrackingAppendOnlyBuffer[Any]

try {
Expand All @@ -212,15 +212,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
previousSize = currentSize

// Atomically check whether there is sufficient memory in the global pool to continue
cacheMemoryMap.synchronized {
val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L)
val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory
unrollMemoryMap.synchronized {
val previouslyOccupiedMemory = unrollMemoryMap.get(threadId).getOrElse(0L)
val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupiedMemory

// Request for memory for the local buffer, and return whether request is granted
def requestForMemory(): Boolean = {
val availableMemory = freeMemory - otherThreadsMemory
val granted = availableMemory > memoryToRequest
if (granted) { cacheMemoryMap(threadId) = memoryToRequest }
if (granted) { unrollMemoryMap(threadId) = memoryToRequest }
granted
}

Expand All @@ -247,8 +247,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// Unless we return an iterator that depends on the buffer, free up space for other threads
if (enoughMemory) {
buffer = null
cacheMemoryMap.synchronized {
cacheMemoryMap(threadId) = 0
unrollMemoryMap.synchronized {
unrollMemoryMap(threadId) = 0
}
}
}
Expand Down

0 comments on commit e9c3cb0

Please sign in to comment.