[SPARK-12757] Add block-level read/write locks to BlockManager #10705
Conversation
Test build #49167 has finished for PR 10705 at commit
Test build #49411 has finished for PR 10705 at commit
@@ -43,6 +43,7 @@ import org.apache.spark.rpc.RpcEnv
import org.apache.spark.serializer.{Serializer, SerializerInstance}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.util._
import org.apache.spark.util.collection.ReferenceCounter
Can you add a comment in this class that explains the ref counting mechanism? It can be a shorter version of the commit message.
Specifically:
What are the invariants? (Explain get().) Callers need to call release(). What does it mean if the count is 0?
I slightly prefer "pin count" over "ref count" (the block manager has a reference, but it is unpinned).
/**
 * Thread-safe collection for maintaining both global and per-task reference counts for objects.
 */
private[spark] class ReferenceCounter[T] {
Is there any reason you did it this way instead of a counter per object? Not sure how many blocks we have, but this seems contention-prone.
I need to maintain global counts per object as well as counts for each task (in order to automatically decrement the global counts when tasks finish); I'm working on adding the releaseAllReferencesForTask() call to the task completion cleanup code.
If I stored the global count per block inside of the BlockInfo class, then I'd still need a mechanism to count the references per task. If the counts for each task were stored in BlockInfo, then I'd have to loop over the BlockInfo list on task completion in order to clear those counts, or would have to maintain the counts separately. As a result, it made sense to me to keep both types of counts in close proximity like this.
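To make that shape concrete, here is a minimal, hypothetical sketch of a counter that keeps global and per-task counts side by side. Only releaseAllReferencesForTask() comes from the discussion above; the class and other method names are invented for illustration and this is not the actual ReferenceCounter implementation.

import scala.collection.mutable

// Illustrative only: global counts plus per-task counts kept together, so a task's
// remaining references can be dropped in one pass when the task completes.
class SimpleReferenceCounter[T] {
  private val globalCounts = mutable.Map.empty[T, Int].withDefaultValue(0)
  private val countsByTask = mutable.Map.empty[Long, mutable.Map[T, Int]]

  def retainForTask(taskAttemptId: Long, obj: T): Unit = synchronized {
    globalCounts(obj) += 1
    val taskCounts =
      countsByTask.getOrElseUpdate(taskAttemptId, mutable.Map.empty[T, Int].withDefaultValue(0))
    taskCounts(obj) += 1
  }

  def releaseForTask(taskAttemptId: Long, obj: T): Unit = synchronized {
    globalCounts(obj) -= 1
    countsByTask.get(taskAttemptId).foreach(counts => counts(obj) -= 1)
  }

  // Task-completion cleanup: subtract whatever this task still holds from the global
  // counts, then forget the task's per-task map entirely.
  def releaseAllReferencesForTask(taskAttemptId: Long): Unit = synchronized {
    countsByTask.remove(taskAttemptId).foreach { taskCounts =>
      taskCounts.foreach { case (obj, n) => globalCounts(obj) -= n }
    }
  }

  def referenceCount(obj: T): Int = synchronized { globalCounts(obj) }
}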
Test build #49422 has finished for PR 10705 at commit
Test build #49419 has finished for PR 10705 at commit
Test build #49427 has finished for PR 10705 at commit
Test build #49477 has finished for PR 10705 at commit
Test build #49773 has finished for PR 10705 at commit
Jenkins, retest this please.
Test build #49861 has finished for PR 10705 at commit
LGTM. Good work!
Test build #51821 has finished for PR 10705 at commit
Test build #51841 has finished for PR 10705 at commit
Jenkins, retest this please.
Jenkins retest this please
Test build #51883 has finished for PR 10705 at commit
Jenkins, retest this please.
Jenkins, retest this please.
Test build #51910 has finished for PR 10705 at commit
Jenkins retest this please
Jenkins, retest this please.
Test build #51985 has finished for PR 10705 at commit
memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
memoryStore.getValues(blockId).map { iter =>
  val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
  new BlockResult(ci, DataReadMethod.Memory, info.size)
Right now there's still a chance that the programmer forgets to wrap the iter. I would actually push the CompletionIterator logic one step further into BlockResult itself, e.g.:
private[spark] class BlockResult(
iter: Iterator[Any],
blockId: BlockId,
val readMethod: DataReadMethod.Value,
val bytes: Long) {
/**
* Values of this block, to be consumed at most once.
*
* If this block was read locally, then we must have acquired a read lock on this block.
* If so, release the lock once this iterator is drained. In cases where we don't consume
* the entire iterator (e.g. take or limit), we rely on the executor releasing all locks
* held by this task attempt at the end of the task.
*
* Otherwise, if this block was read remotely from other executors, there is no need to
* do this because we didn't acquire any locks on the block.
*/
val data: Iterator[Any] = {
if (readMethod != DataReadMethod.Network) {
CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
} else {
iter
}
}
}
I did a search and could not find another place where we would not want to release the lock, other than getRemoteBlock.
If you push it further then BlockResult needs to hold a reference to the BlockManager.
I'll do this in a followup.
can we pass in an optional completion callback instead?
We still need to handle the DataReadMethod == Network case somewhere since there's no lock to release in that case, so having an optional callback in the constructor seems like it faces the same problem of someone forgetting to add it.
The difference is that now the programmer needs to explicitly pass completionCallback = None. If the completionCallback is specified then you don't need to do the network check. It's better in that today you have zero reminder that you need to release the lock by the end of the task.
Actually, an even better way IMO is to have a LocalBlockResult and a RemoteBlockResult so there's no way the programmer can forget to release the lock.
By the way, I'm not quite done reviewing yet, but feel free to address these in a follow-up patch.
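For illustration only, here is a rough sketch of the LocalBlockResult / RemoteBlockResult idea suggested above. All names and shapes here are hypothetical, not what the patch actually implements.

// Hypothetical shapes: a sealed hierarchy makes the caller state explicitly whether a
// lock needs releasing, so the "forgot to release" case cannot arise silently.
sealed trait SketchBlockResult {
  def data: Iterator[Any]
  def bytes: Long
}

// Local reads hold a read lock; the release callback fires once the iterator is drained
// (a simplified stand-in for CompletionIterator).
final class LocalSketchBlockResult(
    iter: Iterator[Any],
    val bytes: Long,
    releaseLock: () => Unit) extends SketchBlockResult {
  val data: Iterator[Any] = new Iterator[Any] {
    private var released = false
    def hasNext: Boolean = {
      val more = iter.hasNext
      if (!more && !released) { released = true; releaseLock() }
      more
    }
    def next(): Any = iter.next()
  }
}

// Remote reads acquired no lock, so there is nothing to release.
final class RemoteSketchBlockResult(val data: Iterator[Any], val bytes: Long)
  extends SketchBlockResult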
Test build #51987 has finished for PR 10705 at commit
// A block is either locked for reading or for writing, but not for both at the same time:
assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
// If a block is removed then it is not locked:
assert(!_removed || (_readerCount == 0 && _writerTask == BlockInfo.NO_WRITER))
nit: clearer
if (_removed) {
assert(_readerCount == 0 ...)
}
LGTM. There are still a few remaining issues about maintainability but they can be addressed in a follow-up patch.
Merged into master.
synchronized {
  get(blockId).foreach { info =>
    info.readerCount -= lockCount
    assert(info.readerCount >= 0)
Should an exception be thrown here instead?
In production, assertions may not be enabled.
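For reference, a small self-contained sketch of the alternative being suggested: Scala's assert() is elidable at compile time (e.g. via scalac's -Xelide-below), while an explicit throw always executes. The helper name and parameters below are hypothetical and only mirror the diff above.

// Sketch only: fail loudly even when assertions are compiled out.
def checkReaderCount(blockId: String, readerCount: Int): Unit = {
  if (readerCount < 0) {
    throw new IllegalStateException(
      s"Reader count for block $blockId went negative ($readerCount)")
  }
}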
CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for handling graceful fallback to disk when cached data does not fit in memory. However, this logic also exists inside of the MemoryStore itself, so this appears to be unnecessary duplication. Thanks to the addition of block-level read/write locks in #10705, we can refactor the code to remove the CacheManager and replace it with an atomic `BlockManager.getOrElseUpdate()` method. This pull request replaces / subsumes #10748. /cc andrewor14 and nongli for review. Note that this changes the locking semantics of a couple of internal BlockManager methods (`doPut()` and `lockNewBlockForWriting`), so please pay attention to the Scaladoc changes and new test cases for those methods. Author: Josh Rosen <[email protected]> Closes #11436 from JoshRosen/remove-cachemanager.
Motivation
As a pre-requisite to off-heap caching of blocks, we need a mechanism to prevent pages / blocks from being evicted while they are being read. With on-heap objects, evicting a block while it is being read merely leads to memory-accounting problems (because we assume that an evicted block is a candidate for garbage-collection, which will not be true during a read), but with off-heap memory this will lead to either data corruption or segmentation faults.
Changes
BlockInfoManager and reader/writer locks
This patch adds block-level read/write locks to the BlockManager. It introduces a new BlockInfoManager component, which is contained within the BlockManager, holds the BlockInfo objects that the BlockManager uses for tracking block metadata, and exposes APIs for locking blocks in either shared read or exclusive write modes.
BlockManager's get*() and put*() methods now implicitly acquire the necessary locks. After a get() call successfully retrieves a block, that block is locked in a shared read mode. A put() call will block until it acquires an exclusive write lock. If the write succeeds, the write lock will be downgraded to a shared read lock before returning to the caller. This put() locking behavior allows us to store a block and then immediately turn around and read it without having to worry about it having been evicted between the write and the read, which will allow us to significantly simplify CacheManager in the future (see #10748).
See BlockInfoManagerSuite's test cases for a more detailed specification of the locking semantics.
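As a rough illustration of the lock states described above, here is a simplified, self-contained model of a per-block lock with shared read mode, exclusive write mode, and write-to-read downgrade. The class and method names are invented for this sketch; it is not the actual BlockInfo / BlockInfoManager code.

// Simplified per-block lock state: unlocked, read-locked by N readers, or
// write-locked by exactly one task attempt.
class ToyBlockLock {
  private var readerCount = 0
  private var writerTask: Long = -1L  // -1 means "no writer"

  // Shared read lock: only succeeds while no task holds the write lock.
  def tryLockForReading(): Boolean = synchronized {
    if (writerTask == -1L) { readerCount += 1; true } else false
  }

  // Exclusive write lock: only succeeds when there are no readers and no writer.
  def tryLockForWriting(taskAttemptId: Long): Boolean = synchronized {
    if (readerCount == 0 && writerTask == -1L) { writerTask = taskAttemptId; true } else false
  }

  // After a successful put(), the write lock is downgraded to a read lock so the writer
  // can immediately re-read the block without it being evicted in between.
  def downgradeToReadLock(taskAttemptId: Long): Unit = synchronized {
    require(writerTask == taskAttemptId, "only the write-lock holder may downgrade")
    writerTask = -1L
    readerCount += 1
  }

  def unlockRead(): Unit = synchronized {
    require(readerCount > 0, "unlockRead() called without a read lock held")
    readerCount -= 1
  }
}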
Auto-release of locks at the end of tasks
Our locking APIs support explicit release of locks (by calling unlock()), but it's not always possible to guarantee that locks will be released prior to the end of the task. One reason for this is our iterator interface: since our iterators don't support an explicit close() operator to signal that no more records will be consumed, operations like take() or limit() don't have a good means to release locks on their input iterators' blocks. Another example is broadcast variables, whose block locks can only be released at the end of the task.
To address this, BlockInfoManager uses a pair of maps to track the set of locks acquired by each task. Lock acquisitions automatically record the current task attempt id by obtaining it from TaskContext. When a task finishes, code in Executor calls BlockInfoManager.unlockAllLocksForTask(taskAttemptId) to free locks.
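A minimal sketch of the end-of-task cleanup pattern this enables. Only unlockAllLocksForTask() comes from the description above; the trait and helper below are invented for illustration, and the real hook lives in Spark's Executor and BlockInfoManager.

// Illustrative only: make sure a task's remaining block locks are freed even when its
// iterators were never fully drained (take(), limit(), broadcast blocks, ...).
trait TaskLockRegistry {
  def unlockAllLocksForTask(taskAttemptId: Long): Unit
}

def runTaskWithLockCleanup[T](registry: TaskLockRegistry, taskAttemptId: Long)(body: => T): T = {
  try {
    body  // get()/put() calls made here record their locks under taskAttemptId
  } finally {
    registry.unlockAllLocksForTask(taskAttemptId)
  }
}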
Locking and the MemoryStore
In order to prevent in-memory blocks from being evicted while they are being read, the MemoryStore's evictBlocksToFreeSpace() method acquires write locks on blocks which it is considering as candidates for eviction. These lock acquisitions are non-blocking, so a block which is being read will not be evicted. By holding write locks until the eviction is performed or skipped (in case evicting the blocks would not free enough memory), we avoid a race where a new reader starts to read a block after the block has been marked as an eviction candidate but before it has been removed.
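A hedged sketch of the eviction flow described above. The trait and function below are simplified stand-ins invented for this example, not the MemoryStore's actual code.

import scala.collection.mutable

// Illustrative only: try to write-lock each candidate without blocking; blocks that are
// currently being read fail the tryLock and are skipped, so they cannot be evicted out
// from under their readers.
trait EvictableBlock {
  def size: Long
  def tryLockForWriting(taskAttemptId: Long): Boolean
  def unlockWrite(taskAttemptId: Long): Unit
  def drop(): Unit  // evict the block while still holding its write lock
}

def evictBlocksToFreeSpaceSketch(
    candidates: Seq[EvictableBlock],
    spaceNeeded: Long,
    taskAttemptId: Long): Boolean = {
  val locked = mutable.Buffer.empty[EvictableBlock]
  var freed = 0L
  val it = candidates.iterator
  while (freed < spaceNeeded && it.hasNext) {
    val block = it.next()
    if (block.tryLockForWriting(taskAttemptId)) {  // non-blocking acquisition
      locked += block
      freed += block.size
    }
  }
  if (freed >= spaceNeeded) {
    locked.foreach(_.drop())  // perform the evictions while still holding write locks
    true
  } else {
    locked.foreach(_.unlockWrite(taskAttemptId))  // not enough space: release and skip
    false
  }
}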
Locking and remote block transfer
This patch makes small changes to block transfer and network layer code so that locks acquired by the BlockTransferService are released as soon as block transfer messages are consumed and released by Netty. This builds on top of #11193, a bug fix related to freeing of network layer ManagedBuffers.
FAQ
Why not use Java's built-in ReadWriteLock?
Our locks operate on a per-task rather than per-thread level. Under certain circumstances a task may consist of multiple threads, so using ReadWriteLock would mean that we might call unlock() from a thread which didn't hold the lock in question, an operation which has undefined semantics. If we could rely on Java 8 classes, we might be able to use StampedLock to work around this issue.
Why not detect "leaked" locks in tests?
See above notes about take() and limit().