[SPARK-12757] Add block-level read/write locks to BlockManager #10705

Closed
wants to merge 93 commits into from
Changes from 8 commits
5d130e4
Add block reference counting class.
JoshRosen Jan 8, 2016
423faab
Make the ReferenceCounter generic, since it's not specific to storage…
JoshRosen Jan 8, 2016
1ee665f
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 8, 2016
76cfebd
Integrate reference counter into storage eviction code.
JoshRosen Jan 8, 2016
7265784
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 11, 2016
2fb8c89
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 14, 2016
7cad770
Fix BlockManagerReplicationSuite tests.
JoshRosen Jan 14, 2016
8ae88b0
Add unit test for pinCount > 0 preventing eviction.
JoshRosen Jan 14, 2016
c1a8d85
Minimal changes to release refs on task completion.
JoshRosen Jan 14, 2016
575a47b
Fix Scalastyle.
JoshRosen Jan 15, 2016
0ba8318
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 15, 2016
feb1172
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 19, 2016
90cf403
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 20, 2016
7f28910
Fix CachedTableSuite tests.
JoshRosen Jan 21, 2016
12ed084
Fix TaskResultGetterSuite.
JoshRosen Jan 21, 2016
43e50ed
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 25, 2016
1b18226
Terminology update: reference -> pin.
JoshRosen Jan 25, 2016
8d45da6
More terminology updates.
JoshRosen Jan 25, 2016
8a52f58
Fix flaky BlockManagerSuite test:
JoshRosen Jan 25, 2016
36253df
Update very last occurrences of old terminology.
JoshRosen Jan 25, 2016
77d8c5c
More test flakiness fixes
JoshRosen Jan 25, 2016
e37f003
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 27, 2016
2cf8157
Detect leaked pins at end of tasks.
JoshRosen Jan 27, 2016
150c6e1
Add unpin calls in more places.
JoshRosen Jan 27, 2016
1adbdb9
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Feb 2, 2016
1828757
Disable leak detection in tests for now.
JoshRosen Feb 2, 2016
76fc9f5
More renaming.
JoshRosen Feb 2, 2016
2942b24
WIP.
JoshRosen Feb 3, 2016
62f6671
BlockInfoManager WIP.
JoshRosen Feb 11, 2016
47f3174
Implement lock downgrading
JoshRosen Feb 11, 2016
4591308
Implement remove()
JoshRosen Feb 11, 2016
77939c2
BlockInfoManagerSuite tests now pass.
JoshRosen Feb 11, 2016
a0c5bb3
Fix scalastyle.
JoshRosen Feb 11, 2016
d40e010
Add a bunch of comments.
JoshRosen Feb 11, 2016
3f29595
Add even more comments.
JoshRosen Feb 11, 2016
ef7d885
Update to reflect new semantics for get() of removed block.
JoshRosen Feb 11, 2016
e8d6ec8
Fixes to torrent broadcast block removal.
JoshRosen Feb 12, 2016
9c8d530
Roll back logging change.
JoshRosen Feb 12, 2016
f3fc298
Remove more printlns.
JoshRosen Feb 12, 2016
dd6358c
Add todos.
JoshRosen Feb 12, 2016
ec8cc24
Add missing ManagedBuffer.release() call.
JoshRosen Feb 13, 2016
6134989
Add a test for OneForOneStreamManager.connectionTerminated
JoshRosen Feb 13, 2016
c9726c2
Add tests covering new release() call.
JoshRosen Feb 13, 2016
c629f26
Add defensive check to guard against exiting while loop when info.rem…
JoshRosen Feb 13, 2016
fc19cfd
Merge branch 'add-missing-release-calls-in-network-layer' into pin-pages
JoshRosen Feb 13, 2016
0aa2392
Fix serialization problems in getMatchingBlockIds().
JoshRosen Feb 13, 2016
b273422
Remove bad retain.
JoshRosen Feb 13, 2016
7639e03
Logging improvements that were helpful when debugging tests.
JoshRosen Feb 13, 2016
27e98a3
Fix SparkContext leak in KryoSerializerDistributedSuite
JoshRosen Feb 13, 2016
b72cd7b
Fix block replication bugs.
JoshRosen Feb 15, 2016
5e23177
Fix locking in indirect task result code path.
JoshRosen Feb 15, 2016
f0b6d71
Add a missing release() in ReceivedBlockHandler.
JoshRosen Feb 15, 2016
e549f2f
Add another missing release in WriteAheadLogBasedBlockHandler
JoshRosen Feb 15, 2016
6d09400
Fix SQL test compilation.
JoshRosen Feb 15, 2016
717c476
Add missing lock release in CacheManager.
JoshRosen Feb 15, 2016
e07b62d
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Feb 16, 2016
0c08731
Revert change in network/common/src/test/java/org/apache/spark/networ…
JoshRosen Feb 16, 2016
55b5b19
Check preconditions in remove().
JoshRosen Feb 17, 2016
3a12480
Free locks in dropFromMemory().
JoshRosen Feb 17, 2016
25b09d7
Guard against MemoryStore removing the block first.
JoshRosen Feb 17, 2016
bcb8318
Fix bug in release of locks after network fetch of block data.
JoshRosen Feb 18, 2016
4e11d00
Rename methods.
JoshRosen Feb 18, 2016
ed44f45
Remove outdated block comment.
JoshRosen Feb 18, 2016
7c74591
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Feb 23, 2016
a401adc
Use try-finally in afterEach().
JoshRosen Feb 23, 2016
504986f
Try to clean up confusing release logic related to dropFromMemory().
JoshRosen Feb 23, 2016
66202f2
Push CompletionIterator logic into BlockResult.
JoshRosen Feb 23, 2016
4f620a4
Add scaladoc to BlockManagerManagedBuffer.
JoshRosen Feb 23, 2016
8547841
Document non-blocking tryLock in MemoryStore.
JoshRosen Feb 23, 2016
99c460c
Fix comment typo in BlockDataManager.
JoshRosen Feb 23, 2016
c94984e
Check invariants whenever BlockInfo is mutated.
JoshRosen Feb 23, 2016
ac2b73f
Extract magic writerTask values into constants.
JoshRosen Feb 23, 2016
39b1185
Numerous documentation updates in BlockInfoManager.
JoshRosen Feb 23, 2016
6502047
Add defensive notifyAll() to BlockInfoManager.clear().
JoshRosen Feb 23, 2016
1d903ff
Remove now-redundant info.removed checks in loop body.
JoshRosen Feb 23, 2016
24dbc3d
Clean up BlockInfoManager.entries() typos.
JoshRosen Feb 23, 2016
745c1f9
unlockAllLocksForTask => releaseAllLocksForTask
JoshRosen Feb 23, 2016
5cfbbdb
Address style nit in BlockManager.getMatchingBlockIds().
JoshRosen Feb 23, 2016
9427576
Address confusing "local lock" comment.
JoshRosen Feb 23, 2016
3d377b5
Remove unnecessary notifyAll() in downgradeLock().
JoshRosen Feb 23, 2016
07e0e37
Deduplicate code in TorrentBroadcast and check put() return values.
JoshRosen Feb 23, 2016
697eba2
Torrent broadcast pieces need to be stored in serialized form.
JoshRosen Feb 23, 2016
f5f089d
Don't acquire lock in dropFromMemory().
JoshRosen Feb 23, 2016
0b7281b
Simplify confusing getOrElseUpdate in lockNewBlockForWriting().
JoshRosen Feb 23, 2016
68b9e83
Roll back checking of blockWasSuccessfullyStored in replication code.
JoshRosen Feb 23, 2016
a5ef11b
Explain seemingly-unreachable error handling code.
JoshRosen Feb 23, 2016
b9d6e18
Require tasks to explicitly register themselves with the BlockManager.
JoshRosen Feb 24, 2016
5df7284
DeMorgan.
JoshRosen Feb 24, 2016
eab288c
Synchronize BlockInfoManager.registerTask()
JoshRosen Feb 24, 2016
06ebef5
Minor comment fixes.
JoshRosen Feb 24, 2016
0628a33
Check lockForReading outcome in downgradeLock()
JoshRosen Feb 24, 2016
b963178
More logTrace detail in lockNewBlockForWriting
JoshRosen Feb 24, 2016
9becde3
Move registration of task with BlockManager into Task.run()
JoshRosen Feb 24, 2016
38 changes: 34 additions & 4 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -43,6 +43,7 @@ import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.serializer.{Serializer, SerializerInstance}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.util._
+import org.apache.spark.util.collection.ReferenceCounter
Contributor

Can you add a comment in this class that explains the ref counting mechanism? It can be a shorter version of the commit message.
Specifically:
What are the invariants? (Explain get().) Callers need to call release(). What does it mean if the count is 0?

I slightly prefer pin count over ref count (the block manager has a reference but it is unpinned)
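
To make the invariants asked about here concrete, a minimal sketch follows. It is not the PR's code: `PinnedStore`, `pinCount`, and the immutable `Map` backing are hypothetical stand-ins, and a single lock replaces the PR's concurrent multiset. It assumes the semantics discussed in this thread: a successful get() adds one pin, the caller owes exactly one release(), and a count of 0 means no reader holds the block, so it may be evicted.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the pin-counting part of a block store (not Spark's BlockManager).
final class PinnedStore[K, V](data: Map[K, V]) {
  // An absent key means a pin count of 0: nobody is reading the block.
  private val pins = mutable.Map.empty[K, Int]

  /** A successful get() adds one pin; the caller owes exactly one release(). */
  def get(key: K): Option[V] = synchronized {
    val res = data.get(key)
    if (res.isDefined) pins(key) = pins.getOrElse(key, 0) + 1
    res
  }

  /** Drops one pin; releasing more often than pinning is treated as a caller bug. */
  def release(key: K): Unit = synchronized {
    val current = pins.getOrElse(key, 0)
    if (current <= 0) {
      throw new IllegalStateException(s"$key was released more times than it was pinned")
    }
    if (current == 1) pins.remove(key) else pins(key) = current - 1
  }

  def pinCount(key: K): Int = synchronized { pins.getOrElse(key, 0) }
}

object PinnedStoreDemo {
  def main(args: Array[String]): Unit = {
    val store = new PinnedStore(Map("rdd_0_0" -> "payload"))
    store.get("rdd_0_0").foreach { value =>
      // Pair every successful get() with a release(), even if reading throws.
      try println(s"read $value, pins = ${store.pinCount("rdd_0_0")}")
      finally store.release("rdd_0_0")
    }
    println(s"pins after release = ${store.pinCount("rdd_0_0")}")
  }
}
```

The try/finally in the demo is the point: a pin that is never released would keep the block unevictable, which is why the PR also frees a task's pins automatically when the task completes.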


 private[spark] sealed trait BlockValues
 private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
@@ -161,6 +162,8 @@ private[spark] class BlockManager(
    * loaded yet. */
   private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)

+  private val referenceCounts = new ReferenceCounter[BlockId]
+
   /**
    * Initializes the BlockManager with the given appId. This is not performed in the constructor as
    * the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -414,7 +417,11 @@ private[spark] class BlockManager(
    */
   def getLocal(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting local block $blockId")
-    doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
+    val res = doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
Contributor

A shortened version of this should be a class comment. Describe what pinning means and what are the API semantics.

"we should add a pin-counting mechanism to track which blocks/pages are being read in order to prevent them from being evicted prematurely. I propose to do this in two phases: first, add a safe, conservative approach in which all BlockManager.get*() calls implicitly increment the pin count of blocks and where tasks' pins are automatically freed upon task completion (this PR)"

Contributor

I think this should also document the thread safety guarantees with respect to pinning.

+    if (res.isDefined) {
+      referenceCounts.retain(blockId)
+    }
+    res
   }

   /**
@@ -424,7 +431,7 @@ private[spark] class BlockManager(
     logDebug(s"Getting local block $blockId as bytes")
     // As an optimization for map output fetches, if the block is for a shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items so this should work
-    if (blockId.isShuffle) {
+    val res = if (blockId.isShuffle) {
       val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
       // TODO: This should gracefully handle case where local block is not available. Currently
       // downstream code will throw an exception.
@@ -433,6 +440,10 @@ private[spark] class BlockManager(
     } else {
       doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
     }
+    if (res.isDefined) {
+      referenceCounts.retain(blockId)
+    }
+    res
   }

   private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
@@ -564,15 +575,23 @@ private[spark] class BlockManager(
    */
   def getRemote(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting remote block $blockId")
-    doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
+    val res = doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
+    if (res.isDefined) {
+      referenceCounts.retain(blockId)
+    }
+    res
   }

   /**
    * Get block from remote block managers as serialized bytes.
    */
   def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
     logDebug(s"Getting remote block $blockId as bytes")
-    doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
+    val res = doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
+    if (res.isDefined) {
+      referenceCounts.retain(blockId)
+    }
+    res
   }

   /**
@@ -642,6 +661,17 @@ private[spark] class BlockManager(
     None
   }

+  /**
+   * Release one reference to the given block.
+   */
+  def release(blockId: BlockId): Unit = {
+    referenceCounts.release(blockId)
+  }
+
+  private[storage] def getReferenceCount(blockId: BlockId): Int = {
+    referenceCounts.getReferenceCount(blockId)
+  }
+
   def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -213,6 +213,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
   }

   override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
+    val referenceCount = blockManager.getReferenceCount(blockId)
Contributor

What are the semantics here? It seems reasonable for another thread to get this block. Who calls remove?

Contributor Author

MemoryStore.remove() is called in a few places:

  • When removing a block in BlockManager.removeBlock(). This is called by ContextCleaner cleanup code (e.g. when removing blocks from RDDs which have fallen out of scope on the driver) or when a user explicitly unpersists an RDD or deletes a broadcast variable.
  • When dropping a block from memory in BlockManager.dropFromMemory(), which is (confusingly) called by the MemoryStore when dropping blocks to free up space.

In the second case, we'll never hit the error message because the MemoryStore won't try to evict blocks with non-zero pin/reference counts. We do have to worry about the first case: if we try to force-remove a block while a task is still reading it then the removal should fail with an error.
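
The two call paths described above can be sketched in miniature as follows. This is an illustration under stated assumptions, not the MemoryStore code: `Entry`, `EvictionSketch`, `selectEvictable`, and `forceRemove` are hypothetical names, pin counts are passed in as a plain function, and the sketch only returns eviction candidates rather than checking that enough space was actually freed.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical miniature of the two call paths; Entry and both method names are illustrative.
final case class Entry(id: String, size: Long)

object EvictionSketch {
  /** Eviction path: walk entries in order, collecting unpinned ones until `space` is covered. */
  def selectEvictable(entries: Seq[Entry], space: Long, pinCount: String => Int): List[Entry] = {
    val selected = ArrayBuffer.empty[Entry]
    var freed = 0L
    val it = entries.iterator
    while (freed < space && it.hasNext) {
      val e = it.next()
      if (pinCount(e.id) == 0) { // pinned blocks are never eviction candidates
        selected += e
        freed += e.size
      }
    }
    selected.toList
  }

  /** Forced-removal path: fail loudly instead of freeing a block that is still being read. */
  def forceRemove(id: String, pinCount: String => Int): Unit = {
    val pins = pinCount(id)
    if (pins != 0) {
      throw new IllegalStateException(
        s"Cannot free block $id since it is still pinned $pins times")
    }
    // A real store would drop the entry and release its memory here.
  }

  def main(args: Array[String]): Unit = {
    val entries = Seq(Entry("a", 10), Entry("b", 10), Entry("c", 10))
    val pins = Map("a" -> 1).withDefaultValue(0) // block "a" is being read by some task
    println(selectEvictable(entries, 15L, pins).map(_.id))
  }
}
```

Because the eviction path filters on the pin count before selecting a block, only the forced-removal path can ever reach the exception.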

+    if (referenceCount != 0) {
+      throw new IllegalStateException(
+        s"Cannot free block $blockId since it is still referenced $referenceCount times")
+    }
     val entry = entries.synchronized { entries.remove(blockId) }
     if (entry != null) {
       memoryManager.releaseStorageMemory(entry.size)
@@ -425,6 +430,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       var freedMemory = 0L
       val rddToAdd = blockId.flatMap(getRddId)
       val selectedBlocks = new ArrayBuffer[BlockId]
+      def blockIsEvictable(blockId: BlockId): Boolean = {
+        blockManager.getReferenceCount(blockId) == 0 &&
+          (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
+      }
       // This is synchronized to ensure that the set of entries is not changed
       // (because of getValue or getBytes) while traversing the iterator, as that
       // can lead to exceptions.
@@ -433,7 +442,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
         while (freedMemory < space && iterator.hasNext) {
           val pair = iterator.next()
           val blockId = pair.getKey
-          if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
+          if (blockIsEvictable(blockId)) {
             selectedBlocks += blockId
             freedMemory += pair.getValue.size
           }
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.JavaConverters._
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import com.google.common.collect.ConcurrentHashMultiset
+
+import org.apache.spark.TaskContext
+
+/**
+ * Thread-safe collection for maintaining both global and per-task reference counts for objects.
+ */
+private[spark] class ReferenceCounter[T] {
Contributor

Is there any reason you did it this way instead of a counter per object? Not sure how many blocks we have but this seems contention prone.

Contributor Author

I need to maintain global counts for each object as well as counts for each task, in order to automatically decrement the global counts when tasks finish. (I'm working on adding the releaseAllReferencesForTask() call to the task completion cleanup code.)

If I stored the global count per block inside of the BlockInfo class, then I'd still need a mechanism to count the references per task. If the counts for each task were stored in BlockInfo then I'd have to loop over the BlockInfo list on task completion in order to clear those counts, or would have to maintain the counts separately. As a result, it made sense to me to keep both types of counts in close proximity like this.
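
The trade-off described here can be sketched as follows. This is a simplified illustration, not the PR's ReferenceCounter: `TwoLevelPinCounter` and its method names are hypothetical, and a single coarse lock over plain mutable maps stands in for the Guava multisets. It shows why keeping per-task counts next to the global counts makes task cleanup proportional to the pins that one task holds, with no scan over every block.

```scala
import scala.collection.mutable

// Illustrative only: one coarse lock and plain maps instead of the PR's concurrent multisets.
final class TwoLevelPinCounter[T] {
  private val total = mutable.Map.empty[T, Int]                     // pins across all tasks
  private val byTask = mutable.Map.empty[Long, mutable.Map[T, Int]] // pins held by each task

  def pinCount(obj: T): Int = synchronized { total.getOrElse(obj, 0) }

  def pin(taskId: Long, obj: T): Unit = synchronized {
    val perTask = byTask.getOrElseUpdate(taskId, mutable.Map.empty[T, Int])
    perTask(obj) = perTask.getOrElse(obj, 0) + 1
    total(obj) = total.getOrElse(obj, 0) + 1
  }

  def unpin(taskId: Long, obj: T): Unit = synchronized {
    val held = byTask.get(taskId).flatMap(_.get(obj)).getOrElse(0)
    if (held <= 0) {
      throw new IllegalStateException(s"Task $taskId released $obj more times than it pinned it")
    }
    val perTask = byTask(taskId)
    decrement(perTask, obj, 1)
    if (perTask.isEmpty) byTask.remove(taskId)
    decrement(total, obj, 1)
  }

  /** Drops every pin the task still holds; cost depends only on that task's own pins. */
  def unpinAllForTask(taskId: Long): Unit = synchronized {
    byTask.remove(taskId).foreach { perTask =>
      perTask.foreach { case (obj, n) => decrement(total, obj, n) }
    }
  }

  // Subtracts n from a count and removes the entry once it reaches zero.
  private def decrement(counts: mutable.Map[T, Int], obj: T, n: Int): Unit = {
    val remaining = counts.getOrElse(obj, 0) - n
    if (remaining > 0) counts(obj) = remaining else counts.remove(obj)
  }
}

object TwoLevelPinCounterDemo {
  def main(args: Array[String]): Unit = {
    val counter = new TwoLevelPinCounter[String]
    counter.pin(1L, "block-a")
    counter.pin(1L, "block-a")
    counter.pin(2L, "block-a")
    counter.unpinAllForTask(1L) // task 1 finishes without releasing explicitly
    println(counter.pinCount("block-a"))
  }
}
```

The coarse lock is the main simplification. The reviewer's contention concern applies to it even more strongly than to the concurrent collections used in the PR.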


+  private type TaskAttemptId = Long
+
+  /**
+   * Total references across all tasks.
+   */
+  private[this] val allReferences = ConcurrentHashMultiset.create[T]()
+
+  /**
+   * Total references per task. Used to auto-release references upon task completion.
+   */
+  private[this] val referencesByTask = {
+    // We need to explicitly box as java.lang.Long to avoid a type mismatch error:
+    val loader = new CacheLoader[java.lang.Long, ConcurrentHashMultiset[T]] {
+      override def load(t: java.lang.Long) = ConcurrentHashMultiset.create[T]()
+    }
+    CacheBuilder.newBuilder().build(loader)
+  }
+
+  /**
+   * Returns the total reference count, across all tasks, for the given object.
+   */
+  def getReferenceCount(obj: T): Int = allReferences.count(obj)
+
+  /**
+   * Increments the given object's reference count for the current task.
+   */
+  def retain(obj: T): Unit = retainForTask(currentTaskAttemptId, obj)
+
+  /**
+   * Decrements the given object's reference count for the current task.
+   */
+  def release(obj: T): Unit = releaseForTask(currentTaskAttemptId, obj)
+
+  private def currentTaskAttemptId: TaskAttemptId = {
+    Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(-1L)
+  }
+
+  /**
+   * Increments the given object's reference count for the given task.
+   */
+  def retainForTask(taskAttemptId: TaskAttemptId, obj: T): Unit = {
+    referencesByTask.get(taskAttemptId).add(obj)
+    allReferences.add(obj)
+  }
+
+  /**
+   * Decrements the given object's reference count for the given task.
+   */
+  def releaseForTask(taskAttemptId: TaskAttemptId, obj: T): Unit = {
+    val countsForTask = referencesByTask.get(taskAttemptId)
+    val newReferenceCountForTask: Int = countsForTask.remove(obj, 1) - 1
+    val newTotalReferenceCount: Int = allReferences.remove(obj, 1) - 1
+    if (newReferenceCountForTask < 0) {
+      throw new IllegalStateException(
+        s"Task $taskAttemptId released object $obj more times than it was retained")
+    }
+    if (newTotalReferenceCount < 0) {
+      throw new IllegalStateException(
+        s"Task $taskAttemptId released object $obj more times than it was retained")
+    }
+  }
+
+  /**
+   * Release all references held by the given task, clearing that task's reference bookkeeping
+   * structures and updating the global reference counts. This method should be called at the
+   * end of a task (either by a task completion handler or in `TaskRunner.run()`).
+   */
+  def releaseAllReferencesForTask(taskAttemptId: TaskAttemptId): Unit = {
+    val referenceCounts = referencesByTask.get(taskAttemptId)
+    referencesByTask.invalidate(taskAttemptId)
+    referenceCounts.entrySet().iterator().asScala.foreach { entry =>
+      val obj = entry.getElement
+      val taskRefCount = entry.getCount
+      val newRefCount = allReferences.remove(obj, taskRefCount) - taskRefCount
+      if (newRefCount < 0) {
+        throw new IllegalStateException(
+          s"Task $taskAttemptId released object $obj more times than it was retained")
+      }
+    }
+  }
+
+  /**
+   * Return the number of map entries in this reference counter's internal data structures.
+   * This is used in unit tests in order to detect memory leaks.
+   */
+  private[collection] def getNumberOfMapEntries: Long = {
+    allReferences.size() +
+      referencesByTask.size() +
+      referencesByTask.asMap().asScala.map(_._2.size()).sum
+  }
+}
@@ -367,6 +367,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
     }.foreach { testStore =>
       val testStoreName = testStore.blockManagerId.executorId
       assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+      testStore.release(blockId)
       assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
         s"master does not have status for ${blockId.name} in $testStoreName")