Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[SPARK-3825] Log more detail when unrolling a block fails #2688

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
logWarning(s"Not enough space to cache partition $key in memory! " +
s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
Expand Down
45 changes: 39 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
PutResult(res.size, res.data, droppedBlocks)
case Right(iteratorValues) =>
// Not enough space to unroll this block; drop to disk if applicable
logWarning(s"Not enough space to store block $blockId in memory! " +
s"Free memory is $freeMemory bytes.")
if (level.useDisk && allowPersistToDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
Expand Down Expand Up @@ -265,6 +263,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
Left(vector.toArray)
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, vector.estimateSize())
Right(vector.iterator ++ values)
}

Expand Down Expand Up @@ -424,7 +423,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Reserve additional memory for unrolling blocks used by this thread.
* Return whether the request is granted.
*/
private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
accountingLock.synchronized {
val granted = freeMemory > currentUnrollMemory + memory
if (granted) {
Expand All @@ -439,7 +438,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Release memory used by this thread for unrolling blocks.
* If the amount is not specified, remove the current thread's allocation altogether.
*/
private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
val threadId = Thread.currentThread().getId
accountingLock.synchronized {
if (memory < 0) {
Expand All @@ -457,16 +456,50 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
/**
 * Total memory (in bytes) reserved for unrolling blocks, summed over every thread.
 * Reads of the shared map are guarded by `accountingLock` for a consistent snapshot.
 */
def currentUnrollMemory: Long = accountingLock.synchronized {
  unrollMemoryMap.values.foldLeft(0L)(_ + _)
}

/**
 * Memory (in bytes) reserved for unrolling blocks by the calling thread,
 * or 0 if this thread has no reservation recorded in the map.
 */
def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
  val threadId = Thread.currentThread().getId
  unrollMemoryMap.get(threadId).getOrElse(0L)
}

/**
 * Number of threads that currently hold an unroll-memory reservation.
 * Uses `Map.size` directly rather than `keys.size`, which would build and
 * traverse an intermediate key view for the same result.
 */
def numThreadsUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.size }

/**
 * Log a summary of current memory usage: bytes held by cached blocks, bytes held
 * as unroll scratch space (shared across threads), their total, and the storage limit.
 */
def logMemoryUsage(): Unit = {
  val usedByBlocks = currentMemory
  val usedByUnroll = currentUnrollMemory
  val usedTotal = usedByBlocks + usedByUnroll
  val message =
    s"Memory use = ${Utils.bytesToString(usedByBlocks)} (blocks) + " +
    s"${Utils.bytesToString(usedByUnroll)} (scratch space shared across " +
    s"$numThreadsUnrolling thread(s)) = ${Utils.bytesToString(usedTotal)}. " +
    s"Storage limit = ${Utils.bytesToString(maxMemory)}."
  logInfo(message)
}

/**
 * Log a warning that unrolling a block into memory failed, including how many
 * bytes were accumulated before giving up, then log overall memory usage.
 *
 * @param blockId ID of the block we are trying to unroll.
 * @param finalVectorSize Final size of the vector before unrolling failed.
 */
def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = {
  val warning =
    s"Not enough space to cache $blockId in memory! " +
    s"(computed ${Utils.bytesToString(finalVectorSize)} so far)"
  logWarning(warning)
  logMemoryUsage()
}
}

private[spark] case class ResultWithDroppedBlocks(
Expand Down