Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-3941][CORE] _remainingmem should not increase twice when updateBlockInfo #2792

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -457,16 +457,18 @@ private[spark] class BlockManagerInfo(

if (_blocks.containsKey(blockId)) {
// The block exists on the slave already.
val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
val blockStatus: BlockStatus = _blocks.get(blockId)
val originalLevel: StorageLevel = blockStatus.storageLevel
val originalMemSize: Long = blockStatus.memSize

if (originalLevel.useMemory) {
_remainingMem += memSize
_remainingMem += originalMemSize
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a change in semantics. Can you explain why the old code is incorrect (is it)? I'm just trying to understand what this block is trying to do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"memSize" is the current memory that the block consume, "originalMemSize" is the memory used before block updating. These two size might be not the same. So here, if the block exists on the slave already, _remainingmem should first _remainingMem += originalMemSize and then _remainingMem -= memSize if the new storage level is "useMemory". Do I have misunderstanding of these two mem size?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, we want to release the originalMemSize and occupy the new memSize. That makes sense. It seems that the old code is just wrong in some cases then.

}
}

if (storageLevel.isValid) {
/* isValid means it is either stored in-memory, on-disk or on-Tachyon.
* But the memSize here indicates the data size in or dropped from memory,
* The memSize here indicates the data size in or dropped from memory,
* tachyonSize here indicates the data size in or dropped from Tachyon,
* and the diskSize here indicates the data size in or dropped to disk.
* They can be both larger than 0, when a block is dropped from memory to disk.
Expand All @@ -493,7 +495,6 @@ private[spark] class BlockManagerInfo(
val blockStatus: BlockStatus = _blocks.get(blockId)
_blocks.remove(blockId)
if (blockStatus.storageLevel.useMemory) {
_remainingMem += blockStatus.memSize
logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
Utils.bytesToString(_remainingMem)))
Expand Down