[SPARK-1777] Prevent OOMs from single partitions #1165
CacheManager.scala
@@ -17,9 +17,9 @@
 package org.apache.spark
 
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._

@@ -30,7 +30,7 @@ import org.apache.spark.storage._
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
 
   /** Keys of RDD partitions that are being computed/loaded. */
-  private val loading = new HashSet[RDDBlockId]()
+  private val loading = new mutable.HashSet[RDDBlockId]
 
   /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](
@@ -118,36 +118,66 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   }
 
   /**
-   * Cache the values of a partition, keeping track of any updates in the storage statuses
-   * of other blocks along the way.
+   * Cache the values of a partition, keeping track of any updates in the storage statuses of
+   * other blocks along the way.
+   *
+   * The effective storage level refers to the level that actually specifies BlockManager put
+   * behavior, not the level originally specified by the user. This is mainly for forcing a
+   * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
+   * while preserving the the original semantics of the RDD as specified by the application.
    */
   private def putInBlockManager[T](
       key: BlockId,
       values: Iterator[T],
-      storageLevel: StorageLevel,
-      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
-    if (!storageLevel.useMemory) {
-      /* This RDD is not to be cached in memory, so we can just pass the computed values
-       * as an iterator directly to the BlockManager, rather than first fully unrolling
-       * it in memory. The latter option potentially uses much more memory and risks OOM
-       * exceptions that can be avoided. */
-      updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
+      level: StorageLevel,
+      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
+      effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
Review comment: This should be documented in the javadoc here - it's not at all obvious what it means.
Review comment: @andrewor14, since the … Take another case for example: if the original level is MEM_ONLY and the user forces it to cache on disk via effectiveStorageLevel, then the BlockManager will never know the block is on disk, and the block will be a zombie. Of course, such a case is not likely to happen.
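To make the new parameter concrete, here is a hedged illustration (not additional API) of how it is used further down in this method: the application's `level` is kept for bookkeeping, while a disk-only level is passed as the effective level once the partition turns out not to fit in memory.

```scala
import org.apache.spark.storage.StorageLevel

// Sketch mirroring the fallback later in putInBlockManager: the user's level is
// preserved; only the put behavior is overridden.
val level = StorageLevel.MEMORY_AND_DISK
val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
  useOffHeap = false, deserialized = false, level.replication)
// Hypothetical re-invocation (key, returnValues and updatedBlocks as in the method body):
// putInBlockManager(key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
```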
+    val putLevel = effectiveStorageLevel.getOrElse(level)
+    if (!putLevel.useMemory) {
+      /*
+       * This RDD is not to be cached in memory, so we can just pass the computed values as an
+       * iterator directly to the BlockManager rather than first fully unrolling it in memory.
+       */
+      updatedBlocks ++=
+        blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
+      blockManager.get(key) match {
+        case Some(v) => v.data.asInstanceOf[Iterator[T]]
+        case None =>
+          logInfo(s"Failure to store $key")
+          throw new BlockException(key, s"Block manager failed to return cached value for $key!")
+      }
     } else {
-      /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+      /*
+       * This RDD is to be cached in memory. In this case we cannot pass the computed values
        * to the BlockManager as an iterator and expect to read it back later. This is because
-       * we may end up dropping a partition from memory store before getting it back, e.g.
-       * when the entirety of the RDD does not fit in memory. */
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
-      updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
-      elements.iterator.asInstanceOf[Iterator[T]]
+       * we may end up dropping a partition from memory store before getting it back.
+       *
+       * In addition, we must be careful to not unroll the entire partition in memory at once.
+       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
+       * single partition. Instead, we unroll the values cautiously, potentially aborting and
+       * dropping the partition to disk if applicable.
+       */
+      blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
+        case Left(arr) =>
+          // We have successfully unrolled the entire partition, so cache it in memory
+          updatedBlocks ++=
+            blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
+          arr.iterator.asInstanceOf[Iterator[T]]
+        case Right(it) =>
+          // There is not enough space to cache this partition in memory
+          logWarning(s"Not enough space to cache partition $key in memory! " +
+            s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
+          val returnValues = it.asInstanceOf[Iterator[T]]
+          if (putLevel.useDisk) {
+            logWarning(s"Persisting partition $key to disk instead.")
+            val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
+              useOffHeap = false, deserialized = false, putLevel.replication)
+            putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
+          } else {
+            returnValues
+          }
Review comment: In case of exceptions etc. in this method, we end up with an ever-increasing cacheMemoryMap.
+      }
Review comment: I wonder if we could move this logic to BlockManager later. It can probably be part of another PR, but it's pretty complicated to have this here and then similar logic in there when it's doing a get (not to mention in its put methods).
Reply: Yes. The existing BM interface does not return the values you just put (for good reasons), but once we add it, this entire method can be simplified to a wrapper, and there won't be duplicate logic between here and the …
     }
   }
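The hunk above leans on memoryStore.unrollSafely, whose implementation is not part of this diff. As a rough, self-contained sketch of the idea (the name, signature, and size-estimation heuristic below are illustrative, not Spark's actual code), unrolling cautiously means draining the iterator only while an estimated footprint stays within a budget, and otherwise handing back an iterator so the caller can spill to disk or simply return the values:

```scala
import scala.collection.mutable.ArrayBuffer

// Drain `values` into memory only while an estimated footprint stays within
// `budgetBytes`; otherwise hand back an iterator over everything seen so far
// plus the rest, so nothing is lost and the caller decides what to do next.
def unrollCautiously(
    values: Iterator[Any],
    budgetBytes: Long,
    estimateSize: Any => Long): Either[Array[Any], Iterator[Any]] = {
  val buffer = new ArrayBuffer[Any]
  var used = 0L
  while (values.hasNext && used <= budgetBytes) {
    val elem = values.next()
    buffer += elem
    used += estimateSize(elem)
  }
  if (!values.hasNext && used <= budgetBytes) {
    Left(buffer.toArray)               // fully unrolled: safe to cache in memory
  } else {
    Right(buffer.iterator ++ values)   // too big: caller may spill to disk instead
  }
}
```

The real method additionally coordinates with the memory store's bookkeeping while it unrolls, which this toy omits.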
BlockManager.scala
@@ -38,7 +38,7 @@ import org.apache.spark.util._
 private[spark] sealed trait BlockValues
 private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
 private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
-private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
+private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
Review comment: This change was made because …
 
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(

@@ -71,9 +71,9 @@ private[spark] class BlockManager(
 
   // Actual storage of where blocks are kept
   private var tachyonInitialized = false
-  private[storage] val memoryStore = new MemoryStore(this, maxMemory)
-  private[storage] val diskStore = new DiskStore(this, diskBlockManager)
-  private[storage] lazy val tachyonStore: TachyonStore = {
+  private[spark] val memoryStore = new MemoryStore(this, maxMemory)
+  private[spark] val diskStore = new DiskStore(this, diskBlockManager)
+  private[spark] lazy val tachyonStore: TachyonStore = {
Review comment: We don't need to relax the visibility of diskStore and tachyonStore - I did not see them being used ...
     val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
     val appFolderName = conf.get("spark.tachyonStore.folderName")
     val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"

@@ -463,16 +463,17 @@ private[spark] class BlockManager(
         val values = dataDeserialize(blockId, bytes)
         if (level.deserialized) {
           // Cache the values before returning them
-          // TODO: Consider creating a putValues that also takes in a iterator?
-          val valuesBuffer = new ArrayBuffer[Any]
-          valuesBuffer ++= values
-          memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
-            match {
-              case Left(values2) =>
-                return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
-              case _ =>
-                throw new SparkException("Memory store did not return back an iterator")
-            }
+          val putResult = memoryStore.putIterator(
+            blockId, values, level, returnValues = true, allowPersistToDisk = false)
+          // The put may or may not have succeeded, depending on whether there was enough
+          // space to unroll the block. Either way, the put here should return an iterator.
+          putResult.data match {
+            case Left(it) =>
+              return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
+            case _ =>
+              // This only happens if we dropped the values back to disk (which is never)
+              throw new SparkException("Memory store did not return an iterator!")
+          }
Review comment: Isn't it possible that as we unroll the partition here, it will be too large? It's certainly less common than it being too large the first time we read it, but I can see it happening. I'm thinking of the case where someone stores a block as MEMORY_AND_DISK.
Reply: Ah, never mind, I see that this checks for it. Might be worthwhile to add a comment here.
Reply: Ok, I added it in my latest commit.
         } else {
           return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
         }

@@ -561,13 +562,14 @@ private[spark] class BlockManager(
     iter
   }
 
-  def put(
+  def putIterator(
Review comment: This is renamed to avoid method signature conflicts with the new …
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
-      tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
     require(values != null, "Values is null")
-    doPut(blockId, IteratorValues(values), level, tellMaster)
+    doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
   }
 
   /**
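For illustration, a hedged usage sketch of the renamed method (the block id, the values, and the in-scope blockManager instance are placeholders): the block is tracked under the level the user asked for, while the optional effective level controls how the put actually behaves.

```scala
import org.apache.spark.storage.{BlockId, BlockStatus, RDDBlockId, StorageLevel}

// Hypothetical call against an existing BlockManager instance `blockManager`.
val updates: Seq[(BlockId, BlockStatus)] = blockManager.putIterator(
  RDDBlockId(0, 0),                     // which block to store
  Iterator(1, 2, 3),                    // the values to write
  StorageLevel.MEMORY_AND_DISK,         // level requested by the user
  tellMaster = true,
  effectiveStorageLevel = Some(StorageLevel.DISK_ONLY)) // how this put actually behaves
```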
@@ -589,13 +591,14 @@ private[spark] class BlockManager(
    * Put a new block of values to the block manager.
    * Return a list of blocks updated as a result of this put.
    */
-  def put(
+  def putArray(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
     require(values != null, "Values is null")
-    doPut(blockId, ArrayBufferValues(values), level, tellMaster)
+    doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
   }
 
   /**
@@ -606,19 +609,33 @@ private[spark] class BlockManager(
       blockId: BlockId,
       bytes: ByteBuffer,
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
     require(bytes != null, "Bytes is null")
-    doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
+    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
   }
 
   /**
    * Put the given block according to the given level in one of the block stores, replicating
    * the values if necessary.
+   *
+   * The effective storage level refers to the level according to which the block will actually be
+   * handled. This allows the caller to specify an alternate behavior of doPut while preserving
+   * the original level specified by the user.
    */
   private def doPut(
       blockId: BlockId,
       data: BlockValues,
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None)
+    : Seq[(BlockId, BlockStatus)] = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
+    effectiveStorageLevel.foreach { level =>
+      require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
+    }
 
     // Return value
     val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -657,13 +674,16 @@ private[spark] class BlockManager(
     // Size of the block in bytes
     var size = 0L
 
+    // The level we actually use to put the block
+    val putLevel = effectiveStorageLevel.getOrElse(level)
+
     // If we're storing bytes, then initiate the replication before storing them locally.
     // This is faster as data is already serialized and ready to send.
     val replicationFuture = data match {
-      case b: ByteBufferValues if level.replication > 1 =>
+      case b: ByteBufferValues if putLevel.replication > 1 =>
         // Duplicate doesn't copy the bytes, but just creates a wrapper
         val bufferView = b.buffer.duplicate()
-        Future { replicate(blockId, bufferView, level) }
+        Future { replicate(blockId, bufferView, putLevel) }
      case _ => null
     }
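The comment above ("Duplicate doesn't copy the bytes, but just creates a wrapper") is worth unpacking: ByteBuffer.duplicate() shares the underlying data but gives the new buffer its own position and limit, which is what makes it safe to hand to a replication task running concurrently with the local write. A minimal, self-contained sketch of that pattern (the `send` function is a stand-in, not a Spark API):

```scala
import java.nio.ByteBuffer
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Kick off replication on a separate view of the same bytes. duplicate() copies
// no data; it only creates an independent cursor over the shared buffer.
def replicateAsync(bytes: ByteBuffer)(send: ByteBuffer => Unit): Future[Unit] = {
  val bufferView = bytes.duplicate()
  Future { send(bufferView) }
}
```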
@@ -676,18 +696,18 @@ private[spark] class BlockManager(
       // returnValues - Whether to return the values put
       // blockStore - The type of storage to put these values into
       val (returnValues, blockStore: BlockStore) = {
-        if (level.useMemory) {
+        if (putLevel.useMemory) {
           // Put it in memory first, even if it also has useDisk set to true;
           // We will drop it to disk later if the memory store can't hold it.
           (true, memoryStore)
-        } else if (level.useOffHeap) {
+        } else if (putLevel.useOffHeap) {
           // Use tachyon for off-heap storage
           (false, tachyonStore)
-        } else if (level.useDisk) {
+        } else if (putLevel.useDisk) {
           // Don't get back the bytes from put unless we replicate them
-          (level.replication > 1, diskStore)
+          (putLevel.replication > 1, diskStore)
         } else {
-          assert(level == StorageLevel.NONE)
+          assert(putLevel == StorageLevel.NONE)
           throw new BlockException(
             blockId, s"Attempted to put block $blockId without specifying storage level!")
         }
@@ -696,22 +716,22 @@ private[spark] class BlockManager(
       // Actually put the values
       val result = data match {
         case IteratorValues(iterator) =>
-          blockStore.putValues(blockId, iterator, level, returnValues)
-        case ArrayBufferValues(array) =>
-          blockStore.putValues(blockId, array, level, returnValues)
+          blockStore.putIterator(blockId, iterator, putLevel, returnValues)
+        case ArrayValues(array) =>
+          blockStore.putArray(blockId, array, putLevel, returnValues)
         case ByteBufferValues(bytes) =>
           bytes.rewind()
-          blockStore.putBytes(blockId, bytes, level)
+          blockStore.putBytes(blockId, bytes, putLevel)
       }
       size = result.size
       result.data match {
-        case Left (newIterator) if level.useMemory => valuesAfterPut = newIterator
+        case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
         case Right (newBytes) => bytesAfterPut = newBytes
         case _ =>
       }
 
       // Keep track of which blocks are dropped from memory
-      if (level.useMemory) {
+      if (putLevel.useMemory) {
         result.droppedBlocks.foreach { updatedBlocks += _ }
       }
@@ -742,7 +762,7 @@ private[spark] class BlockManager(
 
     // Either we're storing bytes and we asynchronously started replication, or we're storing
     // values and need to serialize and replicate them now:
-    if (level.replication > 1) {
+    if (putLevel.replication > 1) {
       data match {
         case ByteBufferValues(bytes) =>
           if (replicationFuture != null) {
@@ -758,15 +778,15 @@ private[spark] class BlockManager(
         }
         bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
       }
-      replicate(blockId, bytesAfterPut, level)
+      replicate(blockId, bytesAfterPut, putLevel)
       logDebug("Put block %s remotely took %s"
         .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
     }
 
     BlockManager.dispose(bytesAfterPut)
 
-    if (level.replication > 1) {
+    if (putLevel.replication > 1) {
       logDebug("Putting block %s with replication took %s"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     } else {
@@ -818,7 +838,7 @@ private[spark] class BlockManager(
       value: Any,
       level: StorageLevel,
       tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
-    put(blockId, Iterator(value), level, tellMaster)
+    putIterator(blockId, Iterator(value), level, tellMaster)
   }
 
   /**
@@ -829,7 +849,7 @@ private[spark] class BlockManager(
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
 
     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfo.get(blockId).orNull
@@ -853,7 +873,7 @@ private[spark] class BlockManager(
         logInfo(s"Writing block $blockId to disk")
         data match {
           case Left(elements) =>
-            diskStore.putValues(blockId, elements, level, returnValues = false)
+            diskStore.putArray(blockId, elements, level, returnValues = false)
           case Right(bytes) =>
             diskStore.putBytes(blockId, bytes, level)
         }
@@ -1068,9 +1088,11 @@
 private[spark] object BlockManager extends Logging {
   private val ID_GENERATOR = new IdGenerator
 
+  /** Return the total amount of storage memory available. */
   private def getMaxMemory(conf: SparkConf): Long = {
     val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
-    (Runtime.getRuntime.maxMemory * memoryFraction).toLong
+    val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
+    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
 
   def getHeartBeatFrequency(conf: SparkConf): Long =
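For a sense of scale, here is the arithmetic with the defaults shown above (the 8 GB heap is just an example):

```scala
// Worked example with the default fractions from this diff.
val maxHeapBytes   = 8L * 1024 * 1024 * 1024   // e.g. an 8 GB executor heap
val memoryFraction = 0.6                        // spark.storage.memoryFraction
val safetyFraction = 0.9                        // spark.storage.safetyFraction
val storageBytes   = (maxHeapBytes * memoryFraction * safetyFraction).toLong
// ~4.32 GB for the storage memory pool, versus ~4.8 GB before the safety fraction.
```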
Review comment: As we have the putIterator interface now, could we just let the blockManager handle this logic? Calling unrollSafely here looks a little coupled; maybe it's better to mark unrollSafely as private and hide it outside the memory store.
Reply: The logic here was first introduced at 40566e1. It was aimed at avoiding unrolling the iterator into an ArrayBuffer if the block is DISK_ONLY. But with the appearance of the putIterator interface in blockManager, this logic can easily be done in blockManager and we can clean up the code here. Did I miss something?
Reply: Ah, this part is actually really tricky. If you just used putIterator here, the result would be incorrect, and the reason behind that is quite subtle. Here in getOrCompute we need to return the actual iterator in addition to storing it in BlockManager, so if we just use putIterator with the MEMORY_ONLY level, then other threads might drop our block before we get to read it back, in which case we will have nothing to return because our original iterator was already exhausted. We have no other choice but to attempt to unroll it here first.
Reply: Ah, got it. So it follows the original implementation: that way we can ensure the returned iterator will always have data for the user.
Reply: Yes, except the original implementation OOMs easily.
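The subtlety in this thread comes down to iterators being single-use. A minimal, plain-Scala illustration (not Spark code) of why getOrCompute cannot simply hand its iterator to putIterator and return it afterwards:

```scala
// An Iterator can only be traversed once.
val values = Iterator(1, 2, 3)
val cached = values.toArray    // storing the block consumes the iterator
// Suppose another thread now evicts the cached copy before we read it back...
val leftover = values.toList   // ...the original iterator is already empty: List()

// Unrolling into an array first (as the new CacheManager code does via unrollSafely)
// keeps a copy that can always be returned, whatever happens to the cached block.
```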