Skip to content

Commit

Permalink
[SPARK-3825] Log more detail when unrolling a block fails
Browse files Browse the repository at this point in the history
Before:
```
14/10/06 16:45:42 WARN CacheManager: Not enough space to cache partition rdd_0_2
in memory! Free memory is 481861527 bytes.
```
After:
```
14/10/07 11:08:24 WARN MemoryStore: Not enough space to cache rdd_2_0 in memory!
(computed 68.8 MB so far)
14/10/07 11:08:24 INFO MemoryStore: Memory use = 1088.0 B (blocks) + 445.1 MB
(scratch space shared across 8 thread(s)) = 445.1 MB. Storage limit = 459.5 MB.
```

Author: Andrew Or <[email protected]>

Closes #2688 from andrewor14/cache-log-message and squashes the following commits:

28e33d6 [Andrew Or] Shy away from "unrolling"
5638c49 [Andrew Or] Grammar
39a0c28 [Andrew Or] Log more detail when unrolling a block fails

(cherry picked from commit 553737c)
Signed-off-by: Andrew Or <[email protected]>
  • Loading branch information
andrewor14 committed Oct 7, 2014
1 parent 3a7875d commit 267c7be
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 8 deletions.
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
logWarning(s"Not enough space to cache partition $key in memory! " +
s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
Expand Down
45 changes: 39 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
PutResult(res.size, res.data, droppedBlocks)
case Right(iteratorValues) =>
// Not enough space to unroll this block; drop to disk if applicable
logWarning(s"Not enough space to store block $blockId in memory! " +
s"Free memory is $freeMemory bytes.")
if (level.useDisk && allowPersistToDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
Expand Down Expand Up @@ -265,6 +263,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
Left(vector.toArray)
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, vector.estimateSize())
Right(vector.iterator ++ values)
}

Expand Down Expand Up @@ -424,7 +423,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Reserve additional memory for unrolling blocks used by this thread.
* Return whether the request is granted.
*/
private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
accountingLock.synchronized {
val granted = freeMemory > currentUnrollMemory + memory
if (granted) {
Expand All @@ -439,7 +438,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Release memory used by this thread for unrolling blocks.
* If the amount is not specified, remove the current thread's allocation altogether.
*/
private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
val threadId = Thread.currentThread().getId
accountingLock.synchronized {
if (memory < 0) {
Expand All @@ -457,16 +456,50 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
/**
* Return the amount of memory currently occupied for unrolling blocks across all threads.
*/
private[spark] def currentUnrollMemory: Long = accountingLock.synchronized {
def currentUnrollMemory: Long = accountingLock.synchronized {
  // Total reserved across all threads: one map entry per unrolling thread.
  unrollMemoryMap.values.foldLeft(0L)(_ + _)
}

/**
* Return the amount of memory currently occupied for unrolling blocks by this thread.
*/
private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
  // A thread that has never reserved unroll memory has no entry in the map.
  unrollMemoryMap.get(Thread.currentThread().getId).getOrElse(0L)
}

/**
* Return the number of threads currently unrolling blocks.
*/
def numThreadsUnrolling: Int = {
  // Each key in unrollMemoryMap is the id of a thread currently unrolling.
  accountingLock.synchronized { unrollMemoryMap.size }
}

/**
* Log information about current memory usage.
*/
def logMemoryUsage(): Unit = {
  // Snapshot the two components of memory use before formatting the message.
  val memoryUsedByBlocks = currentMemory
  val memoryUsedForUnrolling = currentUnrollMemory
  logInfo(
    s"Memory use = ${Utils.bytesToString(memoryUsedByBlocks)} (blocks) + " +
    s"${Utils.bytesToString(memoryUsedForUnrolling)} (scratch space shared across " +
    s"$numThreadsUnrolling thread(s)) = " +
    s"${Utils.bytesToString(memoryUsedByBlocks + memoryUsedForUnrolling)}. " +
    s"Storage limit = ${Utils.bytesToString(maxMemory)}."
  )
}

/**
* Log a warning for failing to unroll a block.
*
* @param blockId ID of the block we are trying to unroll.
* @param finalVectorSize Final size of the vector before unrolling failed.
*/
def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = {
  // Report how much of the block was materialized before we ran out of space,
  // then dump overall memory usage to aid diagnosis.
  val sizeSoFar = Utils.bytesToString(finalVectorSize)
  logWarning(s"Not enough space to cache $blockId in memory! (computed $sizeSoFar so far)")
  logMemoryUsage()
}
}

private[spark] case class ResultWithDroppedBlocks(
Expand Down

0 comments on commit 267c7be

Please sign in to comment.