[SPARK-13566][CORE] Avoid deadlock between BlockManager and Executor Thread #11546
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,12 +19,14 @@ package org.apache.spark.storage

 import java.io._
 import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.ConcurrentHashMap

 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.util.Random
 import scala.util.control.NonFatal
+import scala.collection.JavaConverters._

 import sun.nio.ch.DirectBuffer
@@ -65,7 +67,7 @@ private[spark] class BlockManager( | |
val master: BlockManagerMaster, | ||
defaultSerializer: Serializer, | ||
val conf: SparkConf, | ||
memoryManager: MemoryManager, | ||
val memoryManager: MemoryManager, | ||
mapOutputTracker: MapOutputTracker, | ||
shuffleManager: ShuffleManager, | ||
blockTransferService: BlockTransferService, | ||
|
@@ -164,6 +166,11 @@ private[spark] class BlockManager(
    * loaded yet. */
   private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)

+  // Blocks that are being removed by another thread
+  private val pendingToRemove = new ConcurrentHashMap[BlockId, Long]()
+
+  private val NON_TASK_WRITER = -1024L
+
   /**
    * Initializes the BlockManager with the given appId. This is not performed in the constructor as
    * the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -1025,7 +1032,7 @@ private[spark] class BlockManager(
     val info = blockInfo.get(blockId).orNull

     // If the block has not already been dropped
-    if (info != null) {
+    if (info != null && !pendingToRemove.containsKey(blockId)) {
       info.synchronized {
         // required ? As of now, this will be invoked only for blocks which are ready
         // But in case this changes in future, adding for consistency sake.
@@ -1051,11 +1058,13 @@ private[spark] class BlockManager(
         }
         blockIsUpdated = true
       }
+      pendingToRemove.put(blockId, currentTaskAttemptId)

Review comment: This put should happen as soon as we check …
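The comment above is cut off, but it evidently asks for the block to be claimed right where `pendingToRemove` is checked, rather than further down the method. A minimal sketch of that reordering (the elided body is the unchanged `dropFromMemory` code; a later comment tightens this further with `putIfAbsent`):

```scala
// Claim the block immediately after the check, so the window in which another
// thread can pass the same containsKey() check is as small as possible.
if (info != null && !pendingToRemove.containsKey(blockId)) {
  pendingToRemove.put(blockId, currentTaskAttemptId)
  info.synchronized {
    // ... existing dropFromMemory body ...
  }
}
```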
       // Actually drop from memory store
       val droppedMemorySize =
         if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
       val blockIsRemoved = memoryStore.remove(blockId)
+      pendingToRemove.remove(blockId)
       if (blockIsRemoved) {
         blockIsUpdated = true
       } else {
@@ -1080,6 +1089,7 @@ private[spark] class BlockManager(

   /**
    * Remove all blocks belonging to the given RDD.
+   *
    * @return The number of blocks removed.
    */
   def removeRdd(rddId: Int): Int = {
@@ -1108,11 +1118,14 @@ private[spark] class BlockManager(
   def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
     logDebug(s"Removing block $blockId")
     val info = blockInfo.get(blockId).orNull
-    if (info != null) {
+    if (info != null && !pendingToRemove.containsKey(blockId)) {
+      pendingToRemove.put(blockId, currentTaskAttemptId)

Review comment: Actually, on closer inspection I believe this is still not safe. You need to use something like …

Review reply: something like …
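The truncated suggestion above is presumably about the check-then-act race: `containsKey` followed by `put` is not atomic, so two threads can both pass the check before either one puts. A hedged sketch of the atomic variant using `ConcurrentHashMap.putIfAbsent` (the exact merged code may differ):

```scala
// putIfAbsent checks and inserts in a single atomic step, closing the window
// between containsKey() and put(). With a ConcurrentHashMap[BlockId, Long] it
// returns the previous boxed value; Scala unboxes the null returned for an
// absent key to 0L.
if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
  info.synchronized {
    // ... removal body ...
  }
}
```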
       info.synchronized {
         val level = info.level
         // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-        val removedFromMemory = memoryStore.remove(blockId)
-        val removedFromDisk = diskStore.remove(blockId)
+        val removedFromMemory = if (level.useMemory) memoryStore.remove(blockId) else false
+        pendingToRemove.remove(blockId)

Review comment: We should wrap this in a try/finally in case an exception is thrown; otherwise we may never release the "pending" lock. Here and in a few other places.
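A hedged sketch of the try/finally shape the reviewer is asking for, so the "pending" marker is always cleared even when a store throws:

```scala
pendingToRemove.put(blockId, currentTaskAttemptId)
try {
  info.synchronized {
    // ... remove the block from memory / disk / external stores ...
  }
} finally {
  // Runs on both success and exception, so the block can never get stuck in
  // pendingToRemove, which would permanently block future drops and removals.
  pendingToRemove.remove(blockId)
}
```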
+        val removedFromDisk = if (level.useDisk) diskStore.remove(blockId) else false
         val removedFromExternalBlockStore =
           if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
         if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
@@ -1147,9 +1160,11 @@ private[spark] class BlockManager(
       val entry = iterator.next()
       val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
       if (time < cleanupTime && shouldDrop(id)) {
+        pendingToRemove.put(id, currentTaskAttemptId)
         info.synchronized {
           val level = info.level
           if (level.useMemory) { memoryStore.remove(id) }
+          pendingToRemove.remove(id)
           if (level.useDisk) { diskStore.remove(id) }
           if (level.useOffHeap) { externalBlockStore.remove(id) }
           iterator.remove()
@@ -1161,6 +1176,28 @@ private[spark] class BlockManager(
     }
   }

+  private def currentTaskAttemptId: Long = {
+    Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(NON_TASK_WRITER)
+  }
+
+  /**
+   * Release all locks held by the given task, clearing that task's pin bookkeeping
+   * structures and updating the global pin counts. This method should be called at the
+   * end of a task (either by a task completion handler or in `TaskRunner.run()`).
+   *
+   * @return the ids of blocks whose pins were released
+   */
+  def releaseAllLocksForTask(taskAttemptId: Long): ArrayBuffer[BlockId] = {
+    val selectLocks = ArrayBuffer[BlockId]()
+    pendingToRemove.entrySet().asScala.foreach { entry =>
+      if (entry.getValue == taskAttemptId) {
+        pendingToRemove.remove(entry.getKey)
+        selectLocks += entry.getKey
+      }
+    }
+    selectLocks
+  }
+
   private def shouldCompress(blockId: BlockId): Boolean = {
     blockId match {
       case _: ShuffleBlockId => compressShuffle
@@ -1234,6 +1271,7 @@ private[spark] class BlockManager(
     rpcEnv.stop(slaveEndpoint)
     blockInfo.clear()
     memoryStore.clear()
+    pendingToRemove.clear()
     diskStore.clear()
     if (externalBlockStoreInitialized) {
       externalBlockStore.clear()
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -424,6 +424,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
   }

+  test("deadlock between dropFromMemory and removeBlock") {
+    store = makeBlockManager(2000)
+    val a1 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    val t1 = new Thread {
+      override def run() = {
+        store.memoryManager.synchronized {
+          Thread.sleep(1000)

Review comment: I was able to reproduce the deadlock locally with this test almost every time, which is great. However, it would be good if you could rewrite this using explicit locks or semaphores or something so we don't have to introduce …
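One way to follow that suggestion, sketched with a `java.util.concurrent.Semaphore` (an assumed rewrite, not the code in this PR): t2 only calls removeBlock once t1 actually holds the MemoryManager lock, instead of relying on a fixed sleep.

```scala
import java.util.concurrent.Semaphore

val memoryLockHeld = new Semaphore(0)
val t1 = new Thread {
  override def run(): Unit = {
    store.memoryManager.synchronized {
      memoryLockHeld.release()  // signal: the MemoryManager monitor is now held
      val status = store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
      assert(status == None, "this thread can not get block a1")
    }
  }
}
val t2 = new Thread {
  override def run(): Unit = {
    memoryLockHeld.acquire()  // block until t1 owns the MemoryManager monitor
    store.removeBlock("a1", tellMaster = false)
  }
}
```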
+          val status = store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+          assert(status == None, "this thread can not get block a1")
+        }
+      }
+    }
+
+    val t2 = new Thread {
+      override def run() = {
+        store.removeBlock("a1", tellMaster = false)
+      }
+    }
+
+    t1.start()
+    t2.start()
+    t1.join()
+    t2.join()
+  }
+
   test("correct BlockResult returned from get() calls") {
     store = makeBlockManager(12000)
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
Review comment: …which would tend to argue that this should be named unreleasedLocks, not releasedLocks.
Review reply: @markhamstra This code is from #10705 by JoshRosen.
Review reply: @JoshRosen In my production environment, there is a high probability of deadlock when storage memory is full. This is a temporary patch, since JoshRosen adds a proper read/write lock per block in #10705 for Spark 2.0.

Two threads removing the same block can deadlock: in dropFromMemory, the BlockManager first locks the MemoryManager and then waits to lock the BlockInfo, while an executor task running removeBlock or removeOldBlocks locks the BlockInfo first and then waits to lock the MemoryManager inside memoryStore.remove(block).

So this patch simply uses a ConcurrentHashMap to record, per task, which blocks are being removed. In case of failure, all locks held by a task are released when the task completes.
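The cycle described above is ordinary lock-order inversion. A self-contained toy illustration (plain monitors standing in for MemoryManager and BlockInfo; not Spark code):

```scala
object LockOrderDemo {
  def main(args: Array[String]): Unit = {
    val memoryManager = new Object // stands in for the MemoryManager monitor
    val blockInfo = new Object     // stands in for one block's BlockInfo monitor

    // Models dropFromMemory: takes MemoryManager first, then BlockInfo.
    val evictor = new Thread {
      override def run(): Unit = memoryManager.synchronized {
        Thread.sleep(100) // give the other thread time to take blockInfo
        blockInfo.synchronized { println("evictor finished") }
      }
    }

    // Models removeBlock: takes BlockInfo first, then MemoryManager.
    val remover = new Thread {
      override def run(): Unit = blockInfo.synchronized {
        Thread.sleep(100) // give the other thread time to take memoryManager
        memoryManager.synchronized { println("remover finished") }
      }
    }

    evictor.start(); remover.start()
    evictor.join(); remover.join() // with the sleeps, this almost always hangs
  }
}
```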
Review reply: @markhamstra This is a little confusing, but it's basically saying that all the locks are supposed to be released by now, so if we have to release any locks here (i.e. if releasedLocks.nonEmpty) then something is wrong.
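For context, a hedged sketch of the end-of-task hook this discussion refers to (the call to releaseAllLocksForTask is the method added by this patch; surrounding names such as env, taskId, and logWarning are assumptions about the Executor code):

```scala
// At the end of TaskRunner.run(): locks should already have been released by
// the task itself, so anything returned here points at a leak.
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
if (releasedLocks.nonEmpty) {
  logWarning(s"Task $taskId released ${releasedLocks.size} block locks that " +
    s"were still held: ${releasedLocks.mkString(", ")}")
}
```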