From d5dd3b4aac6b3992591813f3bd1fcfb9f2adbec2 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 3 Jul 2014 19:53:31 -0700
Subject: [PATCH] Free buffer memory in finally

---
 .../scala/org/apache/spark/CacheManager.scala | 113 ++++++++++--------
 1 file changed, 60 insertions(+), 53 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index ee25458f2ad48..f0eef9bb27945 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -163,65 +163,72 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
     var buffer = new SizeTrackingAppendOnlyBuffer[Any]
 
-    /* While adding values to the in-memory buffer, periodically check whether the memory
-     * restrictions for unrolling partitions are still satisfied. If not, stop immediately,
-     * and persist the partition to disk if specified by the storage level. This check is
-     * a safeguard against the scenario when a single partition does not fit in memory. */
-    while (values.hasNext && !dropPartition) {
-      buffer += values.next()
-      count += 1
-      if (count % memoryRequestPeriod == 1) {
-        // Calculate the amount of memory to request from the global memory pool
-        val currentSize = buffer.estimateSize()
-        val delta = math.max(currentSize - previousSize, 0)
-        val memoryToRequest = currentSize + delta
-        previousSize = currentSize
-
-        // Atomically check whether there is sufficient memory in the global pool to continue
-        cacheMemoryMap.synchronized {
-          val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L)
-          val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory
-
-          // Request for memory for the local buffer, and return whether request is granted
-          def requestForMemory(): Boolean = {
-            val availableMemory = blockManager.memoryStore.freeMemory - otherThreadsMemory
-            val granted = availableMemory > memoryToRequest
-            if (granted) { cacheMemoryMap(threadId) = memoryToRequest }
-            granted
-          }
-
-          // If the first request is not granted, try again after ensuring free space
-          // If there is still not enough space, give up and drop the partition
-          if (!requestForMemory()) {
-            val result = blockManager.memoryStore.ensureFreeSpace(key, globalBufferMemory)
-            updatedBlocks ++= result.droppedBlocks
-            dropPartition = !requestForMemory()
+    try {
+      /* While adding values to the in-memory buffer, periodically check whether the memory
+       * restrictions for unrolling partitions are still satisfied. If not, stop immediately,
+       * and persist the partition to disk if specified by the storage level. This check is
+       * a safeguard against the scenario when a single partition does not fit in memory. */
+      while (values.hasNext && !dropPartition) {
+        buffer += values.next()
+        count += 1
+        if (count % memoryRequestPeriod == 1) {
+          // Calculate the amount of memory to request from the global memory pool
+          val currentSize = buffer.estimateSize()
+          val delta = math.max(currentSize - previousSize, 0)
+          val memoryToRequest = currentSize + delta
+          previousSize = currentSize
+
+          // Atomically check whether there is sufficient memory in the global pool to continue
+          cacheMemoryMap.synchronized {
+            val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L)
+            val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory
+
+            // Request for memory for the local buffer, and return whether request is granted
+            def requestForMemory(): Boolean = {
+              val availableMemory = blockManager.memoryStore.freeMemory - otherThreadsMemory
+              val granted = availableMemory > memoryToRequest
+              if (granted) { cacheMemoryMap(threadId) = memoryToRequest }
+              granted
+            }
+
+            // If the first request is not granted, try again after ensuring free space
+            // If there is still not enough space, give up and drop the partition
+            if (!requestForMemory()) {
+              val result = blockManager.memoryStore.ensureFreeSpace(key, globalBufferMemory)
+              updatedBlocks ++= result.droppedBlocks
+              dropPartition = !requestForMemory()
+            }
          }
        }
      }
-    }
 
-    if (!dropPartition) {
-      // We have successfully unrolled the entire partition, so cache it in memory
-      updatedBlocks ++= blockManager.put(key, buffer.array, storageLevel, tellMaster = true)
-      buffer.iterator.asInstanceOf[Iterator[T]]
-    } else {
-      // We have exceeded our collective quota. This partition will not be cached in memory.
-      val persistToDisk = storageLevel.useDisk
-      logWarning(s"Failed to cache $key in memory! There is not enough space to unroll the " +
-        s"entire partition. " + (if (persistToDisk) "Persisting to disk instead." else ""))
-      var newValues = (buffer.iterator ++ values).asInstanceOf[Iterator[T]]
-      if (persistToDisk) {
-        val newLevel = StorageLevel(storageLevel.useDisk, useMemory = false,
-          storageLevel.useOffHeap, storageLevel.deserialized, storageLevel.replication)
-        newValues = putInBlockManager[T](key, newValues, newLevel, updatedBlocks)
-        // Free up buffer for other threads
-        buffer = null
-        cacheMemoryMap.synchronized {
-          cacheMemoryMap(threadId) = 0
+      if (!dropPartition) {
+        // We have successfully unrolled the entire partition, so cache it in memory
+        updatedBlocks ++= blockManager.put(key, buffer.array, storageLevel, tellMaster = true)
+        buffer.iterator.asInstanceOf[Iterator[T]]
+      } else {
+        // We have exceeded our collective quota. This partition will not be cached in memory.
+        val persistToDisk = storageLevel.useDisk
+        logWarning(s"Failed to cache $key in memory! There is not enough space to unroll the " +
+          s"entire partition. " + (if (persistToDisk) "Persisting to disk instead." else ""))
+        var newValues = (buffer.iterator ++ values).asInstanceOf[Iterator[T]]
+        if (persistToDisk) {
+          val newLevel = StorageLevel(
+            storageLevel.useDisk,
+            useMemory = false,
+            storageLevel.useOffHeap,
+            deserialized = false,
+            storageLevel.replication)
+          newValues = putInBlockManager[T](key, newValues, newLevel, updatedBlocks)
        }
+        newValues
+      }
+    } finally {
+      // Free up buffer for other threads
+      buffer = null
+      cacheMemoryMap.synchronized {
+        cacheMemoryMap(threadId) = 0
      }
-      newValues
    }
  }
}
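For reference, here is a minimal, self-contained Scala sketch of the cleanup pattern this patch introduces: the per-thread unroll reservation is cleared in a finally block, so it is released on every exit path, whether the unroll succeeds, falls back to disk, or throws. The names below (UnrollReservationSketch, unroll, reserveBytes, and cacheMemoryMap as a plain mutable.Map) are illustrative stand-ins under assumed semantics, not Spark's actual CacheManager API.

    import scala.collection.mutable

    object UnrollReservationSketch {
      // Stand-in for the shared per-thread reservation map (SparkEnv.get.cacheMemoryMap
      // in the patch): thread id -> bytes currently reserved for unrolling.
      val cacheMemoryMap = mutable.Map[Long, Long]()

      // Unroll an iterator into a local buffer. The reservation is cleared in `finally`,
      // so it is released on normal return and when `values` throws mid-iteration.
      def unroll[T](values: Iterator[T], reserveBytes: Long): Seq[T] = {
        val threadId = Thread.currentThread().getId
        var buffer = mutable.ArrayBuffer.empty[T]
        try {
          cacheMemoryMap.synchronized { cacheMemoryMap(threadId) = reserveBytes }
          values.foreach(buffer += _)
          buffer.toSeq
        } finally {
          // Free the buffer and zero this thread's reservation on every exit path.
          buffer = null
          cacheMemoryMap.synchronized { cacheMemoryMap(threadId) = 0 }
        }
      }

      def main(args: Array[String]): Unit = {
        println(unroll(Iterator(1, 2, 3), reserveBytes = 1024))  // prints the unrolled elements
        println(cacheMemoryMap)  // this thread's entry is back to 0
      }
    }

Without the finally block, an exception thrown while consuming the iterator would leave a stale reservation in the shared map, starving other threads of unroll memory; that is the failure mode the patch guards against.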