Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPY-1544 removed unused try-finally #211

Merged
merged 1 commit into from
Dec 29, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 83 additions & 90 deletions core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -234,107 +234,100 @@ private[spark] class MemoryStore(

val ccbst = new CsdCacheBlockSizeTracker(csdCacheBlockSizeLimit)
// Unroll this block safely, checking whether we have exceeded our threshold periodically
try {
var currentSize = 0L
while (values.hasNext && keepUnrolling && ccbst.shouldCache) {
vector += values.next()
if (elementsUnrolled % memoryCheckPeriod == 0) {
// If our vector's size has exceeded the threshold, request more memory
currentSize = vector.estimateSize()
if (ccbst.shouldTurnOffCache(currentSize)) {
ccbst.turnOffCache()
} else if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
// New threshold is currentSize * memoryGrowthFactor
memoryThreshold += amountToRequest
var currentSize = 0L
while (values.hasNext && keepUnrolling && ccbst.shouldCache) {
vector += values.next()
if (elementsUnrolled % memoryCheckPeriod == 0) {
// If our vector's size has exceeded the threshold, request more memory
currentSize = vector.estimateSize()
if (ccbst.shouldTurnOffCache(currentSize)) {
ccbst.turnOffCache()
} else if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
// New threshold is currentSize * memoryGrowthFactor
memoryThreshold += amountToRequest
}
elementsUnrolled += 1
}
elementsUnrolled += 1
}

if (keepUnrolling && ccbst.shouldCache) {
// We successfully unrolled the entirety of this block
// and the block size is within csdCacheBlockSizeLimit
val arrayValues = vector.toArray
vector = null
val entry =
new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
val size = entry.size

def transferUnrollToStorage(amount: Long): Unit = {
// Synchronize so that transfer is atomic
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
assert(success, "transferring unroll memory to storage memory failed")
}
if (keepUnrolling && ccbst.shouldCache) {
// We successfully unrolled the entirety of this block
// and the block size is within csdCacheBlockSizeLimit
val arrayValues = vector.toArray
vector = null
val entry =
new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
val size = entry.size
def transferUnrollToStorage(amount: Long): Unit = {
// Synchronize so that transfer is atomic
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
assert(success, "transferring unroll memory to storage memory failed")
}

// Acquire storage memory if necessary to store this block in memory.
val enoughStorageMemory = {
if (unrollMemoryUsedByThisBlock <= size) {
val acquiredExtra =
memoryManager.acquireStorageMemory(
blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
if (acquiredExtra) {
transferUnrollToStorage(unrollMemoryUsedByThisBlock)
}
acquiredExtra
} else { // unrollMemoryUsedByThisBlock > size
// If this task attempt already owns more unroll memory than is necessary to store the
// block, then release the extra memory that will not be used.
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
transferUnrollToStorage(size)
true
}
// Acquire storage memory if necessary to store this block in memory.
val enoughStorageMemory = {
if (unrollMemoryUsedByThisBlock <= size) {
val acquiredExtra =
memoryManager.acquireStorageMemory(
blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
if (acquiredExtra) {
transferUnrollToStorage(unrollMemoryUsedByThisBlock)
}
acquiredExtra
} else { // unrollMemoryUsedByThisBlock > size
// If this task attempt already owns more unroll memory than is necessary to store the
// block, then release the extra memory that will not be used.
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
transferUnrollToStorage(size)
true
}
if (enoughStorageMemory) {
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(size)
} else {
assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
"released too much unroll memory")
Left(
(
new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = arrayValues.toIterator,
rest = Iterator.empty),
true
)
)
}
if (enoughStorageMemory) {
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(size)
} else {
val iter = new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = vector.iterator,
rest = values
assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
"released too much unroll memory")
Left(
(
new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = arrayValues.toIterator,
rest = Iterator.empty),
true
)
)
// We ran out of space while unrolling the values for this block
if (!ccbst.shouldCache) {
logBlockCacheSizeLimitMessage(blockId, currentSize)
Left((iter, false))
} else {
logUnrollFailureMessage(blockId, vector.estimateSize())
Left((iter, true))
}
}
} finally {

} else {
val iter = new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = vector.iterator,
rest = values)
// We ran out of space while unrolling the values for this block
if (!ccbst.shouldCache) {
logBlockCacheSizeLimitMessage(blockId, currentSize)
Left((iter, false))
} else {
logUnrollFailureMessage(blockId, vector.estimateSize())
Left((iter, true))
}
}
}
/**
Expand Down