From a10b0e72a4941ae8ca19d8f528b40ba25a129c6d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 17 Jul 2014 17:21:32 -0700 Subject: [PATCH] Add initial memory request threshold + rename a few things --- .../apache/spark/storage/MemoryStore.scala | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 08782ca0835e0..6791369795954 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -191,20 +191,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) : Either[Array[Any], Iterator[Any]] = { - var count = 0 // The number of elements unrolled so far - var enoughMemory = true // Whether there is enough memory to unroll this block - var previousSize = 0L // Previous estimate of the size of our buffer - val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer + var count = 0 // The number of elements unrolled so far + var atMemoryLimit = false // Whether we are at the memory limit for unrolling blocks + var previousSize = 0L // Previous estimate of the size of our buffer + val memoryRequestPeriod = 1000 // How frequently we request more memory for our buffer + val memoryRequestThreshold = 100 // Before this is exceeded, request memory every round val threadId = Thread.currentThread().getId val unrollMemoryMap = SparkEnv.get.unrollMemoryMap var buffer = new SizeTrackingAppendOnlyBuffer[Any] try { - while (values.hasNext && enoughMemory) { + while (values.hasNext && !atMemoryLimit) { buffer += values.next() count += 1 - if (count % memoryRequestPeriod == 1) { + if (count % memoryRequestPeriod == 1 || count < memoryRequestThreshold) { // Calculate the amount of memory to request from the global memory pool val currentSize = buffer.estimateSize() val delta = if (previousSize > 0) math.max(currentSize - previousSize, 0) else 0 @@ -216,8 +217,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) val previouslyOccupiedMemory = unrollMemoryMap.get(threadId).getOrElse(0L) val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupiedMemory - // Request for memory for the local buffer, and return whether request is granted - def requestForMemory(): Boolean = { + // Request memory for the local buffer and return whether request is granted + def requestMemory(): Boolean = { val availableMemory = freeMemory - otherThreadsMemory val granted = availableMemory > memoryToRequest if (granted) { unrollMemoryMap(threadId) = memoryToRequest } @@ -226,16 +227,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // If the first request is not granted, try again after ensuring free space // If there is still not enough space, give up and drop the partition - if (!requestForMemory()) { + if (!requestMemory()) { val result = ensureFreeSpace(blockId, globalBufferMemory) droppedBlocks ++= result.droppedBlocks - enoughMemory = requestForMemory() + atMemoryLimit = !requestMemory() } } } } - if (enoughMemory) { + if (!atMemoryLimit) { // We successfully unrolled the entirety of this block Left(buffer.array) } else { @@ -245,7 +246,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } finally { // Unless we return an iterator that depends on the buffer, free up space for other threads - if (enoughMemory) { + if (!atMemoryLimit) { buffer = null unrollMemoryMap.synchronized { unrollMemoryMap(threadId) = 0