[SPARK-1777] Prevent OOMs from single partitions #1165

Closed
wants to merge 74 commits
Commits
c12f093  Add SizeTrackingAppendOnlyBuffer and tests (andrewor14, Jun 20, 2014)
97ea499  Change BlockManager interface to use Arrays (andrewor14, Jun 20, 2014)
bbd3eea  Fix CacheManagerSuite to use Array (andrewor14, Jun 20, 2014)
776aec9  Prevent OOM if a single RDD partition is too large (andrewor14, Jun 20, 2014)
f94f5af  Update a few comments (minor) (andrewor14, Jun 21, 2014)
6d05a81  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jun 21, 2014)
df47265  Fix binary incompatibility (andrewor14, Jun 21, 2014)
7de5ef9  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jun 21, 2014)
50cae44  Remove now duplicate mima exclude (andrewor14, Jun 21, 2014)
11eb921  Clarify comment (minor) (andrewor14, Jun 21, 2014)
87aa75c  Fix mima excludes again (typo) (andrewor14, Jun 21, 2014)
e1b8b25  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jun 27, 2014)
a8704c1  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jun 27, 2014)
b8e1d9c  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 4, 2014)
ea02eec  Fix tests (andrewor14, Jul 4, 2014)
d5dd3b4  Free buffer memory in finally (andrewor14, Jul 4, 2014)
3ce413e  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 4, 2014)
1e82d00  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 7, 2014)
195abd7  Refactor: move unfold logic to MemoryStore (andrewor14, Jul 7, 2014)
5961f50  Fix tests (andrewor14, Jul 7, 2014)
d68f31e  Safely unfold blocks for all memory puts (andrewor14, Jul 8, 2014)
0871535  Fix tests in BlockManagerSuite (andrewor14, Jul 8, 2014)
078eb83  Add check for hasNext in PrimitiveVector.iterator (andrewor14, Jul 8, 2014)
22b2209  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 8, 2014)
ec728d8  Add tests for safe unfolding of blocks (andrewor14, Jul 9, 2014)
d9d02a8  Remove unused param in unfoldSafely (andrewor14, Jul 9, 2014)
0d50155  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 17, 2014)
198e374  Unfold -> unroll (andrewor14, Jul 17, 2014)
e9c3cb0  cacheMemoryMap -> unrollMemoryMap (andrewor14, Jul 18, 2014)
a10b0e7  Add initial memory request threshold + rename a few things (andrewor14, Jul 18, 2014)
649bdb3  Document spark.storage.bufferFraction (andrewor14, Jul 18, 2014)
20eb3e5  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 18, 2014)
9b9a273  Fix tests (andrewor14, Jul 18, 2014)
2b7ee66  Fix bug in SizeTracking* (andrewor14, Jul 18, 2014)
24185ea  Avoid dropping a block back to disk if reading from disk (andrewor14, Jul 18, 2014)
ecc8c2d  Fix binary incompatibility (andrewor14, Jul 18, 2014)
4f4834e  Use original storage level for blocks dropped to disk (andrewor14, Jul 18, 2014)
8af2f35  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 18, 2014)
64e7d4c  Add/modify a few comments (minor) (andrewor14, Jul 19, 2014)
0871835  Add an effective storage level interface to BlockManager (andrewor14, Jul 19, 2014)
f920531  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 19, 2014)
3dd96aa  AppendOnlyBuffer -> Vector (+ a few small changes) (andrewor14, Jul 21, 2014)
abeae4f  Add comment clarifying the MEMORY_AND_DISK case (andrewor14, Jul 21, 2014)
57f8d85  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 21, 2014)
9a65245  Update a few comments + minor control flow changes (andrewor14, Jul 22, 2014)
129c441  Fix bug: Use toArray rather than array (andrewor14, Jul 22, 2014)
5ab2329  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 22, 2014)
728323b  Do not synchronize every 1000 elements (andrewor14, Jul 24, 2014)
28edfa3  Update a few comments / log messages (andrewor14, Jul 24, 2014)
4f18a3d  bufferFraction -> unrollFraction (andrewor14, Jul 24, 2014)
8288228  Synchronize put and unroll properly (andrewor14, Jul 24, 2014)
dce55c8  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 24, 2014)
3f5a083  Simplify signature of ensureFreeSpace (andrewor14, Jul 24, 2014)
69bc0a5  Always synchronize on putLock before unrollMemoryMap (andrewor14, Jul 24, 2014)
a49ba4d  Do not expose unroll memory check period (andrewor14, Jul 24, 2014)
8448c9b  Fix tests (andrewor14, Jul 24, 2014)
beb368f  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 24, 2014)
f9ff82e  putValues -> putIterator + putArray (andrewor14, Jul 24, 2014)
ed6cda4  Formatting fix (super minor) (andrewor14, Jul 24, 2014)
b9a6eee  Simplify locking behavior on unrollMemoryMap (andrewor14, Jul 24, 2014)
1a43c06  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 25, 2014)
ff77aa1  Fix tests (andrewor14, Jul 25, 2014)
e40c60d  Fix MIMA excludes (andrewor14, Jul 25, 2014)
68730b3  Fix weird scalatest behavior (andrewor14, Jul 25, 2014)
a66fbd2  Rename a few things + update comments (andrewor14, Jul 25, 2014)
f4d035c  Allow one thread to unroll multiple blocks (andrewor14, Jul 25, 2014)
369ad07  Correct ensureFreeSpace and requestMemory behavior (andrewor14, Jul 25, 2014)
71672a7  Update unrollSafely tests (andrewor14, Jul 25, 2014)
f12916d  Slightly clean up tests (andrewor14, Jul 25, 2014)
b7e165c  Add new tests for unrolling blocks (andrewor14, Jul 26, 2014)
6645a8a  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 26, 2014)
269d07b  Very minor changes to tests (andrewor14, Jul 27, 2014)
c7c8832  Simplify logic + update a few comments (andrewor14, Jul 27, 2014)
e77f451  Merge branch 'master' of github.com:apache/spark into them-rdd-memories (andrewor14, Jul 27, 2014)
72 changes: 51 additions & 21 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -17,9 +17,9 @@

package org.apache.spark

import scala.collection.mutable.{ArrayBuffer, HashSet}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._

@@ -30,7 +30,7 @@ import org.apache.spark.storage._
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

/** Keys of RDD partitions that are being computed/loaded. */
private val loading = new HashSet[RDDBlockId]()
private val loading = new mutable.HashSet[RDDBlockId]

/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
@@ -118,36 +118,66 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}

/**
* Cache the values of a partition, keeping track of any updates in the storage statuses
* of other blocks along the way.
* Cache the values of a partition, keeping track of any updates in the storage statuses of
* other blocks along the way.
*
* The effective storage level refers to the level that actually specifies BlockManager put
* behavior, not the level originally specified by the user. This is mainly for forcing a
* MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
* while preserving the original semantics of the RDD as specified by the application.
*/
private def putInBlockManager[T](
Contributor: Now that we have the putIterator interface, could we just let the BlockManager handle this logic? Calling unrollSafely here looks a little coupled; maybe it's better to mark unrollSafely as private and hide it inside the memory store.

Contributor: The logic here was first introduced in 40566e1. It was aimed at avoiding unrolling the iterator into an ArrayBuffer if the block is DISK_ONLY. But with the appearance of the putIterator interface in BlockManager, this logic can easily be done in BlockManager and we can clean up the code here. Did I miss something?

Contributor Author: Ah, this part is actually really tricky. If you just used putIterator here, the result would be incorrect, and the reason behind that is quite subtle. Here in getOrCompute we need to return the actual iterator in addition to storing it in the BlockManager, so if we just used putIterator with the MEMORY_ONLY level, other threads might drop our block before we get to read it back, in which case we would have nothing to return because our original iterator was already exhausted. We have no choice but to attempt to unroll it here first.

Contributor: Ah, got it. So it follows the original implementation:

val elements = new ArrayBuffer[Any]
elements ++= computedValues
...
return elements.iterator.asInstanceOf[Iterator[T]]

Then we can ensure the returned iterator will always have data for the user.

Contributor Author: Yes, except the original implementation OOMs easily.

key: BlockId,
values: Iterator[T],
storageLevel: StorageLevel,
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {

if (!storageLevel.useMemory) {
/* This RDD is not to be cached in memory, so we can just pass the computed values
* as an iterator directly to the BlockManager, rather than first fully unrolling
* it in memory. The latter option potentially uses much more memory and risks OOM
* exceptions that can be avoided. */
updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
level: StorageLevel,
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
Contributor: This should be documented in the javadoc here; it's not at all obvious what it means.

Contributor: @andrewor14, since effectiveStorageLevel is only used to force a MEMORY_AND_DISK partition to disk in this patch, would it be better not to expose this parameter here and in BlockManager.putBytes/putArray/putIterator? If someone explicitly passes an effectiveStorageLevel by calling BlockManager.putBytes directly (for example PR#3534) and it differs from the original level (cache levels and replication), this will lead to wrong messages on the web UI, because the BlockManager is not aware of it.

Take another case: if the original level is MEMORY_ONLY and the user forces the block to disk via effectiveStorageLevel, the BlockManager will never know the block is on disk, and the block will become a zombie. Of course, such a case is unlikely to happen.
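For context, the only caller visible in this diff that passes a non-default effectiveStorageLevel is putInBlockManager itself, further down: when a MEMORY_AND_DISK partition cannot be unrolled, it calls itself again with an effective level of DISK_ONLY while the block keeps the MEMORY_AND_DISK level the user originally requested.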


val putLevel = effectiveStorageLevel.getOrElse(level)
if (!putLevel.useMemory) {
/*
* This RDD is not to be cached in memory, so we can just pass the computed values as an
* iterator directly to the BlockManager rather than first fully unrolling it in memory.
*/
updatedBlocks ++=
blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
}
} else {
/* This RDD is to be cached in memory. In this case we cannot pass the computed values
/*
* This RDD is to be cached in memory. In this case we cannot pass the computed values
* to the BlockManager as an iterator and expect to read it back later. This is because
* we may end up dropping a partition from memory store before getting it back, e.g.
* when the entirety of the RDD does not fit in memory. */
val elements = new ArrayBuffer[Any]
elements ++= values
updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
* we may end up dropping a partition from memory store before getting it back.
*
* In addition, we must be careful to not unroll the entire partition in memory at once.
* Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
* single partition. Instead, we unroll the values cautiously, potentially aborting and
* dropping the partition to disk if applicable.
*/
blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
case Left(arr) =>
// We have successfully unrolled the entire partition, so cache it in memory
updatedBlocks ++=
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
logWarning(s"Not enough space to cache partition $key in memory! " +
s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
useOffHeap = false, deserialized = false, putLevel.replication)
putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
} else {
returnValues
}
Contributor: In case of exceptions etc. in this method, we end up with an ever-increasing cacheMemoryMap. It would be a good idea to wrap the entire method in a try/finally, setting the entry to 0 inside the finally. This assumes no re-entrancy in this method (which currently holds, if I am not wrong, except for the benign case of the immediate call to itself with memory disabled).
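The patch addresses part of this by having Executor call releaseUnrollMemoryForThisThread() in a finally block (see the Executor.scala diff below). A minimal sketch of the cleanup pattern the reviewer suggests, assuming a per-thread map keyed by thread ID (the names here are illustrative, not the actual MemoryStore fields):

import scala.collection.mutable

// Illustrative only: release this thread's unroll bookkeeping even if the body
// throws, mirroring how Executor clears shuffleMemoryMap in a finally block.
def withUnrollCleanup[A](unrollMemoryMap: mutable.HashMap[Long, Long])(body: => A): A = {
  val threadId = Thread.currentThread().getId
  try {
    body
  } finally {
    unrollMemoryMap.synchronized {
      unrollMemoryMap.remove(threadId)
    }
  }
}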

}
Contributor: I wonder if we could move this logic to BlockManager later. It can probably be part of another PR, but it's pretty complicated to have this here and then similar logic in there when it's doing a get (not to mention in its put methods).

Contributor Author: Yes. The existing BlockManager interface does not return the values you just put (for good reasons), but once we add that, this entire method can be simplified to a wrapper, and there won't be duplicate logic between here and the get-from-disk case.

}
}
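For readers following along, unrollSafely lives in MemoryStore and is not part of this diff: it materializes the iterator incrementally, periodically re-estimating its size and reserving more unroll memory, and bails out with a partially unrolled iterator once no more memory can be reserved. Below is a simplified sketch of that pattern, with hypothetical callbacks (requestMemory, estimateSize) standing in for the real MemoryStore internals and constants chosen arbitrarily:

import scala.collection.mutable.ArrayBuffer

// Illustrative sketch only; the real unrollSafely also tracks per-thread unroll
// memory, frees it in a finally block, and records any blocks it had to drop.
def unrollSketch(
    values: Iterator[Any],
    requestMemory: Long => Boolean,          // ask the store to reserve more unroll memory
    estimateSize: ArrayBuffer[Any] => Long   // estimate the in-memory size of the buffer
  ): Either[Array[Any], Iterator[Any]] = {
  val initialRequest = 1L << 20   // start by reserving 1 MB
  val checkPeriod = 16            // re-estimate the size every N elements
  val growthFactor = 1.5          // request headroom beyond the current estimate
  val buffer = new ArrayBuffer[Any]
  var threshold = initialRequest
  var keepUnrolling = requestMemory(initialRequest)
  var count = 0L
  while (values.hasNext && keepUnrolling) {
    buffer += values.next()
    count += 1
    if (count % checkPeriod == 0) {
      val size = estimateSize(buffer)
      if (size >= threshold) {
        val extra = (size * growthFactor - threshold).toLong
        keepUnrolling = requestMemory(extra)  // stop unrolling if the request fails
        threshold += extra
      }
    }
  }
  if (keepUnrolling) Left(buffer.toArray)     // the whole partition fit in memory
  else Right(buffer.iterator ++ values)       // hand back what was unrolled plus the rest
}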

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -67,7 +67,7 @@ class SparkEnv (
val metricsSystem: MetricsSystem,
val conf: SparkConf) extends Logging {

// A mapping of thread ID to amount of memory used for shuffle in bytes
// A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()

4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -266,11 +266,13 @@ private[spark] class Executor(
}
}
} finally {
// TODO: Unregister shuffle memory only for ResultTask
// Release memory used by this thread for shuffles
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
// Release memory used by this thread for unrolling blocks
env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
runningTasks.remove(taskId)
}
}
110 changes: 66 additions & 44 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -38,7 +38,7 @@ import org.apache.spark.util._
private[spark] sealed trait BlockValues
private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
Contributor Author: This change was made because SizeTrackingAppendOnlyBuffer (used in MemoryStore) is backed by an Array, which is more space-efficient. These values are never mutated anyway, so it seems somewhat arbitrary that we had been using ArrayBuffers the whole time.


/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
@@ -71,9 +71,9 @@ private[spark] class BlockManager(

// Actual storage of where blocks are kept
private var tachyonInitialized = false
private[storage] val memoryStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore = new DiskStore(this, diskBlockManager)
private[storage] lazy val tachyonStore: TachyonStore = {
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
private[spark] lazy val tachyonStore: TachyonStore = {
Contributor: We don't need to relax the visibility of diskStore and tachyonStore; I did not see them being used.

val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
val appFolderName = conf.get("spark.tachyonStore.folderName")
val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
@@ -463,16 +463,17 @@
val values = dataDeserialize(blockId, bytes)
if (level.deserialized) {
// Cache the values before returning them
// TODO: Consider creating a putValues that also takes in a iterator?
val valuesBuffer = new ArrayBuffer[Any]
valuesBuffer ++= values
memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
match {
case Left(values2) =>
return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
case _ =>
throw new SparkException("Memory store did not return back an iterator")
}
val putResult = memoryStore.putIterator(
blockId, values, level, returnValues = true, allowPersistToDisk = false)
// The put may or may not have succeeded, depending on whether there was enough
// space to unroll the block. Either way, the put here should return an iterator.
putResult.data match {
case Left(it) =>
return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
case _ =>
// This only happens if we dropped the values back to disk (which is never)
throw new SparkException("Memory store did not return an iterator!")
}
Contributor: Isn't it possible that, as we unroll the partition here, it will be too large? It's certainly less common than it being too large the first time we read it, but I can see it happening. I'm thinking of the case where someone stores a block as MEMORY_AND_DISK.

Contributor: Ah, never mind, I see that this checks for it. It might be worthwhile to add a comment here.

Contributor Author: OK, I added it in my latest commit.

} else {
return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
}
@@ -561,13 +562,14 @@
iter
}

def put(
def putIterator(
Contributor Author: This is renamed to avoid method signature conflicts with the new putArray, since this PR introduces a new optional parameter, effectiveStorageLevel, for both methods.

blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
doPut(blockId, IteratorValues(values), level, tellMaster)
doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
}
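As the author notes in the comment on putIterator above, a single overloaded put could not simply gain the new optional parameter, because Scala allows default arguments on at most one overloaded alternative of a method. A toy illustration of that restriction (not Spark code):

// This class does not compile: Scala rejects default arguments on more than
// one overloaded alternative of the same method.
class Store {
  def put(id: String, values: Iterator[Any], tellMaster: Boolean = true): Unit = ()
  def put(id: String, values: Array[Any], tellMaster: Boolean = true): Unit = ()
  // error: multiple overloaded alternatives of method put define default arguments
}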

/**
@@ -589,13 +591,14 @@
* Put a new block of values to the block manager.
* Return a list of blocks updated as a result of this put.
*/
def put(
def putArray(
blockId: BlockId,
values: ArrayBuffer[Any],
values: Array[Any],
level: StorageLevel,
tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
doPut(blockId, ArrayBufferValues(values), level, tellMaster)
doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
}

/**
@@ -606,19 +609,33 @@
blockId: BlockId,
bytes: ByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(bytes != null, "Bytes is null")
doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}

/**
* Put the given block according to the given level in one of the block stores, replicating
* the values if necessary.
*
* The effective storage level refers to the level according to which the block will actually be
* handled. This allows the caller to specify an alternate behavior of doPut while preserving
* the original level specified by the user.
*/
private def doPut(
blockId: BlockId,
data: BlockValues,
level: StorageLevel,
tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None)
: Seq[(BlockId, BlockStatus)] = {

require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
effectiveStorageLevel.foreach { level =>
require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
}

// Return value
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -657,13 +674,16 @@
// Size of the block in bytes
var size = 0L

// The level we actually use to put the block
val putLevel = effectiveStorageLevel.getOrElse(level)

// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
case b: ByteBufferValues if level.replication > 1 =>
case b: ByteBufferValues if putLevel.replication > 1 =>
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = b.buffer.duplicate()
Future { replicate(blockId, bufferView, level) }
Future { replicate(blockId, bufferView, putLevel) }
case _ => null
}

@@ -676,18 +696,18 @@
// returnValues - Whether to return the values put
// blockStore - The type of storage to put these values into
val (returnValues, blockStore: BlockStore) = {
if (level.useMemory) {
if (putLevel.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
} else if (level.useOffHeap) {
} else if (putLevel.useOffHeap) {
// Use tachyon for off-heap storage
(false, tachyonStore)
} else if (level.useDisk) {
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(level.replication > 1, diskStore)
(putLevel.replication > 1, diskStore)
} else {
assert(level == StorageLevel.NONE)
assert(putLevel == StorageLevel.NONE)
throw new BlockException(
blockId, s"Attempted to put block $blockId without specifying storage level!")
}
@@ -696,22 +716,22 @@
// Actually put the values
val result = data match {
case IteratorValues(iterator) =>
blockStore.putValues(blockId, iterator, level, returnValues)
case ArrayBufferValues(array) =>
blockStore.putValues(blockId, array, level, returnValues)
blockStore.putIterator(blockId, iterator, putLevel, returnValues)
case ArrayValues(array) =>
blockStore.putArray(blockId, array, putLevel, returnValues)
case ByteBufferValues(bytes) =>
bytes.rewind()
blockStore.putBytes(blockId, bytes, level)
blockStore.putBytes(blockId, bytes, putLevel)
}
size = result.size
result.data match {
case Left (newIterator) if level.useMemory => valuesAfterPut = newIterator
case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
case Right (newBytes) => bytesAfterPut = newBytes
case _ =>
}

// Keep track of which blocks are dropped from memory
if (level.useMemory) {
if (putLevel.useMemory) {
result.droppedBlocks.foreach { updatedBlocks += _ }
}

@@ -742,7 +762,7 @@

// Either we're storing bytes and we asynchronously started replication, or we're storing
// values and need to serialize and replicate them now:
if (level.replication > 1) {
if (putLevel.replication > 1) {
data match {
case ByteBufferValues(bytes) =>
if (replicationFuture != null) {
@@ -758,15 +778,15 @@
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, level)
replicate(blockId, bytesAfterPut, putLevel)
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}

BlockManager.dispose(bytesAfterPut)

if (level.replication > 1) {
if (putLevel.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
} else {
@@ -818,7 +838,7 @@
value: Any,
level: StorageLevel,
tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
put(blockId, Iterator(value), level, tellMaster)
putIterator(blockId, Iterator(value), level, tellMaster)
}

/**
@@ -829,7 +849,7 @@
*/
def dropFromMemory(
blockId: BlockId,
data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {

logInfo(s"Dropping block $blockId from memory")
val info = blockInfo.get(blockId).orNull
@@ -853,7 +873,7 @@
logInfo(s"Writing block $blockId to disk")
data match {
case Left(elements) =>
diskStore.putValues(blockId, elements, level, returnValues = false)
diskStore.putArray(blockId, elements, level, returnValues = false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
@@ -1068,9 +1088,11 @@
private[spark] object BlockManager extends Logging {
private val ID_GENERATOR = new IdGenerator

/** Return the total amount of storage memory available. */
private def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}
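As a worked example with the defaults shown here, an executor whose maximum heap is 8 GB ends up with roughly 8 GB × 0.6 × 0.9 ≈ 4.3 GB of storage memory; the unroll memory introduced by this PR (spark.storage.unrollFraction, commit 4f18a3d) is carved out of that same pool.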

def getHeartBeatFrequency(conf: SparkConf): Long =