[SPARK-13566][CORE] Avoid deadlock between BlockManager and Executor Thread

Temporary patch for branch-1.6 that avoids a deadlock between the BlockManager and an Executor thread; a schematic of the lock-ordering cycle follows the change summary below.

Author: cenyuhai <[email protected]>

Closes apache#11546 from cenyuhai/SPARK-13566.
cenyuhai authored and Andrew Or committed May 6, 2016
1 parent a3aa22a commit ab00652
Showing 3 changed files with 170 additions and 72 deletions.
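For context on the deadlock this commit addresses, here is a schematic sketch of the lock-ordering cycle in branch-1.6 (not code from this patch; the two lock objects are stand-ins for the MemoryManager monitor and a block's BlockInfo monitor): an evicting task thread already holds the memory-manager lock and then needs the block's info lock, while a removing thread holds the info lock and then needs the memory-manager lock. The patch breaks the cycle by making removers first claim a block in pendingToRemove and skip blocks another thread has already claimed.

```scala
object DeadlockShapeSketch {
  // Stand-ins for the two monitors involved (assumptions, not Spark classes).
  private val memoryManagerLock = new Object // plays the role of the MemoryManager
  private val blockInfoLock     = new Object // plays the role of a block's BlockInfo

  // Eviction path: the memory-manager lock is held first, then the info lock is needed.
  def evictingTaskThread(): Unit = memoryManagerLock.synchronized {
    blockInfoLock.synchronized {
      // dropFromMemory-style work
    }
  }

  // Removal path: the info lock is held first, then the memory store's lock is needed.
  def removingThread(): Unit = blockInfoLock.synchronized {
    memoryManagerLock.synchronized {
      // memoryStore.remove-style work
    }
  }
  // Run concurrently, each thread can block forever waiting for the lock the
  // other already holds; that is the cycle the pendingToRemove claim avoids.
}
```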
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -218,6 +218,7 @@ private[spark] class Executor(
threwException = false
res
} finally {
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
@@ -227,6 +228,17 @@ private[spark] class Executor(
logError(errMsg)
}
}

if (releasedLocks.nonEmpty) {
val errMsg =
s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
releasedLocks.mkString("[", ", ", "]")
if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
throw new SparkException(errMsg)
} else {
logError(errMsg)
}
}
}
val taskFinish = System.currentTimeMillis()

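As a hedged usage sketch of the configuration key referenced above: with spark.storage.exceptionOnPinLeak set to true, a task that finishes without throwing but still holds block locks (as reported by releaseAllLocksForTask) fails with a SparkException instead of only logging an error. The master and application name below are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PinLeakStrictModeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")                           // illustrative local master
      .setAppName("pin-leak-strict-demo")              // illustrative app name
      .set("spark.storage.exceptionOnPinLeak", "true") // default is false, per the diff above
    val sc = new SparkContext(conf)
    try {
      sc.parallelize(1 to 10).count() // well-behaved jobs are unaffected by the flag
    } finally {
      sc.stop()
    }
  }
}
```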
192 changes: 120 additions & 72 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,12 +19,14 @@ package org.apache.spark.storage

import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Random
import scala.util.control.NonFatal
import scala.collection.JavaConverters._

import sun.nio.ch.DirectBuffer

Expand Down Expand Up @@ -65,7 +67,7 @@ private[spark] class BlockManager(
val master: BlockManagerMaster,
defaultSerializer: Serializer,
val conf: SparkConf,
memoryManager: MemoryManager,
val memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,
@@ -164,6 +166,11 @@ private[spark] class BlockManager(
* loaded yet. */
private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)

// Blocks that are currently being removed by another thread
val pendingToRemove = new ConcurrentHashMap[BlockId, Long]()

private val NON_TASK_WRITER = -1024L

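A short aside on how these two fields are used later in this diff: ConcurrentHashMap.putIfAbsent returns null when there was no previous mapping, and null unboxes to 0L when read back as a Scala Long, so `putIfAbsent(blockId, currentTaskAttemptId) == 0L` means the caller won the claim; NON_TASK_WRITER is the owner id recorded by threads that are not running a task. Below is a minimal, self-contained sketch of the idiom (String keys stand in for BlockId; everything other than putIfAbsent is hypothetical).

```scala
import java.util.concurrent.ConcurrentHashMap

object ClaimSketch {
  val pending = new ConcurrentHashMap[String, Long]() // String stands in for BlockId
  val NON_TASK_WRITER = -1024L                        // sentinel owner, mirroring the diff

  // True if this caller won the claim. putIfAbsent yields null when the key was
  // absent, and null unboxes to 0L as a Scala Long (the sketch assumes a real
  // owner id is never 0L).
  def tryClaim(blockId: String, ownerId: Long): Boolean =
    pending.putIfAbsent(blockId, ownerId) == 0L

  def main(args: Array[String]): Unit = {
    assert(tryClaim("rdd_0_0", 7L))               // first claimant wins
    assert(!tryClaim("rdd_0_0", NON_TASK_WRITER)) // second claimant is turned away
    pending.remove("rdd_0_0")                     // release, as the finally blocks below do
    assert(tryClaim("rdd_0_0", 8L))               // claimable again after release
  }
}
```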
/**
* Initializes the BlockManager with the given appId. This is not performed in the constructor as
* the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -1025,54 +1032,58 @@ private[spark] class BlockManager(
val info = blockInfo.get(blockId).orNull

// If the block has not already been dropped
if (info != null) {
info.synchronized {
// required ? As of now, this will be invoked only for blocks which are ready
// But in case this changes in future, adding for consistency sake.
if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure. Nothing to drop")
return None
} else if (blockInfo.get(blockId).isEmpty) {
logWarning(s"Block $blockId was already dropped.")
return None
}
var blockIsUpdated = false
val level = info.level
if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
try {
info.synchronized {
// required ? As of now, this will be invoked only for blocks which are ready
// But in case this changes in future, adding for consistency sake.
if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure. Nothing to drop")
return None
} else if (blockInfo.get(blockId).isEmpty) {
logWarning(s"Block $blockId was already dropped.")
return None
}
var blockIsUpdated = false
val level = info.level

// Drop to disk, if storage level requires
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
diskStore.putArray(blockId, elements, level, returnValues = false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
// Drop to disk, if storage level requires
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
diskStore.putArray(blockId, elements, level, returnValues = false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
blockIsUpdated = true
}
blockIsUpdated = true
}

// Actually drop from memory store
val droppedMemorySize =
if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val blockIsRemoved = memoryStore.remove(blockId)
if (blockIsRemoved) {
blockIsUpdated = true
} else {
logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
}
// Actually drop from memory store
val droppedMemorySize =
if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val blockIsRemoved = memoryStore.remove(blockId)
if (blockIsRemoved) {
blockIsUpdated = true
} else {
logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
}

val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
reportBlockStatus(blockId, info, status, droppedMemorySize)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
blockInfo.remove(blockId)
}
if (blockIsUpdated) {
return Some(status)
val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
reportBlockStatus(blockId, info, status, droppedMemorySize)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
blockInfo.remove(blockId)
}
if (blockIsUpdated) {
return Some(status)
}
}
} finally {
pendingToRemove.remove(blockId)
}
}
None
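One point worth spelling out about the block above (the same shape recurs in removeBlock and the old-block cleanup below): the early `return None` statements sit inside the `try`, so the `finally` clause still runs and the pendingToRemove entry cannot be leaked. A minimal sketch of that control-flow guarantee, under the same assumptions as the earlier claim sketch:

```scala
import java.util.concurrent.ConcurrentHashMap

object EarlyReturnSketch {
  val pending = new ConcurrentHashMap[String, Long]()

  def dropLike(blockId: String, ownerId: Long, writeFailed: Boolean): Option[String] = {
    if (pending.putIfAbsent(blockId, ownerId) != 0L) {
      return None // another thread already claimed this block; do nothing
    }
    try {
      if (writeFailed) return None // early exit, like the waitForReady() guard above
      Some(s"dropped $blockId")
    } finally {
      pending.remove(blockId) // runs on every exit path, including the early return
    }
  }

  def main(args: Array[String]): Unit = {
    assert(dropLike("rdd_0_0", 7L, writeFailed = true).isEmpty)
    assert(!pending.containsKey("rdd_0_0")) // the claim was released despite the early return
  }
}
```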
@@ -1108,27 +1119,32 @@ private[spark] class BlockManager(
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
logDebug(s"Removing block $blockId")
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
val removedFromExternalBlockStore =
if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
blockInfo.remove(blockId)
val status = getCurrentBlockStatus(blockId, info)
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, info, status)
}
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
try {
info.synchronized {
val level = info.level
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = if (level.useMemory) memoryStore.remove(blockId) else false
val removedFromDisk = if (level.useDisk) diskStore.remove(blockId) else false
val removedFromExternalBlockStore =
if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
blockInfo.remove(blockId)
val status = getCurrentBlockStatus(blockId, info)
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, info, status)
}
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
}
}
} finally {
pendingToRemove.remove(blockId)
}
} else {
// The block has already been removed; do nothing.
@@ -1151,21 +1167,52 @@
while (iterator.hasNext) {
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
if (time < cleanupTime && shouldDrop(id)) {
info.synchronized {
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
if (level.useDisk) { diskStore.remove(id) }
if (level.useOffHeap) { externalBlockStore.remove(id) }
iterator.remove()
logInfo(s"Dropped block $id")
if (time < cleanupTime && shouldDrop(id) &&
pendingToRemove.putIfAbsent(id, currentTaskAttemptId) == 0L) {
try {
info.synchronized {
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
if (level.useDisk) { diskStore.remove(id) }
if (level.useOffHeap) { externalBlockStore.remove(id) }
iterator.remove()
logInfo(s"Dropped block $id")
}
} finally {
pendingToRemove.remove(id)
}
val status = getCurrentBlockStatus(id, info)
reportBlockStatus(id, info, status)
}
}
}

private def currentTaskAttemptId: Long = {
Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(NON_TASK_WRITER)
}

def getBlockInfo(blockId: BlockId): BlockInfo = {
blockInfo.get(blockId).orNull
}

/**
* Release all locks held by the given task, clearing that task's pin bookkeeping
* structures and updating the global pin counts. This method should be called at the
* end of a task (either by a task completion handler or in `TaskRunner.run()`).
*
* @return the ids of blocks whose pins were released
*/
def releaseAllLocksForTask(taskAttemptId: Long): ArrayBuffer[BlockId] = {
var selectLocks = ArrayBuffer[BlockId]()
pendingToRemove.entrySet().asScala.foreach { entry =>
if (entry.getValue == taskAttemptId) {
pendingToRemove.remove(entry.getKey)
selectLocks += entry.getKey
}
}
selectLocks
}
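To make the documented contract concrete, here is a self-contained sketch (String keys stand in for BlockId; the ids and block names are made up): releasing a task's claims returns exactly the ids that task still held and leaves claims owned by other writers, including the NON_TASK_WRITER sentinel, untouched.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

object ReleaseContractSketch {
  private val NON_TASK_WRITER = -1024L

  def releaseAllLocksForTask(claims: ConcurrentHashMap[String, Long],
                             taskAttemptId: Long): ArrayBuffer[String] = {
    val released = ArrayBuffer[String]()
    claims.entrySet().asScala.foreach { entry =>
      if (entry.getValue == taskAttemptId) {
        claims.remove(entry.getKey)
        released += entry.getKey
      }
    }
    released
  }

  def main(args: Array[String]): Unit = {
    val claims = new ConcurrentHashMap[String, Long]()
    claims.put("rdd_0_0", 3L)                  // held by task attempt 3
    claims.put("rdd_0_1", 3L)
    claims.put("broadcast_1", NON_TASK_WRITER) // held by a non-task thread
    val released = releaseAllLocksForTask(claims, 3L)
    assert(released.toSet == Set("rdd_0_0", "rdd_0_1"))
    assert(claims.containsKey("broadcast_1"))  // other owners' claims are untouched
  }
}
```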

private def shouldCompress(blockId: BlockId): Boolean = {
blockId match {
case _: ShuffleBlockId => compressShuffle
Expand Down Expand Up @@ -1239,6 +1286,7 @@ private[spark] class BlockManager(
rpcEnv.stop(slaveEndpoint)
blockInfo.clear()
memoryStore.clear()
pendingToRemove.clear()
diskStore.clear()
if (externalBlockStoreInitialized) {
externalBlockStore.clear()
38 changes: 38 additions & 0 deletions core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage

import java.nio.{ByteBuffer, MappedByteBuffer}
import java.util.Arrays
import java.util.concurrent.CountDownLatch

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -424,6 +425,43 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
}

test("deadlock between dropFromMemory and removeBlock") {
store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
val lock1 = new CountDownLatch(1)
val lock2 = new CountDownLatch(1)

val t2 = new Thread {
override def run() = {
val info = store.getBlockInfo("a1")
info.synchronized {
store.pendingToRemove.put("a1", 1L)
lock1.countDown()
lock2.await()
store.pendingToRemove.remove("a1")
}
}
}

val t1 = new Thread {
override def run() = {
store.memoryManager.synchronized {
t2.start()
lock1.await()
val status = store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
assert(status == None, "this thread can not get block a1")
lock2.countDown()
}
}
}

t1.start()
t1.join()
t2.join()
store.removeBlock("a1", tellMaster = false)
}

test("correct BlockResult returned from get() calls") {
store = makeBlockManager(12000)
val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
