Skip to content

Commit

Permalink
Integrate reference counter into storage eviction code.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jan 8, 2016
1 parent 1ee665f commit 76cfebd
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 73 deletions.
38 changes: 34 additions & 4 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.spark.rpc.RpcEnv
import org.apache.spark.serializer.{Serializer, SerializerInstance}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.util._
import org.apache.spark.util.collection.ReferenceCounter

private[spark] sealed trait BlockValues
private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
Expand Down Expand Up @@ -161,6 +162,8 @@ private[spark] class BlockManager(
* loaded yet. */
private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)

private val referenceCounts = new ReferenceCounter[BlockId]

/**
* Initializes the BlockManager with the given appId. This is not performed in the constructor as
* the appId may not be known at BlockManager instantiation time (in particular for the driver,
Expand Down Expand Up @@ -414,7 +417,11 @@ private[spark] class BlockManager(
*/
def getLocal(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting local block $blockId")
doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
val res = doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
if (res.isDefined) {
referenceCounts.retain(blockId)
}
res
}

/**
Expand All @@ -424,7 +431,7 @@ private[spark] class BlockManager(
logDebug(s"Getting local block $blockId as bytes")
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
val res = if (blockId.isShuffle) {
val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not available. Currently
// downstream code will throw an exception.
Expand All @@ -433,6 +440,10 @@ private[spark] class BlockManager(
} else {
doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
if (res.isDefined) {
referenceCounts.retain(blockId)
}
res
}

private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
Expand Down Expand Up @@ -564,15 +575,23 @@ private[spark] class BlockManager(
*/
def getRemote(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting remote block $blockId")
doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
val res = doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
if (res.isDefined) {
referenceCounts.retain(blockId)
}
res
}

/**
* Get block from remote block managers as serialized bytes.
*/
def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
logDebug(s"Getting remote block $blockId as bytes")
doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
val res = doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
if (res.isDefined) {
referenceCounts.retain(blockId)
}
res
}

/**
Expand Down Expand Up @@ -642,6 +661,17 @@ private[spark] class BlockManager(
None
}

/**
* Release one reference to the given block.
*/
def release(blockId: BlockId): Unit = {
referenceCounts.release(blockId)
}

private[storage] def getReferenceCount(blockId: BlockId): Int = {
referenceCounts.getReferenceCount(blockId)
}

def putIterator(
blockId: BlockId,
values: Iterator[Any],
Expand Down
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}

override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
val referenceCount = blockManager.getReferenceCount(blockId)
if (referenceCount != 0) {
throw new IllegalStateException(
s"Cannot free block $blockId since it is still referenced $referenceCount times")
}
val entry = entries.synchronized { entries.remove(blockId) }
if (entry != null) {
memoryManager.releaseStorageMemory(entry.size)
Expand Down Expand Up @@ -425,6 +430,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
def blockIsEvictable(blockId: BlockId): Boolean = {
blockManager.getReferenceCount(blockId) == 0 &&
(rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
}
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
Expand All @@ -433,7 +442,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
if (blockIsEvictable(blockId)) {
selectedBlocks += blockId
freedMemory += pair.getValue.size
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ private[spark] class ReferenceCounter[T] {
/**
* Increments the given object's reference count for the current task.
*/
def retain(obj: T): Unit = {
retainForTask(TaskContext.get().taskAttemptId(), obj)
}
def retain(obj: T): Unit = retainForTask(currentTaskAttemptId, obj)

/**
* Decrements the given object's reference count for the current task.
*/
def release(obj: T): Unit = {
releaseForTask(TaskContext.get().taskAttemptId(), obj)
def release(obj: T): Unit = releaseForTask(currentTaskAttemptId, obj)

private def currentTaskAttemptId: TaskAttemptId = {
Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(-1L)
}

/**
Expand Down
Loading

0 comments on commit 76cfebd

Please sign in to comment.