Skip to content

Commit

Permalink
Add initial memory request threshold + rename a few things
Browse files — browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Jul 18, 2014
1 parent e9c3cb0 commit a10b0e7
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number | Diff line number | Diff line change
Expand Up @@ -191,20 +191,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
: Either[Array[Any], Iterator[Any]] = {

var count = 0 // The number of elements unrolled so far
var enoughMemory = true // Whether there is enough memory to unroll this block
var previousSize = 0L // Previous estimate of the size of our buffer
val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer
var count = 0 // The number of elements unrolled so far
var atMemoryLimit = false // Whether we are at the memory limit for unrolling blocks
var previousSize = 0L // Previous estimate of the size of our buffer
val memoryRequestPeriod = 1000 // How frequently we request more memory for our buffer
val memoryRequestThreshold = 100 // Before this is exceeded, request memory every round

val threadId = Thread.currentThread().getId
val unrollMemoryMap = SparkEnv.get.unrollMemoryMap
var buffer = new SizeTrackingAppendOnlyBuffer[Any]

try {
while (values.hasNext && enoughMemory) {
while (values.hasNext && !atMemoryLimit) {
buffer += values.next()
count += 1
if (count % memoryRequestPeriod == 1) {
if (count % memoryRequestPeriod == 1 || count < memoryRequestThreshold) {
// Calculate the amount of memory to request from the global memory pool
val currentSize = buffer.estimateSize()
val delta = if (previousSize > 0) math.max(currentSize - previousSize, 0) else 0
Expand All @@ -216,8 +217,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val previouslyOccupiedMemory = unrollMemoryMap.get(threadId).getOrElse(0L)
val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupiedMemory

// Request for memory for the local buffer, and return whether request is granted
def requestForMemory(): Boolean = {
// Request memory for the local buffer and return whether request is granted
def requestMemory(): Boolean = {
val availableMemory = freeMemory - otherThreadsMemory
val granted = availableMemory > memoryToRequest
if (granted) { unrollMemoryMap(threadId) = memoryToRequest }
Expand All @@ -226,16 +227,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)

// If the first request is not granted, try again after ensuring free space
// If there is still not enough space, give up and drop the partition
if (!requestForMemory()) {
if (!requestMemory()) {
val result = ensureFreeSpace(blockId, globalBufferMemory)
droppedBlocks ++= result.droppedBlocks
enoughMemory = requestForMemory()
atMemoryLimit = !requestMemory()
}
}
}
}

if (enoughMemory) {
if (!atMemoryLimit) {
// We successfully unrolled the entirety of this block
Left(buffer.array)
} else {
Expand All @@ -245,7 +246,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)

} finally {
// Unless we return an iterator that depends on the buffer, free up space for other threads
if (enoughMemory) {
if (!atMemoryLimit) {
buffer = null
unrollMemoryMap.synchronized {
unrollMemoryMap(threadId) = 0
Expand Down

0 comments on commit a10b0e7

Please sign in to comment.