Skip to content

Commit

Permalink
Free buffer memory in finally
Browse files (browse the repository at this point in the history)
  • Loading branch information
andrewor14 committed Jul 4, 2014
1 parent ea02eec commit d5dd3b4
Showing 1 changed file with 60 additions and 53 deletions.
113 changes: 60 additions & 53 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number | Diff line number | Diff line change
Expand Up @@ -163,65 +163,72 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
var buffer = new SizeTrackingAppendOnlyBuffer[Any]

/* While adding values to the in-memory buffer, periodically check whether the memory
* restrictions for unrolling partitions are still satisfied. If not, stop immediately,
* and persist the partition to disk if specified by the storage level. This check is
* a safeguard against the scenario when a single partition does not fit in memory. */
while (values.hasNext && !dropPartition) {
buffer += values.next()
count += 1
if (count % memoryRequestPeriod == 1) {
// Calculate the amount of memory to request from the global memory pool
val currentSize = buffer.estimateSize()
val delta = math.max(currentSize - previousSize, 0)
val memoryToRequest = currentSize + delta
previousSize = currentSize

// Atomically check whether there is sufficient memory in the global pool to continue
cacheMemoryMap.synchronized {
val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L)
val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory

// Request for memory for the local buffer, and return whether request is granted
def requestForMemory(): Boolean = {
val availableMemory = blockManager.memoryStore.freeMemory - otherThreadsMemory
val granted = availableMemory > memoryToRequest
if (granted) { cacheMemoryMap(threadId) = memoryToRequest }
granted
}

// If the first request is not granted, try again after ensuring free space
// If there is still not enough space, give up and drop the partition
if (!requestForMemory()) {
val result = blockManager.memoryStore.ensureFreeSpace(key, globalBufferMemory)
updatedBlocks ++= result.droppedBlocks
dropPartition = !requestForMemory()
try {
/* While adding values to the in-memory buffer, periodically check whether the memory
* restrictions for unrolling partitions are still satisfied. If not, stop immediately,
* and persist the partition to disk if specified by the storage level. This check is
* a safeguard against the scenario when a single partition does not fit in memory. */
while (values.hasNext && !dropPartition) {
buffer += values.next()
count += 1
if (count % memoryRequestPeriod == 1) {
// Calculate the amount of memory to request from the global memory pool
val currentSize = buffer.estimateSize()
val delta = math.max(currentSize - previousSize, 0)
val memoryToRequest = currentSize + delta
previousSize = currentSize

// Atomically check whether there is sufficient memory in the global pool to continue
cacheMemoryMap.synchronized {
val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L)
val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory

// Request for memory for the local buffer, and return whether request is granted
def requestForMemory(): Boolean = {
val availableMemory = blockManager.memoryStore.freeMemory - otherThreadsMemory
val granted = availableMemory > memoryToRequest
if (granted) { cacheMemoryMap(threadId) = memoryToRequest }
granted
}

// If the first request is not granted, try again after ensuring free space
// If there is still not enough space, give up and drop the partition
if (!requestForMemory()) {
val result = blockManager.memoryStore.ensureFreeSpace(key, globalBufferMemory)
updatedBlocks ++= result.droppedBlocks
dropPartition = !requestForMemory()
}
}
}
}
}

if (!dropPartition) {
// We have successfully unrolled the entire partition, so cache it in memory
updatedBlocks ++= blockManager.put(key, buffer.array, storageLevel, tellMaster = true)
buffer.iterator.asInstanceOf[Iterator[T]]
} else {
// We have exceeded our collective quota. This partition will not be cached in memory.
val persistToDisk = storageLevel.useDisk
logWarning(s"Failed to cache $key in memory! There is not enough space to unroll the " +
s"entire partition. " + (if (persistToDisk) "Persisting to disk instead." else ""))
var newValues = (buffer.iterator ++ values).asInstanceOf[Iterator[T]]
if (persistToDisk) {
val newLevel = StorageLevel(storageLevel.useDisk, useMemory = false,
storageLevel.useOffHeap, storageLevel.deserialized, storageLevel.replication)
newValues = putInBlockManager[T](key, newValues, newLevel, updatedBlocks)
// Free up buffer for other threads
buffer = null
cacheMemoryMap.synchronized {
cacheMemoryMap(threadId) = 0
if (!dropPartition) {
// We have successfully unrolled the entire partition, so cache it in memory
updatedBlocks ++= blockManager.put(key, buffer.array, storageLevel, tellMaster = true)
buffer.iterator.asInstanceOf[Iterator[T]]
} else {
// We have exceeded our collective quota. This partition will not be cached in memory.
val persistToDisk = storageLevel.useDisk
logWarning(s"Failed to cache $key in memory! There is not enough space to unroll the " +
s"entire partition. " + (if (persistToDisk) "Persisting to disk instead." else ""))
var newValues = (buffer.iterator ++ values).asInstanceOf[Iterator[T]]
if (persistToDisk) {
val newLevel = StorageLevel(
storageLevel.useDisk,
useMemory = false,
storageLevel.useOffHeap,
deserialized = false,
storageLevel.replication)
newValues = putInBlockManager[T](key, newValues, newLevel, updatedBlocks)
}
newValues
}
} finally {
// Free up buffer for other threads
buffer = null
cacheMemoryMap.synchronized {
cacheMemoryMap(threadId) = 0
}
newValues
}
}
}
Expand Down

0 comments on commit d5dd3b4

Please sign in to comment.