diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index ad01078528df6..3aca4e9e74546 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -234,107 +234,100 @@ private[spark] class MemoryStore( val ccbst = new CsdCacheBlockSizeTracker(csdCacheBlockSizeLimit) // Unroll this block safely, checking whether we have exceeded our threshold periodically - try { - var currentSize = 0L - while (values.hasNext && keepUnrolling && ccbst.shouldCache) { - vector += values.next() - if (elementsUnrolled % memoryCheckPeriod == 0) { - // If our vector's size has exceeded the threshold, request more memory - currentSize = vector.estimateSize() - if (ccbst.shouldTurnOffCache(currentSize)) { - ccbst.turnOffCache() - } else if (currentSize >= memoryThreshold) { - val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong - keepUnrolling = - reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP) - if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest - } - // New threshold is currentSize * memoryGrowthFactor - memoryThreshold += amountToRequest + var currentSize = 0L + while (values.hasNext && keepUnrolling && ccbst.shouldCache) { + vector += values.next() + if (elementsUnrolled % memoryCheckPeriod == 0) { + // If our vector's size has exceeded the threshold, request more memory + currentSize = vector.estimateSize() + if (ccbst.shouldTurnOffCache(currentSize)) { + ccbst.turnOffCache() + } else if (currentSize >= memoryThreshold) { + val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong + keepUnrolling = + reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP) + if (keepUnrolling) { + unrollMemoryUsedByThisBlock += amountToRequest } + // New threshold is currentSize * memoryGrowthFactor + memoryThreshold += amountToRequest } - elementsUnrolled += 1 } + elementsUnrolled += 1 + } - if (keepUnrolling && ccbst.shouldCache) { - // We successfully unrolled the entirety of this block - // and the block size is within csdCacheBlockSizeLimit - val arrayValues = vector.toArray - vector = null - val entry = - new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) - val size = entry.size - - def transferUnrollToStorage(amount: Long): Unit = { - // Synchronize so that transfer is atomic - memoryManager.synchronized { - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount) - val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP) - assert(success, "transferring unroll memory to storage memory failed") - } + if (keepUnrolling && ccbst.shouldCache) { + // We successfully unrolled the entirety of this block + // and the block size is within csdCacheBlockSizeLimit + val arrayValues = vector.toArray + vector = null + val entry = + new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) + val size = entry.size + def transferUnrollToStorage(amount: Long): Unit = { + // Synchronize so that transfer is atomic + memoryManager.synchronized { + releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount) + val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP) + assert(success, "transferring unroll memory to storage memory failed") } - - // Acquire storage memory if necessary to store this block in memory. - val enoughStorageMemory = { - if (unrollMemoryUsedByThisBlock <= size) { - val acquiredExtra = - memoryManager.acquireStorageMemory( - blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP) - if (acquiredExtra) { - transferUnrollToStorage(unrollMemoryUsedByThisBlock) - } - acquiredExtra - } else { // unrollMemoryUsedByThisBlock > size - // If this task attempt already owns more unroll memory than is necessary to store the - // block, then release the extra memory that will not be used. - val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) - transferUnrollToStorage(size) - true + } + // Acquire storage memory if necessary to store this block in memory. + val enoughStorageMemory = { + if (unrollMemoryUsedByThisBlock <= size) { + val acquiredExtra = + memoryManager.acquireStorageMemory( + blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP) + if (acquiredExtra) { + transferUnrollToStorage(unrollMemoryUsedByThisBlock) } + acquiredExtra + } else { // unrollMemoryUsedByThisBlock > size + // If this task attempt already owns more unroll memory than is necessary to store the + // block, then release the extra memory that will not be used. + val excessUnrollMemory = unrollMemoryUsedByThisBlock - size + releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) + transferUnrollToStorage(size) + true } - if (enoughStorageMemory) { - entries.synchronized { - entries.put(blockId, entry) - } - logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( - blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) - Right(size) - } else { - assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, - "released too much unroll memory") - Left( - ( - new PartiallyUnrolledIterator( - this, - MemoryMode.ON_HEAP, - unrollMemoryUsedByThisBlock, - unrolled = arrayValues.toIterator, - rest = Iterator.empty), - true - ) - ) + } + if (enoughStorageMemory) { + entries.synchronized { + entries.put(blockId, entry) } + logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( + blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) + Right(size) } else { - val iter = new PartiallyUnrolledIterator( - this, - MemoryMode.ON_HEAP, - unrollMemoryUsedByThisBlock, - unrolled = vector.iterator, - rest = values + assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, + "released too much unroll memory") + Left( + ( + new PartiallyUnrolledIterator( + this, + MemoryMode.ON_HEAP, + unrollMemoryUsedByThisBlock, + unrolled = arrayValues.toIterator, + rest = Iterator.empty), + true + ) ) - // We ran out of space while unrolling the values for this block - if (!ccbst.shouldCache) { - logBlockCacheSizeLimitMessage(blockId, currentSize) - Left((iter, false)) - } else { - logUnrollFailureMessage(blockId, vector.estimateSize()) - Left((iter, true)) - } } - } finally { - + } else { + val iter = new PartiallyUnrolledIterator( + this, + MemoryMode.ON_HEAP, + unrollMemoryUsedByThisBlock, + unrolled = vector.iterator, + rest = values) + // We ran out of space while unrolling the values for this block + if (!ccbst.shouldCache) { + logBlockCacheSizeLimitMessage(blockId, currentSize) + Left((iter, false)) + } else { + logUnrollFailureMessage(blockId, vector.estimateSize()) + Left((iter, true)) + } } } /**