[SPARK-13566][CORE] Avoid deadlock between BlockManager and Executor Thread #11546

Closed
wants to merge 5 commits into from
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -218,6 +218,7 @@ private[spark] class Executor(
threwException = false
res
} finally {
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
@@ -227,6 +228,17 @@
logError(errMsg)
}
}

if (releasedLocks.nonEmpty) {
val errMsg =
s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
Contributor

...which would tend to argue that this should be named unreleasedLocks, not releasedLocks.

Contributor Author

@markhamstra This code is from #10705 by JoshRosen.

Contributor Author

@JoshRosen In my production environment, when storage memory is full, there is a high probability of deadlock. This is a temporary patch, since JoshRosen added a read/write lock for blocks in #10705 for Spark 2.0.

Two threads removing the same block can deadlock. In dropFromMemory, the BlockManager first locks the MemoryManager and then waits to lock the BlockInfo; meanwhile an executor task locks the BlockInfo and then waits to lock the MemoryManager when it calls memoryStore.remove(blockId) in removeBlock or removeOldBlocks.

So we just use a ConcurrentHashMap to record, per block, which task is removing it. In case of failure, all locks are released after the task completes.
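To make the interleaving concrete, here is a minimal standalone sketch of that lock-order inversion (plain Java monitors stand in for MemoryManager and BlockInfo; illustrative only, not Spark code — running it is expected to hang):

```scala
object DeadlockDemo {
  private val memoryManagerLock = new Object // stand-in for the MemoryManager monitor
  private val blockInfoLock = new Object     // stand-in for a BlockInfo monitor

  def main(args: Array[String]): Unit = {
    // Eviction path (dropFromMemory): takes MemoryManager first, then BlockInfo.
    val evictor = new Thread {
      override def run(): Unit = memoryManagerLock.synchronized {
        Thread.sleep(100) // widen the race window
        blockInfoLock.synchronized { println("evictor dropped the block") }
      }
    }
    // Executor task (removeBlock): takes BlockInfo first, then MemoryManager.
    val task = new Thread {
      override def run(): Unit = blockInfoLock.synchronized {
        Thread.sleep(100)
        memoryManagerLock.synchronized { println("task removed the block") }
      }
    }
    evictor.start(); task.start()
    evictor.join(); task.join() // never returns: each thread waits on the other's monitor
  }
}
```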

Contributor

@markhamstra this is a little confusing, but it's basically saying that all the locks are supposed to be released by now, so if we have to release any locks here (i.e. if releasedLocks.nonEmpty) then something is wrong.

releasedLocks.mkString("[", ", ", "]")
if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
throw new SparkException(errMsg)
} else {
logError(errMsg)
}
}
}
val taskFinish = System.currentTimeMillis()

192 changes: 120 additions & 72 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,12 +19,14 @@ package org.apache.spark.storage

import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Random
import scala.util.control.NonFatal
import scala.collection.JavaConverters._

import sun.nio.ch.DirectBuffer

@@ -65,7 +67,7 @@ private[spark] class BlockManager(
val master: BlockManagerMaster,
defaultSerializer: Serializer,
val conf: SparkConf,
memoryManager: MemoryManager,
val memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,
@@ -164,6 +166,11 @@ private[spark] class BlockManager(
* loaded yet. */
private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)

// Blocks that are currently being removed by another thread, mapped to the id
// of the task attempt performing the removal
val pendingToRemove = new ConcurrentHashMap[BlockId, Long]()

// Sentinel task attempt id recorded when the remover is not running inside a
// task (see currentTaskAttemptId below)
private val NON_TASK_WRITER = -1024L

/**
* Initializes the BlockManager with the given appId. This is not performed in the constructor as
* the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -1025,54 +1032,58 @@ private[spark] class BlockManager(
val info = blockInfo.get(blockId).orNull

// If the block has not already been dropped
if (info != null) {
info.synchronized {
// required ? As of now, this will be invoked only for blocks which are ready
// But in case this changes in future, adding for consistency sake.
if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure. Nothing to drop")
return None
} else if (blockInfo.get(blockId).isEmpty) {
logWarning(s"Block $blockId was already dropped.")
return None
}
var blockIsUpdated = false
val level = info.level
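// Note: putIfAbsent returns null when no prior entry existed; Scala unboxes
// that null to 0L, so the branch below is entered only when this thread has
// newly claimed the removal of this block.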
if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
try {
info.synchronized {
// required ? As of now, this will be invoked only for blocks which are ready
// But in case this changes in future, adding for consistency sake.
if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure. Nothing to drop")
return None
} else if (blockInfo.get(blockId).isEmpty) {
logWarning(s"Block $blockId was already dropped.")
return None
}
var blockIsUpdated = false
val level = info.level

// Drop to disk, if storage level requires
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
diskStore.putArray(blockId, elements, level, returnValues = false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
// Drop to disk, if storage level requires
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
diskStore.putArray(blockId, elements, level, returnValues = false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
blockIsUpdated = true
}
blockIsUpdated = true
}

// Actually drop from memory store
val droppedMemorySize =
if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val blockIsRemoved = memoryStore.remove(blockId)
if (blockIsRemoved) {
blockIsUpdated = true
} else {
logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
}
// Actually drop from memory store
val droppedMemorySize =
if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val blockIsRemoved = memoryStore.remove(blockId)
if (blockIsRemoved) {
blockIsUpdated = true
} else {
logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
}

val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
reportBlockStatus(blockId, info, status, droppedMemorySize)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
blockInfo.remove(blockId)
}
if (blockIsUpdated) {
return Some(status)
val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
reportBlockStatus(blockId, info, status, droppedMemorySize)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
blockInfo.remove(blockId)
}
if (blockIsUpdated) {
return Some(status)
}
}
} finally {
pendingToRemove.remove(blockId)
}
}
None
@@ -1108,27 +1119,32 @@ private[spark] class BlockManager(
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
logDebug(s"Removing block $blockId")
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
val removedFromExternalBlockStore =
if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
blockInfo.remove(blockId)
val status = getCurrentBlockStatus(blockId, info)
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, info, status)
}
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
try {
info.synchronized {
val level = info.level
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = if (level.useMemory) memoryStore.remove(blockId) else false
val removedFromDisk = if (level.useDisk) diskStore.remove(blockId) else false
val removedFromExternalBlockStore =
if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
blockInfo.remove(blockId)
val status = getCurrentBlockStatus(blockId, info)
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, info, status)
}
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
}
}
} finally {
pendingToRemove.remove(blockId)
}
} else {
// The block has already been removed; do nothing.
@@ -1151,21 +1167,52 @@ private[spark] class BlockManager(
while (iterator.hasNext) {
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
if (time < cleanupTime && shouldDrop(id)) {
info.synchronized {
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
if (level.useDisk) { diskStore.remove(id) }
if (level.useOffHeap) { externalBlockStore.remove(id) }
iterator.remove()
logInfo(s"Dropped block $id")
if (time < cleanupTime && shouldDrop(id) &&
pendingToRemove.putIfAbsent(id, currentTaskAttemptId) == 0L) {
try {
info.synchronized {
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
if (level.useDisk) { diskStore.remove(id) }
if (level.useOffHeap) { externalBlockStore.remove(id) }
iterator.remove()
logInfo(s"Dropped block $id")
}
} finally {
pendingToRemove.remove(id)
}
val status = getCurrentBlockStatus(id, info)
reportBlockStatus(id, info, status)
}
}
}

private def currentTaskAttemptId: Long = {
Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(NON_TASK_WRITER)
}

def getBlockInfo(blockId: BlockId): BlockInfo = {
blockInfo.get(blockId).orNull
}

/**
 * Release all locks held by the given task, clearing that task's pin bookkeeping
* structures and updating the global pin counts. This method should be called at the
* end of a task (either by a task completion handler or in `TaskRunner.run()`).
*
* @return the ids of blocks whose pins were released
*/
def releaseAllLocksForTask(taskAttemptId: Long): ArrayBuffer[BlockId] = {
val selectLocks = ArrayBuffer[BlockId]()
pendingToRemove.entrySet().asScala.foreach { entry =>
if (entry.getValue == taskAttemptId) {
pendingToRemove.remove(entry.getKey)
selectLocks += entry.getKey
}
}
selectLocks
}
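
Stepping back, pendingToRemove acts as a per-block try-lock: a thread that fails to claim the entry skips the removal instead of blocking. A hedged standalone sketch of that idiom (hypothetical names, not Spark code; note that putIfAbsent returns null when the key was free, which is what the == 0L comparison above relies on after unboxing):

```scala
import java.util.concurrent.ConcurrentHashMap

object TryLockSketch {
  // Maps each resource key to the id of the owner currently removing it.
  private val owners = new ConcurrentHashMap[String, java.lang.Long]()

  // Runs `body` only if `key` was unclaimed; returns false, without blocking,
  // when another owner already holds the key.
  def withRemovalLock(key: String, ownerId: Long)(body: => Unit): Boolean = {
    if (owners.putIfAbsent(key, ownerId) == null) { // null means we claimed it
      try { body; true } finally { owners.remove(key) }
    } else {
      false
    }
  }

  def main(args: Array[String]): Unit = {
    val ran = withRemovalLock("block-a1", ownerId = 7L) { println("removing block-a1") }
    println(s"acquired = $ran") // acquired = true
  }
}
```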

private def shouldCompress(blockId: BlockId): Boolean = {
blockId match {
case _: ShuffleBlockId => compressShuffle
@@ -1239,6 +1286,7 @@ private[spark] class BlockManager(
rpcEnv.stop(slaveEndpoint)
blockInfo.clear()
memoryStore.clear()
pendingToRemove.clear()
diskStore.clear()
if (externalBlockStoreInitialized) {
externalBlockStore.clear()
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage

import java.nio.{ByteBuffer, MappedByteBuffer}
import java.util.Arrays
import java.util.concurrent.CountDownLatch

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -424,6 +425,43 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
}

test("deadlock between dropFromMemory and removeBlock") {
store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
val lock1 = new CountDownLatch(1)
val lock2 = new CountDownLatch(1)

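// t2 plays the executor task: it takes the BlockInfo monitor and registers
// itself in pendingToRemove, as an in-progress removeBlock would.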
val t2 = new Thread {
override def run() = {
val info = store.getBlockInfo("a1")
info.synchronized {
store.pendingToRemove.put("a1", 1L)
lock1.countDown()
lock2.await()
store.pendingToRemove.remove("a1")
}
}
}

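// t1 plays the eviction path: while holding the MemoryManager monitor it calls
// dropFromMemory, which must now bail out (returning None) instead of blocking
// on the BlockInfo monitor held by t2.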
val t1 = new Thread {
override def run() = {
store.memoryManager.synchronized {
t2.start()
lock1.await()
val status = store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
assert(status == None, "this thread should not be able to drop block a1")
lock2.countDown()
}
}
}

t1.start()
t1.join()
t2.join()
store.removeBlock("a1", tellMaster = false)
}

test("correct BlockResult returned from get() calls") {
store = makeBlockManager(12000)
val list1 = List(new Array[Byte](2000), new Array[Byte](2000))