diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 8f867686a0443..5ddda4d6953fa 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -17,9 +17,9 @@ package org.apache.spark -import scala.collection.mutable.{ArrayBuffer, HashSet} +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer -import org.apache.spark.executor.InputMetrics import org.apache.spark.rdd.RDD import org.apache.spark.storage._ @@ -30,7 +30,7 @@ import org.apache.spark.storage._ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { /** Keys of RDD partitions that are being computed/loaded. */ - private val loading = new HashSet[RDDBlockId]() + private val loading = new mutable.HashSet[RDDBlockId] /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */ def getOrCompute[T]( @@ -118,21 +118,29 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } /** - * Cache the values of a partition, keeping track of any updates in the storage statuses - * of other blocks along the way. + * Cache the values of a partition, keeping track of any updates in the storage statuses of + * other blocks along the way. + * + * The effective storage level refers to the level that actually specifies BlockManager put + * behavior, not the level originally specified by the user. This is mainly for forcing a + * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition, + * while preserving the original semantics of the RDD as specified by the application. */ private def putInBlockManager[T]( key: BlockId, values: Iterator[T], - storageLevel: StorageLevel, - updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = { - - if (!storageLevel.useMemory) { - /* This RDD is not to be cached in memory, so we can just pass the computed values - * as an iterator directly to the BlockManager, rather than first fully unrolling - * it in memory. The latter option potentially uses much more memory and risks OOM - * exceptions that can be avoided. */ - updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true) + level: StorageLevel, + updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)], + effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = { + + val putLevel = effectiveStorageLevel.getOrElse(level) + if (!putLevel.useMemory) { + /* + * This RDD is not to be cached in memory, so we can just pass the computed values as an + * iterator directly to the BlockManager rather than first fully unrolling it in memory. + */ + updatedBlocks ++= + blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel) blockManager.get(key) match { case Some(v) => v.data.asInstanceOf[Iterator[T]] case None => @@ -140,14 +148,36 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { throw new BlockException(key, s"Block manager failed to return cached value for $key!") } } else { - /* This RDD is to be cached in memory. In this case we cannot pass the computed values + /* + * This RDD is to be cached in memory. In this case we cannot pass the computed values * to the BlockManager as an iterator and expect to read it back later. This is because - * we may end up dropping a partition from memory store before getting it back, e.g. - * when the entirety of the RDD does not fit in memory.
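To make the fallback described above concrete, here is a minimal, self-contained sketch (not part of the patch) of how a disk-only put level can be derived from a user-specified MEMORY_AND_DISK level once unrolling has failed. The helper name diskFallbackLevel is hypothetical; the StorageLevel(...) constructor and fields are the same ones the patch uses.

    import org.apache.spark.storage.StorageLevel

    object EffectiveLevelSketch {
      // Hypothetical helper: given the level the user asked for, compute the level the
      // BlockManager should actually use once the partition could not be unrolled in memory.
      // Replication is preserved; only the memory bit is dropped.
      def diskFallbackLevel(userLevel: StorageLevel): Option[StorageLevel] = {
        if (userLevel.useDisk) {
          Some(StorageLevel(useDisk = true, useMemory = false, useOffHeap = false,
            deserialized = false, userLevel.replication))
        } else {
          None // e.g. MEMORY_ONLY: nothing to fall back to, the computed iterator is returned as-is
        }
      }

      def main(args: Array[String]): Unit = {
        println(diskFallbackLevel(StorageLevel.MEMORY_AND_DISK)) // Some(disk-only level)
        println(diskFallbackLevel(StorageLevel.MEMORY_ONLY))     // None
      }
    }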
*/ - val elements = new ArrayBuffer[Any] - elements ++= values - updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true) - elements.iterator.asInstanceOf[Iterator[T]] + * we may end up dropping a partition from memory store before getting it back. + * + * In addition, we must be careful to not unroll the entire partition in memory at once. + * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this + * single partition. Instead, we unroll the values cautiously, potentially aborting and + * dropping the partition to disk if applicable. + */ + blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match { + case Left(arr) => + // We have successfully unrolled the entire partition, so cache it in memory + updatedBlocks ++= + blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel) + arr.iterator.asInstanceOf[Iterator[T]] + case Right(it) => + // There is not enough space to cache this partition in memory + logWarning(s"Not enough space to cache partition $key in memory! " + + s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.") + val returnValues = it.asInstanceOf[Iterator[T]] + if (putLevel.useDisk) { + logWarning(s"Persisting partition $key to disk instead.") + val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false, + useOffHeap = false, deserialized = false, putLevel.replication) + putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel)) + } else { + returnValues + } + } } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 8f70744d804d9..6ee731b22c03c 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -67,7 +67,7 @@ class SparkEnv ( val metricsSystem: MetricsSystem, val conf: SparkConf) extends Logging { - // A mapping of thread ID to amount of memory used for shuffle in bytes + // A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations // All accesses should be manually synchronized val shuffleMemoryMap = mutable.HashMap[Long, Long]() diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index b16133b20cc02..3b69bc4ca4142 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -266,11 +266,13 @@ private[spark] class Executor( } } } finally { - // TODO: Unregister shuffle memory only for ResultTask + // Release memory used by this thread for shuffles val shuffleMemoryMap = env.shuffleMemoryMap shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(Thread.currentThread().getId) } + // Release memory used by this thread for unrolling blocks + env.blockManager.memoryStore.releaseUnrollMemoryForThisThread() runningTasks.remove(taskId) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 0db0a5bc7341b..d746526639e58 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -38,7 +38,7 @@ import org.apache.spark.util._ private[spark] sealed trait BlockValues private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues private[spark] case class IteratorValues(iterator: Iterator[Any]) extends 
BlockValues -private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues +private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues /* Class for returning a fetched block and associated metrics. */ private[spark] class BlockResult( @@ -71,9 +71,9 @@ private[spark] class BlockManager( // Actual storage of where blocks are kept private var tachyonInitialized = false - private[storage] val memoryStore = new MemoryStore(this, maxMemory) - private[storage] val diskStore = new DiskStore(this, diskBlockManager) - private[storage] lazy val tachyonStore: TachyonStore = { + private[spark] val memoryStore = new MemoryStore(this, maxMemory) + private[spark] val diskStore = new DiskStore(this, diskBlockManager) + private[spark] lazy val tachyonStore: TachyonStore = { val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon") val appFolderName = conf.get("spark.tachyonStore.folderName") val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}" @@ -463,16 +463,17 @@ private[spark] class BlockManager( val values = dataDeserialize(blockId, bytes) if (level.deserialized) { // Cache the values before returning them - // TODO: Consider creating a putValues that also takes in a iterator? - val valuesBuffer = new ArrayBuffer[Any] - valuesBuffer ++= values - memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data - match { - case Left(values2) => - return Some(new BlockResult(values2, DataReadMethod.Disk, info.size)) - case _ => - throw new SparkException("Memory store did not return back an iterator") - } + val putResult = memoryStore.putIterator( + blockId, values, level, returnValues = true, allowPersistToDisk = false) + // The put may or may not have succeeded, depending on whether there was enough + // space to unroll the block. Either way, the put here should return an iterator. + putResult.data match { + case Left(it) => + return Some(new BlockResult(it, DataReadMethod.Disk, info.size)) + case _ => + // This only happens if we dropped the values back to disk (which is never) + throw new SparkException("Memory store did not return an iterator!") + } } else { return Some(new BlockResult(values, DataReadMethod.Disk, info.size)) } @@ -561,13 +562,14 @@ private[spark] class BlockManager( iter } - def put( + def putIterator( blockId: BlockId, values: Iterator[Any], level: StorageLevel, - tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = { + tellMaster: Boolean = true, + effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { require(values != null, "Values is null") - doPut(blockId, IteratorValues(values), level, tellMaster) + doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel) } /** @@ -589,13 +591,14 @@ private[spark] class BlockManager( * Put a new block of values to the block manager. * Return a list of blocks updated as a result of this put. 
*/ - def put( + def putArray( blockId: BlockId, - values: ArrayBuffer[Any], + values: Array[Any], level: StorageLevel, - tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = { + tellMaster: Boolean = true, + effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { require(values != null, "Values is null") - doPut(blockId, ArrayBufferValues(values), level, tellMaster) + doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel) } /** @@ -606,19 +609,33 @@ private[spark] class BlockManager( blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, - tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = { + tellMaster: Boolean = true, + effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { require(bytes != null, "Bytes is null") - doPut(blockId, ByteBufferValues(bytes), level, tellMaster) + doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel) } + /** + * Put the given block according to the given level in one of the block stores, replicating + * the values if necessary. + * + * The effective storage level refers to the level according to which the block will actually be + * handled. This allows the caller to specify an alternate behavior of doPut while preserving + * the original level specified by the user. + */ private def doPut( blockId: BlockId, data: BlockValues, level: StorageLevel, - tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = { + tellMaster: Boolean = true, + effectiveStorageLevel: Option[StorageLevel] = None) + : Seq[(BlockId, BlockStatus)] = { require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") + effectiveStorageLevel.foreach { level => + require(level != null && level.isValid, "Effective StorageLevel is null or invalid") + } // Return value val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] @@ -657,13 +674,16 @@ private[spark] class BlockManager( // Size of the block in bytes var size = 0L + // The level we actually use to put the block + val putLevel = effectiveStorageLevel.getOrElse(level) + // If we're storing bytes, then initiate the replication before storing them locally. // This is faster as data is already serialized and ready to send. val replicationFuture = data match { - case b: ByteBufferValues if level.replication > 1 => + case b: ByteBufferValues if putLevel.replication > 1 => // Duplicate doesn't copy the bytes, but just creates a wrapper val bufferView = b.buffer.duplicate() - Future { replicate(blockId, bufferView, level) } + Future { replicate(blockId, bufferView, putLevel) } case _ => null } @@ -676,18 +696,18 @@ private[spark] class BlockManager( // returnValues - Whether to return the values put // blockStore - The type of storage to put these values into val (returnValues, blockStore: BlockStore) = { - if (level.useMemory) { + if (putLevel.useMemory) { // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. 
(true, memoryStore) - } else if (level.useOffHeap) { + } else if (putLevel.useOffHeap) { // Use tachyon for off-heap storage (false, tachyonStore) - } else if (level.useDisk) { + } else if (putLevel.useDisk) { // Don't get back the bytes from put unless we replicate them - (level.replication > 1, diskStore) + (putLevel.replication > 1, diskStore) } else { - assert(level == StorageLevel.NONE) + assert(putLevel == StorageLevel.NONE) throw new BlockException( blockId, s"Attempted to put block $blockId without specifying storage level!") } @@ -696,22 +716,22 @@ private[spark] class BlockManager( // Actually put the values val result = data match { case IteratorValues(iterator) => - blockStore.putValues(blockId, iterator, level, returnValues) - case ArrayBufferValues(array) => - blockStore.putValues(blockId, array, level, returnValues) + blockStore.putIterator(blockId, iterator, putLevel, returnValues) + case ArrayValues(array) => + blockStore.putArray(blockId, array, putLevel, returnValues) case ByteBufferValues(bytes) => bytes.rewind() - blockStore.putBytes(blockId, bytes, level) + blockStore.putBytes(blockId, bytes, putLevel) } size = result.size result.data match { - case Left (newIterator) if level.useMemory => valuesAfterPut = newIterator + case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator case Right (newBytes) => bytesAfterPut = newBytes case _ => } // Keep track of which blocks are dropped from memory - if (level.useMemory) { + if (putLevel.useMemory) { result.droppedBlocks.foreach { updatedBlocks += _ } } @@ -742,7 +762,7 @@ private[spark] class BlockManager( // Either we're storing bytes and we asynchronously started replication, or we're storing // values and need to serialize and replicate them now: - if (level.replication > 1) { + if (putLevel.replication > 1) { data match { case ByteBufferValues(bytes) => if (replicationFuture != null) { @@ -758,7 +778,7 @@ private[spark] class BlockManager( } bytesAfterPut = dataSerialize(blockId, valuesAfterPut) } - replicate(blockId, bytesAfterPut, level) + replicate(blockId, bytesAfterPut, putLevel) logDebug("Put block %s remotely took %s" .format(blockId, Utils.getUsedTimeMs(remoteStartTime))) } @@ -766,7 +786,7 @@ private[spark] class BlockManager( BlockManager.dispose(bytesAfterPut) - if (level.replication > 1) { + if (putLevel.replication > 1) { logDebug("Putting block %s with replication took %s" .format(blockId, Utils.getUsedTimeMs(startTimeMs))) } else { @@ -818,7 +838,7 @@ private[spark] class BlockManager( value: Any, level: StorageLevel, tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = { - put(blockId, Iterator(value), level, tellMaster) + putIterator(blockId, Iterator(value), level, tellMaster) } /** @@ -829,7 +849,7 @@ private[spark] class BlockManager( */ def dropFromMemory( blockId: BlockId, - data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = { + data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = { logInfo(s"Dropping block $blockId from memory") val info = blockInfo.get(blockId).orNull @@ -853,7 +873,7 @@ private[spark] class BlockManager( logInfo(s"Writing block $blockId to disk") data match { case Left(elements) => - diskStore.putValues(blockId, elements, level, returnValues = false) + diskStore.putArray(blockId, elements, level, returnValues = false) case Right(bytes) => diskStore.putBytes(blockId, bytes, level) } @@ -1068,9 +1088,11 @@ private[spark] class BlockManager( private[spark] object BlockManager extends Logging { private val ID_GENERATOR = new 
IdGenerator + /** Return the total amount of storage memory available. */ private def getMaxMemory(conf: SparkConf): Long = { val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) - (Runtime.getRuntime.maxMemory * memoryFraction).toLong + val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) + (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong } def getHeartBeatFrequency(conf: SparkConf): Long = diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala index b9b53b1a2f118..69985c9759e2d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala @@ -37,15 +37,15 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends * @return a PutResult that contains the size of the data, as well as the values put if * returnValues is true (if not, the result's data field can be null) */ - def putValues( + def putIterator( blockId: BlockId, values: Iterator[Any], level: StorageLevel, returnValues: Boolean): PutResult - def putValues( + def putArray( blockId: BlockId, - values: ArrayBuffer[Any], + values: Array[Any], level: StorageLevel, returnValues: Boolean): PutResult diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index ebff0cb5ba153..c83261dd91b36 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -21,8 +21,6 @@ import java.io.{FileOutputStream, RandomAccessFile} import java.nio.ByteBuffer import java.nio.channels.FileChannel.MapMode -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.Logging import org.apache.spark.serializer.Serializer import org.apache.spark.util.Utils @@ -30,7 +28,7 @@ import org.apache.spark.util.Utils /** * Stores BlockManager blocks on disk. 
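As a rough worked example of the arithmetic in the getMaxMemory change above, combined with the unroll budget introduced in MemoryStore further down, a minimal sketch assuming the default fractions and a hypothetical 1 GiB heap:

    object StorageMemoryMath extends App {
      val heapBytes      = 1L << 30   // assume a 1 GiB executor heap
      val memoryFraction = 0.6        // spark.storage.memoryFraction (default)
      val safetyFraction = 0.9        // spark.storage.safetyFraction (default)
      val unrollFraction = 0.2        // spark.storage.unrollFraction (default)

      // What getMaxMemory now returns: 0.6 * 0.9 = 54% of the heap rather than 60%
      val maxMemory = (heapBytes * memoryFraction * safetyFraction).toLong

      // The soft budget shared by all threads for unrolling blocks (maxUnrollMemory)
      val maxUnrollMemory = (maxMemory * unrollFraction).toLong

      println(s"storage memory = $maxMemory bytes, unroll budget = $maxUnrollMemory bytes")
    }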
*/ -private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager) +private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager) extends BlockStore(blockManager) with Logging { val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L) @@ -57,15 +55,15 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage PutResult(bytes.limit(), Right(bytes.duplicate())) } - override def putValues( + override def putArray( blockId: BlockId, - values: ArrayBuffer[Any], + values: Array[Any], level: StorageLevel, returnValues: Boolean): PutResult = { - putValues(blockId, values.toIterator, level, returnValues) + putIterator(blockId, values.toIterator, level, returnValues) } - override def putValues( + override def putIterator( blockId: BlockId, values: Iterator[Any], level: StorageLevel, diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 71f66c826c5b3..28f675c2bbb1e 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -20,27 +20,45 @@ package org.apache.spark.storage import java.nio.ByteBuffer import java.util.LinkedHashMap +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark.util.{SizeEstimator, Utils} +import org.apache.spark.util.collection.SizeTrackingVector private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean) /** - * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as + * Stores blocks in memory, either as Arrays of deserialized Java objects or as * serialized ByteBuffers. */ -private class MemoryStore(blockManager: BlockManager, maxMemory: Long) +private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) extends BlockStore(blockManager) { + private val conf = blockManager.conf private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true) + @volatile private var currentMemory = 0L - // Object used to ensure that only one thread is putting blocks and if necessary, dropping - // blocks from the memory store. - private val putLock = new Object() + + // Ensure only one thread is putting, and if necessary, dropping blocks at any given time + private val accountingLock = new Object + + // A mapping from thread ID to amount of memory used for unrolling a block (in bytes) + // All accesses of this map are assumed to have manually synchronized on `accountingLock` + private val unrollMemoryMap = mutable.HashMap[Long, Long]() + + /** + * The amount of space ensured for unrolling values in memory, shared across all cores. + * This space is not reserved in advance, but allocated dynamically by dropping existing blocks. + */ + private val maxUnrollMemory: Long = { + val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2) + (maxMemory * unrollFraction).toLong + } logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory))) + /** Free memory not occupied by existing blocks. Note that this does not include unroll memory. 
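The distinction matters when deciding whether a new block fits: free memory is reduced by whatever unroll memory is currently reserved before it is compared against the block size (see the ensureFreeSpace change later in this diff). A sketch with made-up numbers:

    object EnsureFreeSpaceSketch extends App {
      val maxMemory           = 100L * 1024 * 1024 // store capacity
      val currentMemory       =  70L * 1024 * 1024 // occupied by cached blocks
      val currentUnrollMemory =  10L * 1024 * 1024 // reserved by threads that are unrolling

      val freeMemory       = maxMemory - currentMemory        // 30 MB
      val actualFreeMemory = freeMemory - currentUnrollMemory // 20 MB usable for a new block

      val space     = 25L * 1024 * 1024          // size of the block we want to cache
      val mustEvict = actualFreeMemory < space   // true: ~5 MB of existing blocks must be dropped
      println(s"must evict existing blocks first: $mustEvict")
    }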
*/ def freeMemory: Long = maxMemory - currentMemory override def getSize(blockId: BlockId): Long = { @@ -55,20 +73,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) bytes.rewind() if (level.deserialized) { val values = blockManager.dataDeserialize(blockId, bytes) - val elements = new ArrayBuffer[Any] - elements ++= values - val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) - val putAttempt = tryToPut(blockId, elements, sizeEstimate, deserialized = true) - PutResult(sizeEstimate, Left(values.toIterator), putAttempt.droppedBlocks) + putIterator(blockId, values, level, returnValues = true) } else { val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false) PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks) } } - override def putValues( + override def putArray( blockId: BlockId, - values: ArrayBuffer[Any], + values: Array[Any], level: StorageLevel, returnValues: Boolean): PutResult = { if (level.deserialized) { @@ -82,14 +96,52 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } } - override def putValues( + override def putIterator( blockId: BlockId, values: Iterator[Any], level: StorageLevel, returnValues: Boolean): PutResult = { - val valueEntries = new ArrayBuffer[Any]() - valueEntries ++= values - putValues(blockId, valueEntries, level, returnValues) + putIterator(blockId, values, level, returnValues, allowPersistToDisk = true) + } + + /** + * Attempt to put the given block in memory store. + * + * There may not be enough space to fully unroll the iterator in memory, in which case we + * optionally drop the values to disk if + * (1) the block's storage level specifies useDisk, and + * (2) `allowPersistToDisk` is true. + * + * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block + * back from disk and attempts to cache it in memory. In this case, we should not persist the + * block back on disk again, as it is already in disk store. + */ + private[storage] def putIterator( + blockId: BlockId, + values: Iterator[Any], + level: StorageLevel, + returnValues: Boolean, + allowPersistToDisk: Boolean): PutResult = { + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + val unrolledValues = unrollSafely(blockId, values, droppedBlocks) + unrolledValues match { + case Left(arrayValues) => + // Values are fully unrolled in memory, so store them as an array + val res = putArray(blockId, arrayValues, level, returnValues) + droppedBlocks ++= res.droppedBlocks + PutResult(res.size, res.data, droppedBlocks) + case Right(iteratorValues) => + // Not enough space to unroll this block; drop to disk if applicable + logWarning(s"Not enough space to store block $blockId in memory! 
" + + s"Free memory is $freeMemory bytes.") + if (level.useDisk && allowPersistToDisk) { + logWarning(s"Persisting block $blockId to disk instead.") + val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues) + PutResult(res.size, res.data, droppedBlocks) + } else { + PutResult(0, Left(iteratorValues), droppedBlocks) + } + } } override def getBytes(blockId: BlockId): Option[ByteBuffer] = { @@ -99,7 +151,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (entry == null) { None } else if (entry.deserialized) { - Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)) + Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator)) } else { Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data } @@ -112,7 +164,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (entry == null) { None } else if (entry.deserialized) { - Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) + Some(entry.value.asInstanceOf[Array[Any]].iterator) } else { val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data Some(blockManager.dataDeserialize(blockId, buffer)) @@ -140,6 +192,93 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) logInfo("MemoryStore cleared") } + /** + * Unroll the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unrolling the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unrolling blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either an array with the contents of the entire block or an iterator + * containing the values of the block (if the array would have exceeded available memory). + */ + def unrollSafely( + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) + : Either[Array[Any], Iterator[Any]] = { + + // Number of elements unrolled so far + var elementsUnrolled = 0 + // Whether there is still enough memory for us to continue unrolling this block + var keepUnrolling = true + // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing. 
+ val initialMemoryThreshold = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024) + // How often to check whether we need to request more memory + val memoryCheckPeriod = 16 + // Memory currently reserved by this thread for this particular unrolling operation + var memoryThreshold = initialMemoryThreshold + // Memory to request as a multiple of current vector size + val memoryGrowthFactor = 1.5 + // Previous unroll memory held by this thread, for releasing later (only at the very end) + val previousMemoryReserved = currentUnrollMemoryForThisThread + // Underlying vector for unrolling the block + var vector = new SizeTrackingVector[Any] + + // Request enough memory to begin unrolling + keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold) + + // Unroll this block safely, checking whether we have exceeded our threshold periodically + try { + while (values.hasNext && keepUnrolling) { + vector += values.next() + if (elementsUnrolled % memoryCheckPeriod == 0) { + // If our vector's size has exceeded the threshold, request more memory + val currentSize = vector.estimateSize() + if (currentSize >= memoryThreshold) { + val amountToRequest = (currentSize * (memoryGrowthFactor - 1)).toLong + // Hold the accounting lock, in case another thread concurrently puts a block that + // takes up the unrolling space we just ensured here + accountingLock.synchronized { + if (!reserveUnrollMemoryForThisThread(amountToRequest)) { + // If the first request is not granted, try again after ensuring free space + // If there is still not enough space, give up and drop the partition + val spaceToEnsure = maxUnrollMemory - currentUnrollMemory + if (spaceToEnsure > 0) { + val result = ensureFreeSpace(blockId, spaceToEnsure) + droppedBlocks ++= result.droppedBlocks + } + keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest) + } + } + // New threshold is currentSize * memoryGrowthFactor + memoryThreshold = currentSize + amountToRequest + } + } + elementsUnrolled += 1 + } + + if (keepUnrolling) { + // We successfully unrolled the entirety of this block + Left(vector.toArray) + } else { + // We ran out of space while unrolling the values for this block + Right(vector.iterator ++ values) + } + + } finally { + // If we return an array, the values returned do not depend on the underlying vector and + // we can immediately free up space for other threads. Otherwise, if we return an iterator, + // we release the memory claimed by this thread later on when the task finishes. + if (keepUnrolling) { + val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved + releaseUnrollMemoryForThisThread(amountToRelease) + } + } + } + /** * Return the RDD ID that a given block ID is from, or None if it is not an RDD block. */ @@ -149,10 +288,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) /** * Try to put in a set of values, if we can free up enough space. The value should either be - * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) - * size must also be passed by the caller. + * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size + * must also be passed by the caller. * - * Lock on the object putLock to ensure that all the put requests and its associated block + * Synchronize on `accountingLock` to ensure that all the put requests and its associated block * dropping is done by only on thread at a time. 
Otherwise while one thread is dropping * blocks to free memory for one block, another thread may use up the freed space for * another block. @@ -174,7 +313,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) var putSuccess = false val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - putLock.synchronized { + accountingLock.synchronized { val freeSpaceResult = ensureFreeSpace(blockId, size) val enoughFreeSpace = freeSpaceResult.success droppedBlocks ++= freeSpaceResult.droppedBlocks @@ -193,7 +332,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) // Tell the block manager that we couldn't put it in memory so that it can drop it to // disk if the block allows disk storage. val data = if (deserialized) { - Left(value.asInstanceOf[ArrayBuffer[Any]]) + Left(value.asInstanceOf[Array[Any]]) } else { Right(value.asInstanceOf[ByteBuffer].duplicate()) } @@ -210,12 +349,14 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that * don't fit into memory that we want to avoid). * - * Assume that a lock is held by the caller to ensure only one thread is dropping blocks. - * Otherwise, the freed space may fill up before the caller puts in their new value. + * Assume that `accountingLock` is held by the caller to ensure only one thread is dropping + * blocks. Otherwise, the freed space may fill up before the caller puts in their new value. * * Return whether there is enough free space, along with the blocks dropped in the process. */ - private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = { + private def ensureFreeSpace( + blockIdToAdd: BlockId, + space: Long): ResultWithDroppedBlocks = { logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory") val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] @@ -225,9 +366,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) return ResultWithDroppedBlocks(success = false, droppedBlocks) } - if (maxMemory - currentMemory < space) { + // Take into account the amount of memory currently occupied by unrolling blocks + val actualFreeMemory = freeMemory - currentUnrollMemory + + if (actualFreeMemory < space) { val rddToAdd = getRddId(blockIdToAdd) - val selectedBlocks = new ArrayBuffer[BlockId]() + val selectedBlocks = new ArrayBuffer[BlockId] var selectedMemory = 0L // This is synchronized to ensure that the set of entries is not changed @@ -235,7 +379,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) // can lead to exceptions. entries.synchronized { val iterator = entries.entrySet().iterator() - while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) { + while (actualFreeMemory + selectedMemory < space && iterator.hasNext) { val pair = iterator.next() val blockId = pair.getKey if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) { @@ -245,7 +389,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } } - if (maxMemory - (currentMemory - selectedMemory) >= space) { + if (actualFreeMemory + selectedMemory >= space) { logInfo(s"${selectedBlocks.size} blocks selected for dropping") for (blockId <- selectedBlocks) { val entry = entries.synchronized { entries.get(blockId) } @@ -254,7 +398,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) // future safety. 
if (entry != null) { val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[ArrayBuffer[Any]]) + Left(entry.value.asInstanceOf[Array[Any]]) } else { Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) } @@ -275,8 +419,56 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def contains(blockId: BlockId): Boolean = { entries.synchronized { entries.containsKey(blockId) } } + + /** + * Reserve additional memory for unrolling blocks used by this thread. + * Return whether the request is granted. + */ + private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = { + accountingLock.synchronized { + val granted = freeMemory > currentUnrollMemory + memory + if (granted) { + val threadId = Thread.currentThread().getId + unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory + } + granted + } + } + + /** + * Release memory used by this thread for unrolling blocks. + * If the amount is not specified, remove the current thread's allocation altogether. + */ + private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + if (memory < 0) { + unrollMemoryMap.remove(threadId) + } else { + unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory + // If this thread claims no more unroll memory, release it completely + if (unrollMemoryMap(threadId) <= 0) { + unrollMemoryMap.remove(threadId) + } + } + } + } + + /** + * Return the amount of memory currently occupied for unrolling blocks across all threads. + */ + private[spark] def currentUnrollMemory: Long = accountingLock.synchronized { + unrollMemoryMap.values.sum + } + + /** + * Return the amount of memory currently occupied for unrolling blocks by this thread. + */ + private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized { + unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L) + } } -private case class ResultWithDroppedBlocks( +private[spark] case class ResultWithDroppedBlocks( success: Boolean, droppedBlocks: Seq[(BlockId, BlockStatus)]) diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala index d8ff4ff6bd42c..932b5616043b4 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala @@ -20,8 +20,6 @@ package org.apache.spark.storage import java.io.IOException import java.nio.ByteBuffer -import scala.collection.mutable.ArrayBuffer - import tachyon.client.{ReadType, WriteType} import org.apache.spark.Logging @@ -30,7 +28,7 @@ import org.apache.spark.util.Utils /** * Stores BlockManager blocks on Tachyon. 
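A condensed, stand-alone sketch of the reserve/grow/release pattern that unrollSafely and the helpers above implement. The ledger and budget below are stand-ins for the real MemoryStore state and the numbers are arbitrary; the point is the periodic ~1.5x growth request and the release at the end.

    import scala.collection.mutable

    object UnrollAccountingSketch extends App {
      val maxUnrollMemory = 8L * 1024 * 1024      // assumed soft budget shared by all threads
      val ledger = mutable.HashMap[Long, Long]()  // threadId -> reserved unroll bytes

      def reserve(bytes: Long): Boolean = ledger.synchronized {
        val granted = ledger.values.sum + bytes <= maxUnrollMemory
        if (granted) {
          val id = Thread.currentThread().getId
          ledger(id) = ledger.getOrElse(id, 0L) + bytes
        }
        granted
      }

      def releaseAll(): Unit = ledger.synchronized { ledger.remove(Thread.currentThread().getId) }

      // Start with a small reservation and grow it by ~1.5x whenever the partially unrolled
      // data catches up with what has been reserved, in the spirit of unrollSafely.
      var reserved = 1L * 1024 * 1024
      var keepUnrolling = reserve(reserved)
      var estimatedSize = 0L
      val recordSize = 64L * 1024
      while (keepUnrolling && estimatedSize < 10L * 1024 * 1024) {
        estimatedSize += recordSize                   // pretend we appended one record
        if (estimatedSize >= reserved) {
          val request = (estimatedSize * 0.5).toLong  // growth factor of 1.5
          keepUnrolling = reserve(request)
          if (keepUnrolling) reserved = estimatedSize + request
        }
      }
      println(if (keepUnrolling) "fully unrolled" else s"gave up at $estimatedSize bytes")
      releaseAll()
    }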
*/ -private class TachyonStore( +private[spark] class TachyonStore( blockManager: BlockManager, tachyonManager: TachyonBlockManager) extends BlockStore(blockManager: BlockManager) with Logging { @@ -45,15 +43,15 @@ private class TachyonStore( putIntoTachyonStore(blockId, bytes, returnValues = true) } - override def putValues( + override def putArray( blockId: BlockId, - values: ArrayBuffer[Any], + values: Array[Any], level: StorageLevel, returnValues: Boolean): PutResult = { - putValues(blockId, values.toIterator, level, returnValues) + putIterator(blockId, values.toIterator, level, returnValues) } - override def putValues( + override def putIterator( blockId: BlockId, values: Iterator[Any], level: StorageLevel, diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index 328be158db680..75c2e09a6bbb8 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -48,7 +48,7 @@ private[spark] object ThreadingTest { val block = (1 to blockSize).map(_ => Random.nextInt()) val level = randomLevel() val startTime = System.currentTimeMillis() - manager.put(blockId, block.iterator, level, tellMaster = true) + manager.putIterator(blockId, block.iterator, level, tellMaster = true) println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms") queue.add((blockId, block)) } diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index 08465575309c6..bce3b3afe9aba 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -180,7 +180,7 @@ private[spark] object SizeEstimator extends Logging { } } - // Estimat the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling. + // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling. private val ARRAY_SIZE_FOR_SAMPLING = 200 private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala index b84eb65c62bc7..7e76d060d6000 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala @@ -36,7 +36,7 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: _array(index) } - def +=(value: V) { + def +=(value: V): Unit = { if (_numElements == _array.length) { resize(_array.length * 2) } @@ -50,6 +50,19 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: def size: Int = _numElements + def iterator: Iterator[V] = new Iterator[V] { + var index = 0 + override def hasNext: Boolean = index < _numElements + override def next(): V = { + if (!hasNext) { + throw new NoSuchElementException + } + val value = _array(index) + index += 1 + value + } + } + /** Gets the underlying array backing this vector. 
*/ def array: Array[V] = _array diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala new file mode 100644 index 0000000000000..3eb1010dc1e8d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import scala.collection.mutable + +import org.apache.spark.util.SizeEstimator + +/** + * A general interface for collections to keep track of their estimated sizes in bytes. + * We sample with a slow exponential back-off using the SizeEstimator to amortize the time, + * as each call to SizeEstimator is somewhat expensive (order of a few milliseconds). + */ +private[spark] trait SizeTracker { + + import SizeTracker._ + + /** + * Controls the base of the exponential which governs the rate of sampling. + * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements. + */ + private val SAMPLE_GROWTH_RATE = 1.1 + + /** Samples taken since last resetSamples(). Only the last two are kept for extrapolation. */ + private val samples = new mutable.Queue[Sample] + + /** The average number of bytes per update between our last two samples. */ + private var bytesPerUpdate: Double = _ + + /** Total number of insertions and updates into the map since the last resetSamples(). */ + private var numUpdates: Long = _ + + /** The value of 'numUpdates' at which we will take our next sample. */ + private var nextSampleNum: Long = _ + + resetSamples() + + /** + * Reset samples collected so far. + * This should be called after the collection undergoes a dramatic change in size. + */ + protected def resetSamples(): Unit = { + numUpdates = 1 + nextSampleNum = 1 + samples.clear() + takeSample() + } + + /** + * Callback to be invoked after every update. + */ + protected def afterUpdate(): Unit = { + numUpdates += 1 + if (nextSampleNum == numUpdates) { + takeSample() + } + } + + /** + * Take a new sample of the current collection's size. + */ + private def takeSample(): Unit = { + samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates)) + // Only use the last two samples to extrapolate + if (samples.size > 2) { + samples.dequeue() + } + val bytesDelta = samples.toList.reverse match { + case latest :: previous :: tail => + (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates) + // If fewer than 2 samples, assume no change + case _ => 0 + } + bytesPerUpdate = math.max(0, bytesDelta) + nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong + } + + /** + * Estimate the current size of the collection in bytes. O(1) time. 
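A small numeric example of the extrapolation estimateSize performs, using made-up sample values:

    object SizeTrackerMath extends App {
      // Two most recent samples: (estimated size in bytes, numUpdates when sampled)
      val previous = (4000L, 100L)
      val latest   = (6000L, 150L)

      // Average growth per update between the two samples
      val bytesPerUpdate = math.max(0.0,
        (latest._1 - previous._1).toDouble / (latest._2 - previous._2)) // 40 bytes per update

      // O(1) estimate after 30 more updates, without calling SizeEstimator again
      val numUpdates = 180L
      val estimate = (latest._1 + bytesPerUpdate * (numUpdates - latest._2)).toLong
      println(s"estimated size = $estimate bytes") // 6000 + 40 * 30 = 7200
    }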
+ */ + def estimateSize(): Long = { + assert(samples.nonEmpty) + val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates) + (samples.last.size + extrapolatedDelta).toLong + } +} + +private object SizeTracker { + case class Sample(size: Long, numUpdates: Long) +} diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala index 204330dad48b9..de61e1d17fe10 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala @@ -17,85 +17,24 @@ package org.apache.spark.util.collection -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.util.SizeEstimator -import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample - /** - * Append-only map that keeps track of its estimated size in bytes. - * We sample with a slow exponential back-off using the SizeEstimator to amortize the time, - * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds). + * An append-only map that keeps track of its estimated size in bytes. */ -private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] { - - /** - * Controls the base of the exponential which governs the rate of sampling. - * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements. - */ - private val SAMPLE_GROWTH_RATE = 1.1 - - /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */ - private val samples = new ArrayBuffer[Sample]() - - /** Total number of insertions and updates into the map since the last resetSamples(). */ - private var numUpdates: Long = _ - - /** The value of 'numUpdates' at which we will take our next sample. */ - private var nextSampleNum: Long = _ - - /** The average number of bytes per update between our last two samples. */ - private var bytesPerUpdate: Double = _ - - resetSamples() - - /** Called after the map grows in size, as this can be a dramatic change for small objects. */ - def resetSamples() { - numUpdates = 1 - nextSampleNum = 1 - samples.clear() - takeSample() - } +private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] with SizeTracker { override def update(key: K, value: V): Unit = { super.update(key, value) - numUpdates += 1 - if (nextSampleNum == numUpdates) { takeSample() } + super.afterUpdate() } override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { val newValue = super.changeValue(key, updateFunc) - numUpdates += 1 - if (nextSampleNum == numUpdates) { takeSample() } + super.afterUpdate() newValue } - /** Takes a new sample of the current map's size. */ - def takeSample() { - samples += Sample(SizeEstimator.estimate(this), numUpdates) - // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change. - bytesPerUpdate = math.max(0, samples.toSeq.reverse match { - case latest :: previous :: tail => - (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates) - case _ => - 0 - }) - nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong - } - - override protected def growTable() { + override protected def growTable(): Unit = { super.growTable() resetSamples() } - - /** Estimates the current size of the map in bytes. O(1) time. 
*/ - def estimateSize(): Long = { - assert(samples.nonEmpty) - val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates) - (samples.last.size + extrapolatedDelta).toLong - } -} - -private object SizeTrackingAppendOnlyMap { - case class Sample(size: Long, numUpdates: Long) } diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala new file mode 100644 index 0000000000000..65a7b4e0d497b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import scala.reflect.ClassTag + +/** + * An append-only buffer that keeps track of its estimated size in bytes. + */ +private[spark] class SizeTrackingVector[T: ClassTag] + extends PrimitiveVector[T] + with SizeTracker { + + override def +=(value: T): Unit = { + super.+=(value) + super.afterUpdate() + } + + override def resize(newLength: Int): PrimitiveVector[T] = { + super.resize(newLength) + resetSamples() + this + } + + /** + * Return a trimmed version of the underlying array. + */ + def toArray: Array[T] = { + super.iterator.toArray + } +} diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 7f5d0b061e8b0..9c5f394d3899d 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark -import scala.collection.mutable.ArrayBuffer - import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.mock.EasyMockSugar @@ -52,22 +50,21 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } test("get uncached rdd") { - expecting { - blockManager.get(RDDBlockId(0, 0)).andReturn(None) - blockManager.put(RDDBlockId(0, 0), ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY, - true).andStubReturn(Seq[(BlockId, BlockStatus)]()) - } - - whenExecuting(blockManager) { - val context = new TaskContext(0, 0, 0) - val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) - assert(value.toList === List(1, 2, 3, 4)) - } + // Do not mock this test, because attempting to match Array[Any], which is not covariant, + // in blockManager.put is a losing battle. You have been warned. 
+ blockManager = sc.env.blockManager + cacheManager = sc.env.cacheManager + val context = new TaskContext(0, 0, 0) + val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) + val getValue = blockManager.get(RDDBlockId(rdd.id, split.index)) + assert(computeValue.toList === List(1, 2, 3, 4)) + assert(getValue.isDefined, "Block cached from getOrCompute is not found!") + assert(getValue.get.data.toList === List(1, 2, 3, 4)) } test("get cached rdd") { expecting { - val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12) + val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12) blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result)) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 23cb6905bfdeb..dd4fd535d3577 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -31,7 +31,7 @@ import org.scalatest.concurrent.Timeouts._ import org.scalatest.Matchers import org.scalatest.time.SpanSugar._ -import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext} +import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf} import org.apache.spark.executor.DataReadMethod import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} @@ -43,6 +43,7 @@ import scala.language.postfixOps class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester { + private val conf = new SparkConf(false) var store: BlockManager = null var store2: BlockManager = null @@ -61,21 +62,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId) + private def makeBlockManager(maxMem: Long, name: String = ""): BlockManager = { + new BlockManager( + name, actorSystem, master, serializer, maxMem, conf, securityMgr, mapOutputTracker) + } + before { - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf, - securityManager = securityMgr) + val (actorSystem, boundPort) = AkkaUtils.createActorSystem( + "test", "localhost", 0, conf = conf, securityManager = securityMgr) this.actorSystem = actorSystem - conf.set("spark.driver.port", boundPort.toString) - - master = new BlockManagerMaster( - actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), - conf) // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case oldArch = System.setProperty("os.arch", "amd64") conf.set("os.arch", "amd64") conf.set("spark.test.useCompressedOops", "true") conf.set("spark.storage.disableBlockManagerHeartBeat", "true") + conf.set("spark.driver.port", boundPort.toString) + conf.set("spark.storage.unrollFraction", "0.4") + conf.set("spark.storage.unrollMemoryThreshold", "512") + + master = new BlockManagerMaster( + actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), + conf) + val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() } @@ -138,11 +147,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("master + 1 manager interaction") { - store = new 
BlockManager("", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) - val a1 = new Array[Byte](400) - val a2 = new Array[Byte](400) - val a3 = new Array[Byte](400) + store = makeBlockManager(20000) + val a1 = new Array[Byte](4000) + val a2 = new Array[Byte](4000) + val a3 = new Array[Byte](4000) // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) @@ -169,10 +177,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("master + 2 managers interaction") { - store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) - store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf, - securityMgr, mapOutputTracker) + store = makeBlockManager(2000, "exec1") + store2 = makeBlockManager(2000, "exec2") val peers = master.getPeers(store.blockManagerId, 1) assert(peers.size === 1, "master did not return the other manager as a peer") @@ -187,11 +193,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("removing block") { - store = new BlockManager("", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) - val a1 = new Array[Byte](400) - val a2 = new Array[Byte](400) - val a3 = new Array[Byte](400) + store = makeBlockManager(20000) + val a1 = new Array[Byte](4000) + val a2 = new Array[Byte](4000) + val a3 = new Array[Byte](4000) // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY) @@ -200,8 +205,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter // Checking whether blocks are in memory and memory size val memStatus = master.getMemoryStatus.head._2 - assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000") - assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200") + assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000") + assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000") assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store") assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store") assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store") @@ -230,17 +235,16 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { val memStatus = master.getMemoryStatus.head._2 - memStatus._1 should equal (2000L) - memStatus._2 should equal (2000L) + memStatus._1 should equal (20000L) + memStatus._2 should equal (20000L) } } test("removing rdd") { - store = new BlockManager("", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) - val a1 = new Array[Byte](400) - val a2 = new Array[Byte](400) - val a3 = new Array[Byte](400) + store = makeBlockManager(20000) + val a1 = new Array[Byte](4000) + val a2 = new Array[Byte](4000) + val a3 = new Array[Byte](4000) // Putting a1, a2 and a3 in memory. 
store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY) store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY) @@ -270,11 +274,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("removing broadcast") { - store = new BlockManager("", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) + store = makeBlockManager(2000) val driverStore = store - val executorStore = new BlockManager("executor", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) + val executorStore = makeBlockManager(2000, "executor") val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -343,8 +345,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter test("reregistration on heart beat") { val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager("", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) + store = makeBlockManager(2000) val a1 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) @@ -380,8 +381,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter test("reregistration doesn't dead lock") { val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager("", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) + store = makeBlockManager(2000) val a1 = new Array[Byte](400) val a2 = List(new Array[Byte](400)) @@ -390,7 +390,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter master.removeExecutor(store.blockManagerId.executorId) val t1 = new Thread { override def run() { - store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } } val t2 = new Thread { @@ -418,19 +418,14 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("correct BlockResult returned from get() calls") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr, - mapOutputTracker) - val list1 = List(new Array[Byte](200), new Array[Byte](200)) - val list1ForSizeEstimate = new ArrayBuffer[Any] - list1ForSizeEstimate ++= list1.iterator - val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate) - val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150)) - val list2ForSizeEstimate = new ArrayBuffer[Any] - list2ForSizeEstimate ++= list2.iterator - val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate) - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true) + store = makeBlockManager(12000) + val list1 = List(new Array[Byte](2000), new Array[Byte](2000)) + val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500)) + val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray) + val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray) + store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true) val list1Get = store.get("list1") assert(list1Get.isDefined, "list1 expected to be in 
store") assert(list1Get.get.data.size === 2) @@ -451,11 +446,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("in-memory LRU storage") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val a1 = new Array[Byte](400) - val a2 = new Array[Byte](400) - val a3 = new Array[Byte](400) + store = makeBlockManager(12000) + val a1 = new Array[Byte](4000) + val a2 = new Array[Byte](4000) + val a3 = new Array[Byte](4000) store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY) @@ -471,11 +465,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("in-memory LRU storage with serialization") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val a1 = new Array[Byte](400) - val a2 = new Array[Byte](400) - val a3 = new Array[Byte](400) + store = makeBlockManager(12000) + val a1 = new Array[Byte](4000) + val a2 = new Array[Byte](4000) + val a3 = new Array[Byte](4000) store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER) store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER) @@ -491,11 +484,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("in-memory LRU for partitions of same RDD") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val a1 = new Array[Byte](400) - val a2 = new Array[Byte](400) - val a3 = new Array[Byte](400) + store = makeBlockManager(12000) + val a1 = new Array[Byte](4000) + val a2 = new Array[Byte](4000) + val a3 = new Array[Byte](4000) store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY) store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY) store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY) @@ -511,11 +503,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("in-memory LRU for partitions of multiple RDDs") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY) - store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY) - store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store = makeBlockManager(12000) + store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) // At this point rdd_1_1 should've replaced rdd_0_1 assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store") assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store") @@ -523,8 +514,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter // Do a get() on rdd_0_2 so that it is the most recently used item assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store") // Put in more partitions from RDD 0; they should replace rdd_1_1 - store.putSingle(rdd(0, 3), new Array[Byte](400), StorageLevel.MEMORY_ONLY) - store.putSingle(rdd(0, 4), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 4), new Array[Byte](4000), 
StorageLevel.MEMORY_ONLY) // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped // when we try to add rdd_0_4. assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store") @@ -538,8 +529,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter // TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar. val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false) if (tachyonUnitTestEnabled) { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) + store = makeBlockManager(1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -555,8 +545,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("on-disk storage") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) + store = makeBlockManager(1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -569,11 +558,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("disk and memory storage") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val a1 = new Array[Byte](400) - val a2 = new Array[Byte](400) - val a3 = new Array[Byte](400) + store = makeBlockManager(12000) + val a1 = new Array[Byte](4000) + val a2 = new Array[Byte](4000) + val a3 = new Array[Byte](4000) store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) @@ -585,11 +573,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("disk and memory storage with getLocalBytes") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val a1 = new Array[Byte](400) - val a2 = new Array[Byte](400) - val a3 = new Array[Byte](400) + store = makeBlockManager(12000) + val a1 = new Array[Byte](4000) + val a2 = new Array[Byte](4000) + val a3 = new Array[Byte](4000) store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) @@ -601,11 +588,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("disk and memory storage with serialization") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val a1 = new Array[Byte](400) - val a2 = new Array[Byte](400) - val a3 = new Array[Byte](400) + store = makeBlockManager(12000) + val a1 = new Array[Byte](4000) + val a2 = new Array[Byte](4000) + val a3 = new Array[Byte](4000) store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) @@ -617,11 +603,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("disk and memory storage with serialization and getLocalBytes") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val a1 = new Array[Byte](400) - val a2 = new Array[Byte](400) - val a3 = new Array[Byte](400) + store = makeBlockManager(12000) + val a1 = new Array[Byte](4000) + val a2 = new Array[Byte](4000) + val a3 = new 
Array[Byte](4000) store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) @@ -633,12 +618,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("LRU with mixed storage levels") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val a1 = new Array[Byte](400) - val a2 = new Array[Byte](400) - val a3 = new Array[Byte](400) - val a4 = new Array[Byte](400) + store = makeBlockManager(12000) + val a1 = new Array[Byte](4000) + val a2 = new Array[Byte](4000) + val a3 = new Array[Byte](4000) + val a4 = new Array[Byte](4000) // First store a1 and a2, both in memory, and a3, on disk only store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER) @@ -656,14 +640,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("in-memory LRU with streams") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val list1 = List(new Array[Byte](200), new Array[Byte](200)) - val list2 = List(new Array[Byte](200), new Array[Byte](200)) - val list3 = List(new Array[Byte](200), new Array[Byte](200)) - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store = makeBlockManager(12000) + val list1 = List(new Array[Byte](2000), new Array[Byte](2000)) + val list2 = List(new Array[Byte](2000), new Array[Byte](2000)) + val list3 = List(new Array[Byte](2000), new Array[Byte](2000)) + store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.get("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.data.size === 2) assert(store.get("list3").isDefined, "list3 was not in store") @@ -672,7 +655,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(store.get("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.data.size === 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.get("list1").isDefined, "list1 was not in store") assert(store.get("list1").get.data.size === 2) assert(store.get("list2").isDefined, "list2 was not in store") @@ -681,16 +664,15 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("LRU with mixed storage levels and streams") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val list1 = List(new Array[Byte](200), new Array[Byte](200)) - val list2 = List(new Array[Byte](200), new Array[Byte](200)) - val list3 = List(new Array[Byte](200), new Array[Byte](200)) - val list4 = List(new Array[Byte](200), new Array[Byte](200)) + store = makeBlockManager(12000) + val list1 = List(new Array[Byte](2000), new Array[Byte](2000)) + val list2 = List(new Array[Byte](2000), new 
Array[Byte](2000)) + val list3 = List(new Array[Byte](2000), new Array[Byte](2000)) + val list4 = List(new Array[Byte](2000), new Array[Byte](2000)) // First store list1 and list2, both in memory, and list3, on disk only - store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) - store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) - store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true) + store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) + store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) + store.putIterator("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true) val listForSizeEstimate = new ArrayBuffer[Any] listForSizeEstimate ++= list1.iterator val listSize = SizeEstimator.estimate(listForSizeEstimate) @@ -708,7 +690,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(store.get("list3").isDefined, "list3 was not in store") assert(store.get("list3").get.data.size === 2) // Now let's add in list4, which uses both disk and memory; list1 should drop out - store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true) + store.putIterator("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true) assert(store.get("list1") === None, "list1 was in store") assert(store.get("list2").isDefined, "list2 was not in store") assert(store.get("list2").get.data.size === 2) @@ -731,11 +713,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("overly large block") { - store = new BlockManager("", actorSystem, master, serializer, 500, conf, - securityMgr, mapOutputTracker) - store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) + store = makeBlockManager(5000) + store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") === None, "a1 was in store") - store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK) + store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK) assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store") assert(store.getSingle("a2").isDefined, "a2 was not in store") } @@ -743,8 +724,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter test("block compression") { try { conf.set("spark.shuffle.compress", "true") - store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) + store = makeBlockManager(20000, "exec1") store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100, "shuffle_0_0_0 was not compressed") @@ -752,52 +732,46 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter store = null conf.set("spark.shuffle.compress", "false") - store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) - store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000, + store = makeBlockManager(20000, "exec2") + store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000, "shuffle_0_0_0 was compressed") store.stop() store = null 
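      // The test arrays are zero-filled, so any reasonable codec shrinks them by well over an
      // order of magnitude: the compressed-size assertions in this test compare against roughly a
      // tenth of the raw array size, while the uncompressed ones expect at least the raw size.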
conf.set("spark.broadcast.compress", "true") - store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) - store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100, + store = makeBlockManager(20000, "exec3") + store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000, "broadcast_0 was not compressed") store.stop() store = null conf.set("spark.broadcast.compress", "false") - store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) - store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, "broadcast_0 was compressed") + store = makeBlockManager(20000, "exec4") + store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed") store.stop() store = null conf.set("spark.rdd.compress", "true") - store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) - store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not compressed") + store = makeBlockManager(20000, "exec5") + store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed") store.stop() store = null conf.set("spark.rdd.compress", "false") - store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) - store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was compressed") + store = makeBlockManager(20000, "exec6") + store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed") store.stop() store = null // Check that any other block types are also kept uncompressed - store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf, - securityMgr, mapOutputTracker) - store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) - assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed") + store = makeBlockManager(20000, "exec7") + store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY) + assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed") store.stop() store = null } finally { @@ -871,30 +845,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(Arrays.equals(mappedAsArray, bytes)) assert(Arrays.equals(notMappedAsArray, bytes)) } - + test("updated block statuses") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val list = List.fill(2)(new Array[Byte](200)) - val bigList = List.fill(8)(new Array[Byte](200)) + store = makeBlockManager(12000) + val list = List.fill(2)(new Array[Byte](2000)) + val bigList = List.fill(8)(new Array[Byte](2000)) // 1 updated block (i.e. 
list1) val updatedBlocks1 = - store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(updatedBlocks1.size === 1) assert(updatedBlocks1.head._1 === TestBlockId("list1")) assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY) // 1 updated block (i.e. list2) val updatedBlocks2 = - store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) assert(updatedBlocks2.size === 1) assert(updatedBlocks2.head._1 === TestBlockId("list2")) assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY) // 2 updated blocks - list1 is kicked out of memory while list3 is added val updatedBlocks3 = - store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(updatedBlocks3.size === 2) updatedBlocks3.foreach { case (id, status) => id match { @@ -903,11 +876,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter case _ => fail("Updated block is neither list1 nor list3") } } - assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.memoryStore.contains("list3"), "list3 was not in memory store") // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added val updatedBlocks4 = - store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(updatedBlocks4.size === 2) updatedBlocks4.foreach { case (id, status) => id match { @@ -916,26 +889,37 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter case _ => fail("Updated block is neither list2 nor list4") } } - assert(store.get("list4").isDefined, "list4 was not in store") + assert(store.diskStore.contains("list2"), "list2 was not in disk store") + assert(store.memoryStore.contains("list4"), "list4 was not in memory store") - // No updated blocks - nothing is kicked out of memory because list5 is too big to be added + // No updated blocks - list5 is too big to fit in store and nothing is kicked out val updatedBlocks5 = - store.put("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(updatedBlocks5.size === 0) - assert(store.get("list2").isDefined, "list2 was not in store") - assert(store.get("list4").isDefined, "list4 was not in store") - assert(!store.get("list5").isDefined, "list5 was in store") + + // memory store contains only list3 and list4 + assert(!store.memoryStore.contains("list1"), "list1 was in memory store") + assert(!store.memoryStore.contains("list2"), "list2 was in memory store") + assert(store.memoryStore.contains("list3"), "list3 was not in memory store") + assert(store.memoryStore.contains("list4"), "list4 was not in memory store") + assert(!store.memoryStore.contains("list5"), "list5 was in memory store") + + // disk store contains only list2 + assert(!store.diskStore.contains("list1"), "list1 was in disk store") + assert(store.diskStore.contains("list2"), "list2 was not in disk store") + assert(!store.diskStore.contains("list3"), "list3 was in disk store") + assert(!store.diskStore.contains("list4"), "list4 was in disk store") + 
assert(!store.diskStore.contains("list5"), "list5 was in disk store") } test("query block statuses") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val list = List.fill(2)(new Array[Byte](200)) + store = makeBlockManager(12000) + val list = List.fill(2)(new Array[Byte](2000)) // Tell master. By LRU, only list2 and list3 remains. - store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) - store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) // getLocations and getBlockStatus should yield the same locations assert(store.master.getLocations("list1").size === 0) @@ -949,9 +933,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1) // This time don't tell master and see what happens. By LRU, only list5 and list6 remains. - store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false) - store.put("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) - store.put("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false) + store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false) + store.putIterator("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) + store.putIterator("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false) // getLocations should return nothing because the master is not informed // getBlockStatus without asking slaves should have the same result @@ -968,23 +952,22 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("get matching blocks") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - val list = List.fill(2)(new Array[Byte](10)) + store = makeBlockManager(12000) + val list = List.fill(2)(new Array[Byte](100)) // insert some blocks - store.put("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - store.put("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.putIterator("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) // getLocations and getBlockStatus should yield the same locations assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size === 3) assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1) // insert some more blocks - store.put("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) - store.put("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) - store.put("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) + store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + 
store.putIterator("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) + store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) // getLocations and getBlockStatus should yield the same locations assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1) @@ -992,7 +975,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0)) blockIds.foreach { blockId => - store.put(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.putIterator(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) } val matchedBlockIds = store.master.getMatchingBlockIds(_ match { case RDDBlockId(1, _) => true @@ -1002,17 +985,240 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") { - store = new BlockManager("", actorSystem, master, serializer, 1200, conf, - securityMgr, mapOutputTracker) - store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY) - store.putSingle(rdd(1, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store = makeBlockManager(12000) + store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) // Access rdd_1_0 to ensure it's not least recently used. assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store") // According to the same-RDD rule, rdd_1_0 should be replaced here. - store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY) // rdd_1_0 should have been replaced, even it's not least recently used. 
assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store") assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store") assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store") } + + test("reserve/release unroll memory") { + store = makeBlockManager(12000) + val memoryStore = store.memoryStore + assert(memoryStore.currentUnrollMemory === 0) + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + + // Reserve + memoryStore.reserveUnrollMemoryForThisThread(100) + assert(memoryStore.currentUnrollMemoryForThisThread === 100) + memoryStore.reserveUnrollMemoryForThisThread(200) + assert(memoryStore.currentUnrollMemoryForThisThread === 300) + memoryStore.reserveUnrollMemoryForThisThread(500) + assert(memoryStore.currentUnrollMemoryForThisThread === 800) + memoryStore.reserveUnrollMemoryForThisThread(1000000) + assert(memoryStore.currentUnrollMemoryForThisThread === 800) // not granted + // Release + memoryStore.releaseUnrollMemoryForThisThread(100) + assert(memoryStore.currentUnrollMemoryForThisThread === 700) + memoryStore.releaseUnrollMemoryForThisThread(100) + assert(memoryStore.currentUnrollMemoryForThisThread === 600) + // Reserve again + memoryStore.reserveUnrollMemoryForThisThread(4400) + assert(memoryStore.currentUnrollMemoryForThisThread === 5000) + memoryStore.reserveUnrollMemoryForThisThread(20000) + assert(memoryStore.currentUnrollMemoryForThisThread === 5000) // not granted + // Release again + memoryStore.releaseUnrollMemoryForThisThread(1000) + assert(memoryStore.currentUnrollMemoryForThisThread === 4000) + memoryStore.releaseUnrollMemoryForThisThread() // release all + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + } + + /** + * Verify the result of MemoryStore#unrollSafely is as expected. + */ + private def verifyUnroll( + expected: Iterator[Any], + result: Either[Array[Any], Iterator[Any]], + shouldBeArray: Boolean): Unit = { + val actual: Iterator[Any] = result match { + case Left(arr: Array[Any]) => + assert(shouldBeArray, "expected iterator from unroll!") + arr.iterator + case Right(it: Iterator[Any]) => + assert(!shouldBeArray, "expected array from unroll!") + it + case _ => + fail("unroll returned neither an iterator nor an array...") + } + expected.zip(actual).foreach { case (e, a) => + assert(e === a, "unroll did not return original values!") + } + } + + test("safely unroll blocks") { + store = makeBlockManager(12000) + val smallList = List.fill(40)(new Array[Byte](100)) + val bigList = List.fill(40)(new Array[Byte](1000)) + val memoryStore = store.memoryStore + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + + // Unroll with all the space in the world. This should succeed and return an array. + var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) + verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + + // Unroll with not enough space. This should succeed after kicking out someBlock1. 
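    // Rough arithmetic: the store holds 12000 bytes and spark.storage.unrollFraction is set to
    // 0.4 in the `before` block, so unrolling may claim about 12000 * 0.4 = 4800 bytes, dropping
    // existing blocks in LRU order to free that space. Each smallList block is a bit over 4000
    // bytes once array and object overhead is counted, so one of the two cached blocks must go.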
+ store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) + store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) + unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) + verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(droppedBlocks.size === 1) + assert(droppedBlocks.head._1 === TestBlockId("someBlock1")) + droppedBlocks.clear() + + // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = + // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. + // In the mean time, however, we kicked out someBlock2 before giving up. + store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY) + unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks) + verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false) + assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator + assert(droppedBlocks.size === 1) + assert(droppedBlocks.head._1 === TestBlockId("someBlock2")) + droppedBlocks.clear() + } + + test("safely unroll blocks through putIterator") { + store = makeBlockManager(12000) + val memOnly = StorageLevel.MEMORY_ONLY + val memoryStore = store.memoryStore + val smallList = List.fill(40)(new Array[Byte](100)) + val bigList = List.fill(40)(new Array[Byte](1000)) + def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]] + def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]] + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + + // Unroll with plenty of space. This should succeed and cache both blocks. + val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true) + val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true) + assert(memoryStore.contains("b1")) + assert(memoryStore.contains("b2")) + assert(result1.size > 0) // unroll was successful + assert(result2.size > 0) + assert(result1.data.isLeft) // unroll did not drop this block to disk + assert(result2.data.isLeft) + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + + // Re-put these two blocks so block manager knows about them too. Otherwise, block manager + // would not know how to drop them from memory later. + memoryStore.remove("b1") + memoryStore.remove("b2") + store.putIterator("b1", smallIterator, memOnly) + store.putIterator("b2", smallIterator, memOnly) + + // Unroll with not enough space. This should succeed but kick out b1 in the process. + val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true) + assert(result3.size > 0) + assert(result3.data.isLeft) + assert(!memoryStore.contains("b1")) + assert(memoryStore.contains("b2")) + assert(memoryStore.contains("b3")) + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + memoryStore.remove("b3") + store.putIterator("b3", smallIterator, memOnly) + + // Unroll huge block with not enough space. This should fail and kick out b2 in the process. 
+ val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true) + assert(result4.size === 0) // unroll was unsuccessful + assert(result4.data.isLeft) + assert(!memoryStore.contains("b1")) + assert(!memoryStore.contains("b2")) + assert(memoryStore.contains("b3")) + assert(!memoryStore.contains("b4")) + assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator + } + + /** + * This test is essentially identical to the preceding one, except that it uses MEMORY_AND_DISK. + */ + test("safely unroll blocks through putIterator (disk)") { + store = makeBlockManager(12000) + val memAndDisk = StorageLevel.MEMORY_AND_DISK + val memoryStore = store.memoryStore + val diskStore = store.diskStore + val smallList = List.fill(40)(new Array[Byte](100)) + val bigList = List.fill(40)(new Array[Byte](1000)) + def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]] + def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]] + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + + store.putIterator("b1", smallIterator, memAndDisk) + store.putIterator("b2", smallIterator, memAndDisk) + + // Unroll with not enough space. This should succeed but kick out b1 in the process. + // Memory store should contain b2 and b3, while disk store should contain only b1 + val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, returnValues = true) + assert(result3.size > 0) + assert(!memoryStore.contains("b1")) + assert(memoryStore.contains("b2")) + assert(memoryStore.contains("b3")) + assert(diskStore.contains("b1")) + assert(!diskStore.contains("b2")) + assert(!diskStore.contains("b3")) + memoryStore.remove("b3") + store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY) + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + + // Unroll huge block with not enough space. This should fail and drop the new block to disk + // directly in addition to kicking out b2 in the process. Memory store should contain only + // b3, while disk store should contain b1, b2 and b4. 
+ val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true) + assert(result4.size > 0) + assert(result4.data.isRight) // unroll returned bytes from disk + assert(!memoryStore.contains("b1")) + assert(!memoryStore.contains("b2")) + assert(memoryStore.contains("b3")) + assert(!memoryStore.contains("b4")) + assert(diskStore.contains("b1")) + assert(diskStore.contains("b2")) + assert(!diskStore.contains("b3")) + assert(diskStore.contains("b4")) + assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator + } + + test("multiple unrolls by the same thread") { + store = makeBlockManager(12000) + val memOnly = StorageLevel.MEMORY_ONLY + val memoryStore = store.memoryStore + val smallList = List.fill(40)(new Array[Byte](100)) + def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]] + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + + // All unroll memory used is released because unrollSafely returned an array + memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true) + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true) + assert(memoryStore.currentUnrollMemoryForThisThread === 0) + + // Unroll memory is not released because unrollSafely returned an iterator + // that still depends on the underlying vector used in the process + memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true) + val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisThread + assert(unrollMemoryAfterB3 > 0) + + // The unroll memory owned by this thread builds on top of its value after the previous unrolls + memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true) + val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisThread + assert(unrollMemoryAfterB4 > unrollMemoryAfterB3) + + // ... but only to a certain extent (until we run out of free space to grant new unroll memory) + memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true) + val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisThread + memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true) + val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisThread + memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true) + val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisThread + assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) + assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) + assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) + } } diff --git a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala deleted file mode 100644 index 93f0c6a8e6408..0000000000000 --- a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import scala.util.Random - -import org.scalatest.{BeforeAndAfterAll, FunSuite} - -import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass -import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap} - -class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll { - val NORMAL_ERROR = 0.20 - val HIGH_ERROR = 0.30 - - test("fixed size insertions") { - testWith[Int, Long](10000, i => (i, i.toLong)) - testWith[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong))) - testWith[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass())) - } - - test("variable size insertions") { - val rand = new Random(123456789) - def randString(minLen: Int, maxLen: Int): String = { - "a" * (rand.nextInt(maxLen - minLen) + minLen) - } - testWith[Int, String](10000, i => (i, randString(0, 10))) - testWith[Int, String](10000, i => (i, randString(0, 100))) - testWith[Int, String](10000, i => (i, randString(90, 100))) - } - - test("updates") { - val rand = new Random(123456789) - def randString(minLen: Int, maxLen: Int): String = { - "a" * (rand.nextInt(maxLen - minLen) + minLen) - } - testWith[String, Int](10000, i => (randString(0, 10000), i)) - } - - def testWith[K, V](numElements: Int, makeElement: (Int) => (K, V)) { - val map = new SizeTrackingAppendOnlyMap[K, V]() - for (i <- 0 until numElements) { - val (k, v) = makeElement(i) - map(k) = v - expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR) - } - } - - def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) { - val betterEstimatedSize = SizeEstimator.estimate(obj) - assert(betterEstimatedSize * (1 - error) < estimatedSize, - s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize") - assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize, - s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize") - } -} - -object SizeTrackingAppendOnlyMapSuite { - // Speed test, for reproducibility of results. - // These could be highly non-deterministic in general, however. 
- // Results: - // AppendOnlyMap: 31 ms - // SizeTracker: 54 ms - // SizeEstimator: 1500 ms - def main(args: Array[String]) { - val numElements = 100000 - - val baseTimes = for (i <- 0 until 10) yield time { - val map = new AppendOnlyMap[Int, LargeDummyClass]() - for (i <- 0 until numElements) { - map(i) = new LargeDummyClass() - } - } - - val sampledTimes = for (i <- 0 until 10) yield time { - val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]() - for (i <- 0 until numElements) { - map(i) = new LargeDummyClass() - map.estimateSize() - } - } - - val unsampledTimes = for (i <- 0 until 3) yield time { - val map = new AppendOnlyMap[Int, LargeDummyClass]() - for (i <- 0 until numElements) { - map(i) = new LargeDummyClass() - SizeEstimator.estimate(map) - } - } - - println("Base: " + baseTimes) - println("SizeTracker (sampled): " + sampledTimes) - println("SizeEstimator (unsampled): " + unsampledTimes) - } - - def time(f: => Unit): Long = { - val start = System.currentTimeMillis() - f - System.currentTimeMillis() - start - } - - private class LargeDummyClass { - val arr = new Array[Int](100) - } -} diff --git a/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala new file mode 100644 index 0000000000000..1f33967249654 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.collection + +import scala.reflect.ClassTag +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark.util.SizeEstimator + +class SizeTrackerSuite extends FunSuite { + val NORMAL_ERROR = 0.20 + val HIGH_ERROR = 0.30 + + import SizeTrackerSuite._ + + test("vector fixed size insertions") { + testVector[Long](10000, i => i.toLong) + testVector[(Long, Long)](10000, i => (i.toLong, i.toLong)) + testVector[LargeDummyClass](10000, i => new LargeDummyClass) + } + + test("vector variable size insertions") { + val rand = new Random(123456789) + def randString(minLen: Int, maxLen: Int): String = { + "a" * (rand.nextInt(maxLen - minLen) + minLen) + } + testVector[String](10000, i => randString(0, 10)) + testVector[String](10000, i => randString(0, 100)) + testVector[String](10000, i => randString(90, 100)) + } + + test("map fixed size insertions") { + testMap[Int, Long](10000, i => (i, i.toLong)) + testMap[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong))) + testMap[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass)) + } + + test("map variable size insertions") { + val rand = new Random(123456789) + def randString(minLen: Int, maxLen: Int): String = { + "a" * (rand.nextInt(maxLen - minLen) + minLen) + } + testMap[Int, String](10000, i => (i, randString(0, 10))) + testMap[Int, String](10000, i => (i, randString(0, 100))) + testMap[Int, String](10000, i => (i, randString(90, 100))) + } + + test("map updates") { + val rand = new Random(123456789) + def randString(minLen: Int, maxLen: Int): String = { + "a" * (rand.nextInt(maxLen - minLen) + minLen) + } + testMap[String, Int](10000, i => (randString(0, 10000), i)) + } + + def testVector[T: ClassTag](numElements: Int, makeElement: Int => T) { + val vector = new SizeTrackingVector[T] + for (i <- 0 until numElements) { + val item = makeElement(i) + vector += item + expectWithinError(vector, vector.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR) + } + } + + def testMap[K, V](numElements: Int, makeElement: (Int) => (K, V)) { + val map = new SizeTrackingAppendOnlyMap[K, V] + for (i <- 0 until numElements) { + val (k, v) = makeElement(i) + map(k) = v + expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR) + } + } + + def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) { + val betterEstimatedSize = SizeEstimator.estimate(obj) + assert(betterEstimatedSize * (1 - error) < estimatedSize, + s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize") + assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize, + s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize") + } +} + +private object SizeTrackerSuite { + + /** + * Run speed tests for size tracking collections. + */ + def main(args: Array[String]): Unit = { + if (args.size < 1) { + println("Usage: SizeTrackerSuite [num elements]") + System.exit(1) + } + val numElements = args(0).toInt + vectorSpeedTest(numElements) + mapSpeedTest(numElements) + } + + /** + * Speed test for SizeTrackingVector. 
+ * + * Results for 100000 elements (possibly non-deterministic): + * PrimitiveVector 15 ms + * SizeTracker 51 ms + * SizeEstimator 2000 ms + */ + def vectorSpeedTest(numElements: Int): Unit = { + val baseTimes = for (i <- 0 until 10) yield time { + val vector = new PrimitiveVector[LargeDummyClass] + for (i <- 0 until numElements) { + vector += new LargeDummyClass + } + } + val sampledTimes = for (i <- 0 until 10) yield time { + val vector = new SizeTrackingVector[LargeDummyClass] + for (i <- 0 until numElements) { + vector += new LargeDummyClass + vector.estimateSize() + } + } + val unsampledTimes = for (i <- 0 until 3) yield time { + val vector = new PrimitiveVector[LargeDummyClass] + for (i <- 0 until numElements) { + vector += new LargeDummyClass + SizeEstimator.estimate(vector) + } + } + printSpeedTestResult("SizeTrackingVector", baseTimes, sampledTimes, unsampledTimes) + } + + /** + * Speed test for SizeTrackingAppendOnlyMap. + * + * Results for 100000 elements (possibly non-deterministic): + * AppendOnlyMap 30 ms + * SizeTracker 41 ms + * SizeEstimator 1666 ms + */ + def mapSpeedTest(numElements: Int): Unit = { + val baseTimes = for (i <- 0 until 10) yield time { + val map = new AppendOnlyMap[Int, LargeDummyClass] + for (i <- 0 until numElements) { + map(i) = new LargeDummyClass + } + } + val sampledTimes = for (i <- 0 until 10) yield time { + val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass] + for (i <- 0 until numElements) { + map(i) = new LargeDummyClass + map.estimateSize() + } + } + val unsampledTimes = for (i <- 0 until 3) yield time { + val map = new AppendOnlyMap[Int, LargeDummyClass] + for (i <- 0 until numElements) { + map(i) = new LargeDummyClass + SizeEstimator.estimate(map) + } + } + printSpeedTestResult("SizeTrackingAppendOnlyMap", baseTimes, sampledTimes, unsampledTimes) + } + + def printSpeedTestResult( + testName: String, + baseTimes: Seq[Long], + sampledTimes: Seq[Long], + unsampledTimes: Seq[Long]): Unit = { + println(s"Average times for $testName (ms):") + println(" Base - " + averageTime(baseTimes)) + println(" SizeTracker (sampled) - " + averageTime(sampledTimes)) + println(" SizeEstimator (unsampled) - " + averageTime(unsampledTimes)) + println() + } + + def time(f: => Unit): Long = { + val start = System.currentTimeMillis() + f + System.currentTimeMillis() - start + } + + def averageTime(v: Seq[Long]): Long = { + v.sum / v.size + } + + private class LargeDummyClass { + val arr = new Array[Int](100) + } +} diff --git a/docs/configuration.md b/docs/configuration.md index 46e3dd914b5ac..2e6c85cc2bcca 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -480,6 +480,15 @@ Apart from these, the following properties are also available, and may be useful increase it if you configure your own old generation size. + + spark.storage.unrollFraction + 0.2 + + Fraction of spark.storage.memoryFraction to use for unrolling blocks in memory. + This is dynamically allocated by dropping existing blocks when there is not enough free + storage space to unroll the new block in its entirety. 
+ + spark.tachyonStore.baseDir System.getProperty("java.io.tmpdir") diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index e9220db6b1f9a..5ff88f0dd1cac 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -31,7 +31,6 @@ import com.typesafe.tools.mima.core._ * MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap") */ object MimaExcludes { - def excludes(version: String) = version match { case v if v.startsWith("1.1") => @@ -62,6 +61,15 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.storage.MemoryStore.Entry") ) ++ + Seq( + // Renamed putValues -> putArray + putIterator + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.storage.MemoryStore.putValues"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.storage.DiskStore.putValues"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.storage.TachyonStore.putValues") + ) ++ Seq( ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this") ) ++ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index ce8316bb14891..d934b9cbfc3e8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -110,8 +110,7 @@ private[streaming] class ReceiverSupervisorImpl( ) { val blockId = optionalBlockId.getOrElse(nextBlockId) val time = System.currentTimeMillis - blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], - storageLevel, tellMaster = true) + blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true) logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms") reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata) } @@ -124,7 +123,7 @@ private[streaming] class ReceiverSupervisorImpl( ) { val blockId = optionalBlockId.getOrElse(nextBlockId) val time = System.currentTimeMillis - blockManager.put(blockId, iterator, storageLevel, tellMaster = true) + blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true) logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms") reportPushedBlock(blockId, -1, optionalMetadata) }
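For orientation, here is a minimal sketch (not part of the patch) of how the renamed put methods and the new unroll settings fit together, modeled on the calls made in BlockManagerSuite and ReceiverSupervisorImpl above. The object name UnrollSketch, the helper cacheSketch, the block ids, and the data sizes are illustrative assumptions only; the BlockManager is assumed to be wired up the way makeBlockManager does it in the suite, and the code is placed in org.apache.spark.storage so the package-private classes are visible.

package org.apache.spark.storage

import org.apache.spark.SparkConf

object UnrollSketch {
  // Unroll settings mirroring the test setup above; spark.storage.unrollFraction defaults to 0.2.
  val conf = new SparkConf(false)
    .set("spark.storage.unrollFraction", "0.4")
    .set("spark.storage.unrollMemoryThreshold", "512")

  def cacheSketch(bm: BlockManager): Unit = {
    // An iterator is unrolled cautiously; with MEMORY_AND_DISK it spills to disk
    // rather than being dropped if it does not fit in memory.
    val values: Iterator[Any] = Iterator.fill(1000)(new Array[Byte](100))
    bm.putIterator(TestBlockId("sketch_iter"), values, StorageLevel.MEMORY_AND_DISK, tellMaster = true)

    // An already-materialized collection goes through putArray, as in ReceiverSupervisorImpl.
    bm.putArray(TestBlockId("sketch_array"), Array.fill[Any](10)(new Array[Byte](100)),
      StorageLevel.MEMORY_ONLY, tellMaster = true)
  }
}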