From e9c3cb03f54f4760f2f8635feef4852a92008842 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 17 Jul 2014 17:09:34 -0700
Subject: [PATCH] cacheMemoryMap -> unrollMemoryMap

---
 .../main/scala/org/apache/spark/CacheManager.scala |  2 --
 .../src/main/scala/org/apache/spark/SparkEnv.scala |  6 +++---
 .../scala/org/apache/spark/executor/Executor.scala |  4 ++--
 .../org/apache/spark/storage/MemoryStore.scala     | 14 +++++++-------
 4 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 07b32680a6c8a..4e9999ad118a3 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -20,10 +20,8 @@ package org.apache.spark
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.executor.InputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
-import org.apache.spark.util.collection.SizeTrackingAppendOnlyBuffer
 
 /**
  * Spark class responsible for passing RDDs partition contents to the BlockManager and making
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 8923d44e3b439..f4f26cadcc738 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -67,13 +67,13 @@ class SparkEnv (
     val metricsSystem: MetricsSystem,
     val conf: SparkConf) extends Logging {
 
-  // A mapping of thread ID to amount of memory, in bytes, used for shuffle
+  // A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
   // All accesses should be manually synchronized
   val shuffleMemoryMap = mutable.HashMap[Long, Long]()
 
-  // A mapping of thread ID to amount of memory, in bytes, used for unrolling an RDD partition
+  // A mapping of thread ID to amount of memory, in bytes, used for unrolling a block
   // All accesses should be manually synchronized
-  val cacheMemoryMap = mutable.HashMap[Long, Long]()
+  val unrollMemoryMap = mutable.HashMap[Long, Long]()
 
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 7bf8084b8b0e3..5e748c100d680 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -268,9 +268,9 @@ private[spark] class Executor(
     } finally {
       val threadId = Thread.currentThread().getId
       val shuffleMemoryMap = env.shuffleMemoryMap
-      val cacheMemoryMap = env.cacheMemoryMap
+      val unrollMemoryMap = env.unrollMemoryMap
       shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(threadId) }
-      cacheMemoryMap.synchronized { cacheMemoryMap.remove(threadId) }
+      unrollMemoryMap.synchronized { unrollMemoryMap.remove(threadId) }
       runningTasks.remove(taskId)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 2b8fb9c7d3137..08782ca0835e0 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -197,7 +197,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer
 
     val threadId = Thread.currentThread().getId
-    val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
+    val unrollMemoryMap = SparkEnv.get.unrollMemoryMap
     var buffer = new SizeTrackingAppendOnlyBuffer[Any]
 
     try {
@@ -212,15 +212,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         previousSize = currentSize
 
         // Atomically check whether there is sufficient memory in the global pool to continue
-        cacheMemoryMap.synchronized {
-          val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L)
-          val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory
+        unrollMemoryMap.synchronized {
+          val previouslyOccupiedMemory = unrollMemoryMap.get(threadId).getOrElse(0L)
+          val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupiedMemory
 
           // Request for memory for the local buffer, and return whether request is granted
           def requestForMemory(): Boolean = {
             val availableMemory = freeMemory - otherThreadsMemory
             val granted = availableMemory > memoryToRequest
-            if (granted) { cacheMemoryMap(threadId) = memoryToRequest }
+            if (granted) { unrollMemoryMap(threadId) = memoryToRequest }
             granted
           }
 
@@ -247,8 +247,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       // Unless we return an iterator that depends on the buffer, free up space for other threads
       if (enoughMemory) {
         buffer = null
-        cacheMemoryMap.synchronized {
-          cacheMemoryMap(threadId) = 0
+        unrollMemoryMap.synchronized {
+          unrollMemoryMap(threadId) = 0
         }
       }
     }
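
The rename does not change the accounting pattern itself: each thread records the bytes it is using to unroll a block in a shared map, checks the sum of everyone else's reservations against the free memory pool before growing, and clears its entry when the task finishes. Below is a minimal standalone sketch of that pattern for context; the names (UnrollMemoryTracker, maxUnrollMemory, tryReserve, release) are illustrative assumptions and are not part of Spark's API.

    import scala.collection.mutable

    // Sketch only: per-thread memory reservations guarded by a single lock on the map,
    // mirroring the unrollMemoryMap usage in MemoryStore/Executor above.
    class UnrollMemoryTracker(maxUnrollMemory: Long) {
      // thread ID -> bytes currently reserved for unrolling; all accesses synchronized on the map
      private val unrollMemoryMap = mutable.HashMap[Long, Long]()

      /** Try to reserve `bytes` for the current thread; return whether the request is granted. */
      def tryReserve(bytes: Long): Boolean = unrollMemoryMap.synchronized {
        val threadId = Thread.currentThread().getId
        val previouslyOccupied = unrollMemoryMap.getOrElse(threadId, 0L)
        val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupied
        val granted = maxUnrollMemory - otherThreadsMemory > bytes
        if (granted) { unrollMemoryMap(threadId) = bytes }
        granted
      }

      /** Drop the current thread's reservation, e.g. in a task's finally block. */
      def release(): Unit = unrollMemoryMap.synchronized {
        unrollMemoryMap.remove(Thread.currentThread().getId)
      }
    }

    object UnrollMemoryTrackerExample {
      def main(args: Array[String]): Unit = {
        val tracker = new UnrollMemoryTracker(maxUnrollMemory = 1000L)
        try {
          if (tracker.tryReserve(400L)) println("reservation granted")
        } finally {
          tracker.release() // mirrors the cleanup Executor.scala does per task
        }
      }
    }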