[SPARK-13566][CORE] Avoid deadlock between BlockManager and Executor Thread #11546

Closed · wants to merge 5 commits · Changes from 2 commits
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -218,6 +218,7 @@ private[spark] class Executor(
threwException = false
res
} finally {
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
@@ -227,6 +228,17 @@
logError(errMsg)
}
}

if (releasedLocks.nonEmpty) {
val errMsg =
s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
Contributor:
...which would tend to argue that this should be named unreleasedLocks, not releasedLocks.

Contributor Author:
@markhamstra This code is from #10705 by JoshRosen.

Contributor Author:
@JoshRosen In my production environment, when storage memory is full, there is a high probability of deadlock. This is a temporary patch, since JoshRosen added a read/write lock for blocks in #10705 for Spark 2.0.

Two threads removing the same block can deadlock: in dropFromMemory, the BlockManager first locks the MemoryManager and then waits to lock the BlockInfo, while the executor task locks the BlockInfo and then waits to lock the MemoryManager when it calls memoryStore.remove(blockId) in removeBlock or removeOldBlocks.

So we just use a ConcurrentHashMap to record the locks held by each task. In case of failure, all locks are released after the task completes.
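
A minimal, self-contained sketch of this lock-order inversion, using plain placeholder lock objects rather than the real MemoryManager and BlockInfo (illustration only, not code from this PR):

object DeadlockSketch {
  private val memoryManagerLock = new Object // stands in for the MemoryManager lock
  private val blockInfoLock = new Object     // stands in for the BlockInfo lock

  // Order used by dropFromMemory: MemoryManager first, then BlockInfo.
  def dropFromMemoryPath(): Unit = memoryManagerLock.synchronized {
    Thread.sleep(100) // widen the race window
    blockInfoLock.synchronized { println("dropFromMemory done") }
  }

  // Order used by the executor task in removeBlock: BlockInfo first, then MemoryManager.
  def removeBlockPath(): Unit = blockInfoLock.synchronized {
    Thread.sleep(100)
    memoryManagerLock.synchronized { println("removeBlock done") }
  }

  def main(args: Array[String]): Unit = {
    val t1 = new Thread { override def run(): Unit = dropFromMemoryPath() }
    val t2 = new Thread { override def run(): Unit = removeBlockPath() }
    t1.start(); t2.start()
    t1.join(); t2.join() // hangs: each thread holds one lock and waits for the other
  }
}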

Contributor:
@markhamstra this is a little confusing, but it's basically saying that all the locks are supposed to have been released by now, so if we have to release any locks here (i.e. if releasedLocks.nonEmpty) then something is wrong.

releasedLocks.mkString("[", ", ", "]")
if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
throw new SparkException(errMsg)
} else {
logError(errMsg)
}
}
}
val taskFinish = System.currentTimeMillis()

48 changes: 43 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,12 +19,14 @@ package org.apache.spark.storage

import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Random
import scala.util.control.NonFatal
import scala.collection.JavaConverters._

import sun.nio.ch.DirectBuffer

@@ -65,7 +67,7 @@ private[spark] class BlockManager(
val master: BlockManagerMaster,
defaultSerializer: Serializer,
val conf: SparkConf,
memoryManager: MemoryManager,
val memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,
@@ -164,6 +166,11 @@ private[spark] class BlockManager(
* loaded yet. */
private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)

// Blocks currently being removed by another thread
private val pendingToRemove = new ConcurrentHashMap[BlockId, Long]()

private val NON_TASK_WRITER = -1024L

/**
* Initializes the BlockManager with the given appId. This is not performed in the constructor as
* the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -1025,7 +1032,7 @@ private[spark] class BlockManager(
val info = blockInfo.get(blockId).orNull

// If the block has not already been dropped
if (info != null) {
if (info != null && !pendingToRemove.containsKey(blockId)) {
info.synchronized {
// required ? As of now, this will be invoked only for blocks which are ready
// But in case this changes in future, adding for consistency sake.
@@ -1051,11 +1058,13 @@
}
blockIsUpdated = true
}
pendingToRemove.put(blockId, currentTaskAttemptId)
Contributor:
This put should happen as soon as we check containsKey in L1035. Between now and then many things could happen; another thread may find that containsKey is false at the same time and both threads can still end up trying to drop the block.


// Actually drop from memory store
val droppedMemorySize =
if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val blockIsRemoved = memoryStore.remove(blockId)
pendingToRemove.remove(blockId)
if (blockIsRemoved) {
blockIsUpdated = true
} else {
@@ -1080,6 +1089,7 @@

/**
* Remove all blocks belonging to the given RDD.
*
* @return The number of blocks removed.
*/
def removeRdd(rddId: Int): Int = {
@@ -1108,11 +1118,14 @@
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
logDebug(s"Removing block $blockId")
val info = blockInfo.get(blockId).orNull
if (info != null) {
if (info != null && !pendingToRemove.containsKey(blockId)) {
pendingToRemove.put(blockId, currentTaskAttemptId)
Contributor:
Actually, on closer inspection I believe this is still not safe. You need to use something like pendingToRemove.putIfAbsent to do this atomically.

Contributor (andrewor14, May 6, 2016):
something like

if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {

info.synchronized {
val level = info.level
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
val removedFromMemory = if (level.useMemory) memoryStore.remove(blockId) else false
pendingToRemove.remove(blockId)
Contributor:
we should wrap this in a try finally in case an exception is thrown. Otherwise we may never release the "pending" lock:

if (info != null && !pendingToRemove.containsKey(blockId)) {
  pendingToRemove.put(blockId, currentTaskAttemptId)
  try {
    info.synchronized {
      ... // remove the block from memoryStore
    }
  } finally {
    pendingToRemove.remove(blockId)
  }
}

Here and a few other places.
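
Putting the two suggestions together (putIfAbsent for atomicity and try/finally for cleanup), the guard could look roughly like this (a sketch of the reviewers' proposals, not the code currently in this PR):

if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
  // putIfAbsent is atomic, so only one thread can claim the block; a null result
  // (which unboxes to 0L for a Scala Long value type) means there was no prior claim.
  try {
    info.synchronized {
      // ... remove the block from memoryStore / diskStore / externalBlockStore ...
    }
  } finally {
    pendingToRemove.remove(blockId)
  }
}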

val removedFromDisk = if (level.useDisk) diskStore.remove(blockId) else false
val removedFromExternalBlockStore =
if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
@@ -1147,9 +1160,11 @@
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
if (time < cleanupTime && shouldDrop(id)) {
pendingToRemove.put(id, currentTaskAttemptId)
info.synchronized {
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
pendingToRemove.remove(id)
if (level.useDisk) { diskStore.remove(id) }
if (level.useOffHeap) { externalBlockStore.remove(id) }
iterator.remove()
@@ -1161,6 +1176,28 @@
}
}

private def currentTaskAttemptId: Long = {
Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(NON_TASK_WRITER)
}

/**
* Release all locks held by the given task, clearing that task's pin bookkeeping
* structures and updating the global pin counts. This method should be called at the
* end of a task (either by a task completion handler or in `TaskRunner.run()`).
*
* @return the ids of blocks whose pins were released
*/
def releaseAllLocksForTask(taskAttemptId: Long): ArrayBuffer[BlockId] = {
var selectLocks = ArrayBuffer[BlockId]()
pendingToRemove.entrySet().asScala.foreach { entry =>
if (entry.getValue == taskAttemptId) {
pendingToRemove.remove(entry.getKey)
selectLocks += entry.getKey
}
}
selectLocks
}

private def shouldCompress(blockId: BlockId): Boolean = {
blockId match {
case _: ShuffleBlockId => compressShuffle
@@ -1234,6 +1271,7 @@
rpcEnv.stop(slaveEndpoint)
blockInfo.clear()
memoryStore.clear()
pendingToRemove.clear()
diskStore.clear()
if (externalBlockStoreInitialized) {
externalBlockStore.clear()
@@ -424,6 +424,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
}

test("deadlock between dropFromMemory and removeBlock") {
store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
val t1 = new Thread {
override def run() = {
store.memoryManager.synchronized {
Thread.sleep(1000)
Contributor:
I was able to reproduce the deadlock locally with this test almost every time, which is great. However, it would be good if you could rewrite this using explicit locks or semaphores or something so we don't have to introduce Thread.sleep here, which slows the test down and is more brittle.

val status = store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
assert(status == None, "this thread can not get block a1")
}
}
}

val t2 = new Thread {
override def run() = {
store.removeBlock("a1", tellMaster = false)
}
}

t1.start()
t2.start()
t1.join()
t2.join()
}
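
Following the reviewer's suggestion, one possible shape for coordinating the two threads without a fixed sleep (an illustrative sketch only, not part of this PR; it assumes the same setup as the test above, and that the only monitor t2 can block on inside removeBlock at that point is the memoryManager lock):

import java.util.concurrent.Semaphore

val memoryManagerLockHeld = new Semaphore(0)

val t2 = new Thread {
  override def run(): Unit = {
    memoryManagerLockHeld.acquire()             // wait until t1 holds the memoryManager lock
    store.removeBlock("a1", tellMaster = false) // blocks on the memoryManager lock inside removeBlock
  }
}

val t1 = new Thread {
  override def run(): Unit = {
    store.memoryManager.synchronized {
      memoryManagerLockHeld.release()           // let t2 enter removeBlock
      while (t2.getState != Thread.State.BLOCKED) {
        Thread.`yield`()                        // spin until t2 is stuck waiting for the memoryManager lock
      }
      val status = store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
      assert(status == None, "this thread can not get block a1")
    }
  }
}

t1.start()
t2.start()
t1.join()
t2.join()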

test("correct BlockResult returned from get() calls") {
store = makeBlockManager(12000)
val list1 = List(new Array[Byte](2000), new Array[Byte](2000))