From f149147f0283d8c0e53bb850b1548ee6d84d8a70 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Tue, 30 Jun 2015 00:35:11 +0800
Subject: [PATCH 01/13] init commit

---
 .../main/java/org/apache/spark/Spillable.java | 30 ++
 .../spark/shuffle/ShuffleMemoryManager.scala | 42 ++-
 ...llable.scala => CollectionSpillable.scala} | 21 +-
 .../collection/ExternalAppendOnlyMap.scala | 84 +++++-
 .../util/collection/ExternalSorter.scala | 259 +++++++++++++-----
 5 files changed, 337 insertions(+), 99 deletions(-)
 create mode 100644 core/src/main/java/org/apache/spark/Spillable.java
 rename core/src/main/scala/org/apache/spark/util/collection/{Spillable.scala => CollectionSpillable.scala} (89%)

diff --git a/core/src/main/java/org/apache/spark/Spillable.java b/core/src/main/java/org/apache/spark/Spillable.java
new file mode 100644
index 0000000000000..a343b93f0f1da
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/Spillable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+/**
+ * Forces the contents of an in-memory buffer to spill to disk and releases its memory.
+ */
+public interface Spillable {
+
+  /**
+   * Force the contents of the in-memory buffer to spill to disk.
+   * @return the number of bytes spilled
+   */
+  public long forceSpill();
+}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index 3bcc7178a3d8b..80b5c677f0c66 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.shuffle

 import scala.collection.mutable

-import org.apache.spark.{Logging, SparkException, SparkConf}
+import org.apache.spark._

 /**
 * Allocates a pool of memory to task threads for use in shuffle operations.
Each disk-spilling @@ -38,8 +38,47 @@ import org.apache.spark.{Logging, SparkException, SparkConf} private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging { private val threadMemory = new mutable.HashMap[Long, Long]() // threadId -> memory bytes + // threadId -> memory reserved list + private val threadReservedList = new mutable.HashMap[Long, mutable.ListBuffer[Spillable]]() + def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf)) + /** + * release other Spillable's memory of current thread until freeMemory >= requestedMemory + */ + def releaseReservedMemory(toGrant: Long, requestedAmount: Long): Long = synchronized { + val threadId = Thread.currentThread().getId + if (toGrant >= requestedAmount || !threadReservedList.contains(threadId)){ + toGrant + } else { + //try to spill objs in current thread to make space for new request + var addedMemory = toGrant + while(addedMemory < requestedAmount && !threadReservedList(threadId).isEmpty ) { + val toSpill = threadReservedList(threadId).remove(0) + val spillMemory = toSpill.forceSpill() + logInfo(s"Thread $threadId forceSpill $spillMemory bytes to be free") + addedMemory += spillMemory + } + if (addedMemory > requestedAmount) { + this.release(addedMemory - requestedAmount) + addedMemory = requestedAmount + } + addedMemory + } + } + + /** + * add Spillable to memoryReservedList of current thread, when current thread has + * no enough memory, we can release memory of current thread's memory reserved list + */ + def addSpillableToReservedList(spill: Spillable) = synchronized { + val threadId = Thread.currentThread().getId + if (!threadReservedList.contains(threadId)) { + threadReservedList(threadId) = new mutable.ListBuffer[Spillable]() + } + threadReservedList(threadId) += spill + } + /** * Try to acquire up to numBytes memory for the current thread, and return the number of bytes * obtained, or 0 if none can be allocated. This call may block until there is enough free memory @@ -108,6 +147,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging { def releaseMemoryForThisThread(): Unit = synchronized { val threadId = Thread.currentThread().getId threadMemory.remove(threadId) + threadReservedList.remove(threadId) notifyAll() // Notify waiters who locked "this" in tryToAcquire that memory has been freed } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala similarity index 89% rename from core/src/main/scala/org/apache/spark/util/collection/Spillable.scala rename to core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala index 747ecf075a397..671a41bde13a7 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala @@ -17,14 +17,15 @@ package org.apache.spark.util.collection -import org.apache.spark.Logging -import org.apache.spark.SparkEnv +import org.apache.spark.{Logging, SparkEnv, Spillable} + +import scala.reflect.ClassTag /** * Spills contents of an in-memory collection to disk when the memory threshold * has been exceeded. */ -private[spark] trait Spillable[C] extends Logging { +private[spark] trait CollectionSpillable[C] extends Logging with Spillable{ /** * Spills the current in-memory collection to disk, and releases the memory. 
* @@ -40,25 +41,25 @@ private[spark] trait Spillable[C] extends Logging { protected def addElementsRead(): Unit = { _elementsRead += 1 } // Memory manager that can be used to acquire/release memory - private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager + protected val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager // Initial threshold for the size of a collection before we start tracking its memory usage // Exposed for testing - private[this] val initialMemoryThreshold: Long = + protected val initialMemoryThreshold: Long = SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024) // Threshold for this collection's size in bytes before we start tracking its memory usage // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0 - private[this] var myMemoryThreshold = initialMemoryThreshold + protected var myMemoryThreshold = initialMemoryThreshold // Number of elements read from input since last spill - private[this] var _elementsRead = 0L + protected var _elementsRead = 0L // Number of bytes spilled in total - private[this] var _memoryBytesSpilled = 0L + protected var _memoryBytesSpilled = 0L // Number of spills - private[this] var _spillCount = 0 + protected var _spillCount = 0 /** * Spills the current in-memory collection to disk if needed. Attempts to acquire more @@ -111,7 +112,7 @@ private[spark] trait Spillable[C] extends Logging { * * @param size number of bytes spilled */ - @inline private def logSpillage(size: Long) { + @inline protected def logSpillage(size: Long) { val threadId = Thread.currentThread().getId logInfo("Thread %d spilling in-memory map of %s to disk (%d time%s so far)" .format(threadId, org.apache.spark.util.Utils.bytesToString(size), diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 1e4531ef395ae..d58e74390e0cc 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -69,7 +69,7 @@ class ExternalAppendOnlyMap[K, V, C]( extends Iterable[(K, C)] with Serializable with Logging - with Spillable[SizeTracker] { + with CollectionSpillable[SizeTracker] { private var currentMap = new SizeTrackingAppendOnlyMap[K, C] private val spilledMaps = new ArrayBuffer[DiskMapIterator] @@ -100,6 +100,8 @@ class ExternalAppendOnlyMap[K, V, C]( private val keyComparator = new HashComparator[K] private val ser = serializer.newInstance() + private var memoryOrDiskIter: Option[MemoryOrDiskIterator] = None + /** * Insert the given key and value into the map. */ @@ -151,6 +153,30 @@ class ExternalAppendOnlyMap[K, V, C]( * Sort the existing contents of the in-memory map and spill them to a temporary file on disk. */ override protected[this] def spill(collection: SizeTracker): Unit = { + val it = currentMap.destructiveSortedIterator(keyComparator) + spilledMaps.append(spillMemoryToDisk(it)) + } + + def diskBytesSpilled: Long = _diskBytesSpilled + + /** + * Return an iterator that merges the in-memory map with the spilled maps. + * If no spill has occurred, simply return the in-memory map's iterator. 
+ */ + override def iterator: Iterator[(K, C)] = { + shuffleMemoryManager.addSpillableToReservedList(this) + if (spilledMaps.isEmpty) { + memoryOrDiskIter = Some(MemoryOrDiskIterator(currentMap.iterator)) + memoryOrDiskIter.get + } else { + new ExternalIterator() + } + } + + /** + * spill contents of the in-memory map to a temporary file on disk. + */ + private[this] def spillMemoryToDisk(it: Iterator[(K, C)]): DiskMapIterator = { val (blockId, file) = diskBlockManager.createTempLocalBlock() curWriteMetrics = new ShuffleWriteMetrics() var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics) @@ -171,7 +197,6 @@ class ExternalAppendOnlyMap[K, V, C]( var success = false try { - val it = currentMap.destructiveSortedIterator(keyComparator) while (it.hasNext) { val kv = it.next() writer.write(kv._1, kv._2) @@ -203,21 +228,52 @@ class ExternalAppendOnlyMap[K, V, C]( } } } - - spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes)) + new DiskMapIterator(file, blockId, batchSizes) } - def diskBytesSpilled: Long = _diskBytesSpilled + /** + * spill contents of memory map to disk + */ + override def forceSpill(): Long = { + var freeMemory = 0L + if (memoryOrDiskIter.isDefined) { + _spillCount += 1 + logSpillage(currentMap.estimateSize()) + + memoryOrDiskIter.get.spill() + + _elementsRead = 0 + _memoryBytesSpilled += currentMap.estimateSize() + freeMemory = myMemoryThreshold - initialMemoryThreshold + myMemoryThreshold = initialMemoryThreshold + } + + freeMemory + } /** - * Return an iterator that merges the in-memory map with the spilled maps. - * If no spill has occurred, simply return the in-memory map's iterator. + * An iterator that read the elements from the in-memory iterator or the disk iterator after + * spilling contents of in-memory iterator to disk. 
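(The case class itself follows just below.) As a minimal standalone sketch of the same pattern, using toy names rather than code from this patch, the whole trick is a single stable Iterator handle whose backing source can be swapped from memory to disk between next() calls:

    // Toy memory-or-disk iterator (illustrative only, not Spark code).
    // spillFn stands in for spillMemoryToDisk: it must write the remaining
    // elements out and return an iterator that reads them back.
    class SwappableIterator[T](mem: Iterator[T], spillFn: Iterator[T] => Iterator[T])
        extends Iterator[T] {
      private var current: Iterator[T] = mem
      override def hasNext: Boolean = current.hasNext
      override def next(): T = current.next()
      def spill(): Unit = if (hasNext) current = spillFn(current)
    }

A consumer that is part-way through the iterator never observes the switch, which is what lets forceSpill() release the collection's memory while an ExternalIterator or a downstream reader is still draining it.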
*/ - override def iterator: Iterator[(K, C)] = { - if (spilledMaps.isEmpty) { - currentMap.iterator - } else { - new ExternalIterator() + case class MemoryOrDiskIterator(memIt: Iterator[(K,C)]) extends Iterator[(K,C)] { + + var currentIt = memIt + + override def hasNext: Boolean = currentIt.hasNext + + override def next(): (K, C) = currentIt.next() + + def spill() = { + if (hasNext) { + currentIt = spillMemoryToDisk(currentIt) + } else { + //the memory iterator is already drained, release it by giving an empty iterator + currentIt = new Iterator[(K,C)]{ + override def hasNext: Boolean = false + override def next(): (K, C) = null + } + logInfo("nothing in memory iterator, do nothing") + } } } @@ -232,7 +288,9 @@ class ExternalAppendOnlyMap[K, V, C]( // Input streams are derived both from the in-memory map and spilled maps on disk // The in-memory map is sorted in place, while the spilled maps are already in sorted order - private val sortedMap = currentMap.destructiveSortedIterator(keyComparator) + memoryOrDiskIter = Some(MemoryOrDiskIterator( + currentMap.destructiveSortedIterator(keyComparator))) + private val sortedMap = memoryOrDiskIter.get private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered) inputStreams.foreach { it => diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 757dec66c203b..922a1b23fa5e4 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -93,7 +93,7 @@ private[spark] class ExternalSorter[K, V, C]( ordering: Option[Ordering[K]] = None, serializer: Option[Serializer] = None) extends Logging - with Spillable[WritablePartitionedPairCollection[K, C]] + with CollectionSpillable[WritablePartitionedPairCollection[K, C]] with SortShuffleFileWriter[K, V] { private val conf = SparkEnv.get.conf @@ -148,6 +148,9 @@ private[spark] class ExternalSorter[K, V, C]( private var map = new PartitionedAppendOnlyMap[K, C] private var buffer = newBuffer() + private var memoryOrDiskIter: Option[MemoryOrDiskIterator] = None + private var isShuffleSort: Boolean = true + // Total spilling statistics private var _diskBytesSpilled = 0L def diskBytesSpilled: Long = _diskBytesSpilled @@ -242,76 +245,9 @@ private[spark] class ExternalSorter[K, V, C]( * @param collection whichever collection we're using (map or buffer) */ override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = { - // Because these files may be read during shuffle, their compression must be controlled by - // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use - // createTempShuffleBlock here; see SPARK-3426 for more context. 
- val (blockId, file) = diskBlockManager.createTempShuffleBlock() - - // These variables are reset after each flush - var objectsWritten: Long = 0 - var spillMetrics: ShuffleWriteMetrics = null - var writer: BlockObjectWriter = null - def openWriter(): Unit = { - assert (writer == null && spillMetrics == null) - spillMetrics = new ShuffleWriteMetrics - writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics) - } - openWriter() - - // List of batch sizes (bytes) in the order they are written to disk - val batchSizes = new ArrayBuffer[Long] - - // How many elements we have in each partition - val elementsPerPartition = new Array[Long](numPartitions) - - // Flush the disk writer's contents to disk, and update relevant variables. - // The writer is closed at the end of this process, and cannot be reused. - def flush(): Unit = { - val w = writer - writer = null - w.commitAndClose() - _diskBytesSpilled += spillMetrics.shuffleBytesWritten - batchSizes.append(spillMetrics.shuffleBytesWritten) - spillMetrics = null - objectsWritten = 0 - } - - var success = false - try { - val it = collection.destructiveSortedWritablePartitionedIterator(comparator) - while (it.hasNext) { - val partitionId = it.nextPartition() - it.writeNext(writer) - elementsPerPartition(partitionId) += 1 - objectsWritten += 1 - - if (objectsWritten == serializerBatchSize) { - flush() - openWriter() - } - } - if (objectsWritten > 0) { - flush() - } else if (writer != null) { - val w = writer - writer = null - w.revertPartialWritesAndClose() - } - success = true - } finally { - if (!success) { - // This code path only happens if an exception was thrown above before we set success; - // close our stuff and let the exception be thrown further - if (writer != null) { - writer.revertPartialWritesAndClose() - } - if (file.exists()) { - file.delete() - } - } - } - - spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)) + val it = collection.destructiveSortedWritablePartitionedIterator(comparator) + val spillFile = spillMemoryToDisk(it) + spills.append(spillFile) } /** @@ -602,6 +538,163 @@ private[spark] class ExternalSorter[K, V, C]( } } + /** + * spill contents of the in-memory map to a temporary file on disk. + */ + private def spillMemoryToDisk(it: WritablePartitionedIterator): SpilledFile = { + // Because these files may be read during shuffle, their compression must be controlled by + // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use + // createTempShuffleBlock here; see SPARK-3426 for more context. + val (blockId, file) = diskBlockManager.createTempShuffleBlock() + + // These variables are reset after each flush + var objectsWritten: Long = 0 + var spillMetrics: ShuffleWriteMetrics = null + var writer: BlockObjectWriter = null + def openWriter(): Unit = { + assert (writer == null && spillMetrics == null) + spillMetrics = new ShuffleWriteMetrics + writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics) + } + openWriter() + + // List of batch sizes (bytes) in the order they are written to disk + val batchSizes = new ArrayBuffer[Long] + + // How many elements we have in each partition + val elementsPerPartition = new Array[Long](numPartitions) + + // Flush the disk writer's contents to disk, and update relevant variables. + // The writer is closed at the end of this process, and cannot be reused. 
+ def flush(): Unit = { + val w = writer + writer = null + w.commitAndClose() + _diskBytesSpilled += spillMetrics.shuffleBytesWritten + batchSizes.append(spillMetrics.shuffleBytesWritten) + spillMetrics = null + objectsWritten = 0 + } + + var success = false + try { + + while (it.hasNext) { + val partitionId = it.nextPartition() + it.writeNext(writer) + elementsPerPartition(partitionId) += 1 + objectsWritten += 1 + + if (objectsWritten == serializerBatchSize) { + flush() + openWriter() + } + } + if (objectsWritten > 0) { + flush() + } else if (writer != null) { + val w = writer + writer = null + w.revertPartialWritesAndClose() + } + success = true + } finally { + if (!success) { + // This code path only happens if an exception was thrown above before we set success; + // close our stuff and let the exception be thrown further + if (writer != null) { + writer.revertPartialWritesAndClose() + } + if (file.exists()) { + file.delete() + } + } + } + SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition) + } + + /** + * Spill in-memory iterator to a temporary file on disk. + * Return an iterator over a temporary file on disk. + */ + private[this] def spillMemoryToDisk(currentIt: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = { + val it = new WritablePartitionedIterator { + private[this] var cur = if (currentIt.hasNext) currentIt.next() else null + + def writeNext(writer: BlockObjectWriter): Unit = { + writer.write(cur._1._2, cur._2) + cur = if (currentIt.hasNext) currentIt.next() else null + } + + def hasNext(): Boolean = cur != null + + def nextPartition(): Int = cur._1._1 + } + + val spillReader = new SpillReader(spillMemoryToDisk(it)) + + (0 until numPartitions).iterator.flatMap { p => + val iterator = spillReader.readNextPartition() + iterator.map{cur => ((p, cur._1), cur._2)} + } + } + + /** + * An iterator that read the elements from the in-memory iterator or the disk iterator after + * spilling contents of in-memory iterator to disk. + */ + case class MemoryOrDiskIterator(memIt: Iterator[((Int, K), C)]) extends Iterator[((Int, K), C)] { + + var currentIt = memIt + + override def hasNext: Boolean = currentIt.hasNext + + override def next(): ((Int, K), C) = currentIt.next() + + def spill() = { + if (hasNext) { + currentIt = spillMemoryToDisk(currentIt) + } else { + //the memory iterator is already drained, release it by giving an empty iterator + currentIt = new Iterator[((Int, K), C)]{ + override def hasNext: Boolean = false + override def next(): ((Int, K), C) = null + } + logInfo("nothing in memory iterator, do nothing") + } + } + } + + /** + * spill contents of memory map to disk + */ + override def forceSpill(): Long = { + var freeMemory = 0L + if (memoryOrDiskIter.isDefined) { + //has memory buffer that can be spilled + _spillCount += 1 + + val shouldCombine = aggregator.isDefined + if (shouldCombine) { + logSpillage(map.estimateSize()) + } else { + logSpillage(buffer.estimateSize()) + } + + memoryOrDiskIter.get.spill() + + _elementsRead = 0 + if (shouldCombine) { + _memoryBytesSpilled += map.estimateSize() + } else { + _memoryBytesSpilled += buffer.estimateSize() + } + freeMemory = myMemoryThreshold - initialMemoryThreshold + myMemoryThreshold = initialMemoryThreshold + } + freeMemory + } + /** * Return an iterator over all the data written to this object, grouped by partition and * aggregated by the requested aggregator. 
For each partition we then have an iterator over its
@@ -616,26 +709,42 @@ private[spark] class ExternalSorter[K, V, C](
   def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
     val usingMap = aggregator.isDefined
     val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
+    def changeIterToMemoryOrDiskIter(inMemory: Iterator[((Int, K), C)]) = {
+      if (isShuffleSort) {
+        inMemory
+      } else {
+        memoryOrDiskIter = Some(MemoryOrDiskIterator(inMemory))
+        memoryOrDiskIter.get
+      }
+    }
+
     if (spills.isEmpty) {
       // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
       // we don't even need to sort by anything other than partition ID
       if (!ordering.isDefined) {
         // The user hasn't requested sorted keys, so only sort by partition ID, not key
-        groupByPartition(collection.partitionedDestructiveSortedIterator(None))
+        groupByPartition(changeIterToMemoryOrDiskIter(
+          collection.partitionedDestructiveSortedIterator(None)))
       } else {
         // We do need to sort by both partition ID and key
-        groupByPartition(collection.partitionedDestructiveSortedIterator(Some(keyComparator)))
+        groupByPartition(changeIterToMemoryOrDiskIter(
+          collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
       }
     } else {
       // Merge spilled and in-memory data
-      merge(spills, collection.partitionedDestructiveSortedIterator(comparator))
+      merge(spills, changeIterToMemoryOrDiskIter(
+        collection.partitionedDestructiveSortedIterator(comparator)))
     }
   }

   /**
   * Return an iterator over all the data written to this object, aggregated by our aggregator.
   */
-  def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)
+  def iterator: Iterator[Product2[K, C]] = {
+    isShuffleSort = false
+    shuffleMemoryManager.addSpillableToReservedList(this)
+    partitionedIterator.flatMap(pair => pair._2)
+  }

   /**
   * Write all the data added into this ExternalSorter into a file in the disk store. This is

From eb04478ca1f10fb1d40a3c4114aef681d4fa7289 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Tue, 30 Jun 2015 00:41:42 +0800
Subject: [PATCH 02/13] refactor code

---
 .../collection/ExternalAppendOnlyMap.scala | 32 +++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index d58e74390e0cc..c7b71cc00795f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -157,22 +157,6 @@ class ExternalAppendOnlyMap[K, V, C](
     spilledMaps.append(spillMemoryToDisk(it))
   }

-  def diskBytesSpilled: Long = _diskBytesSpilled
-
-  /**
-   * Return an iterator that merges the in-memory map with the spilled maps.
-   * If no spill has occurred, simply return the in-memory map's iterator.
-   */
-  override def iterator: Iterator[(K, C)] = {
-    shuffleMemoryManager.addSpillableToReservedList(this)
-    if (spilledMaps.isEmpty) {
-      memoryOrDiskIter = Some(MemoryOrDiskIterator(currentMap.iterator))
-      memoryOrDiskIter.get
-    } else {
-      new ExternalIterator()
-    }
-  }
-
   /**
   * spill contents of the in-memory map to a temporary file on disk.
   */
@@ -231,6 +215,22 @@ class ExternalAppendOnlyMap[K, V, C](
     new DiskMapIterator(file, blockId, batchSizes)
   }

+  def diskBytesSpilled: Long = _diskBytesSpilled
+
+  /**
+   * Return an iterator that merges the in-memory map with the spilled maps.
+ * If no spill has occurred, simply return the in-memory map's iterator. + */ + override def iterator: Iterator[(K, C)] = { + shuffleMemoryManager.addSpillableToReservedList(this) + if (spilledMaps.isEmpty) { + memoryOrDiskIter = Some(MemoryOrDiskIterator(currentMap.iterator)) + memoryOrDiskIter.get + } else { + new ExternalIterator() + } + } + /** * spill contents of memory map to disk */ From 02749c1785f63020c7b998e83353bb9f14e87304 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 30 Jun 2015 23:37:43 +0800 Subject: [PATCH 03/13] add unit test --- .../spark/shuffle/ShuffleMemoryManager.scala | 4 +-- .../shuffle/ShuffleMemoryManagerSuite.scala | 25 +++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index 80b5c677f0c66..727063706aa56 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -116,7 +116,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging { if (freeMemory >= math.min(maxToGrant, maxMemory / (2 * numActiveThreads) - curMem)) { val toGrant = math.min(maxToGrant, freeMemory) threadMemory(threadId) += toGrant - return toGrant + return this.releaseReservedMemory(toGrant, numBytes) } else { logInfo(s"Thread $threadId waiting for at least 1/2N of shuffle memory pool to be free") wait() @@ -125,7 +125,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging { // Only give it as much memory as is free, which might be none if it reached 1 / numThreads val toGrant = math.min(maxToGrant, freeMemory) threadMemory(threadId) += toGrant - return toGrant + return this.releaseReservedMemory(toGrant, numBytes) } } 0L // Never reached diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala index 96778c9ebafb1..c31294b4233bc 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala @@ -23,6 +23,18 @@ import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.CountDownLatch import org.apache.spark.SparkFunSuite +import org.apache.spark.Spillable + +class FakeSpillable extends Spillable { + var myMemoryThreshold: Long = 0L + def addMemory(currentMemory: Long) = { + myMemoryThreshold += currentMemory + } + + override def forceSpill(): Long = { + return myMemoryThreshold + } +} class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts { /** Launch a thread with the given body block and return it. 
*/ @@ -307,4 +319,17 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts { val granted = manager.tryToAcquire(300L) assert(0 === granted, "granted is negative") } + + test("latter spillable grab full memory of previous spillable") { + val manager = new ShuffleMemoryManager(1000L) + val spill1 = new FakeSpillable() + val spill2 = new FakeSpillable() + spill1.addMemory(manager.tryToAcquire(700L)) + spill1.addMemory(manager.tryToAcquire(300L)) + manager.addSpillableToReservedList(spill1) + val granted1 = manager.tryToAcquire(300L) + assert(300L === granted1, "granted memory") + val granted2 = manager.tryToAcquire(800L) + assert(700L === granted2, "granted remained memory") + } } From 5cc88ab51cd974a8ad2438019a5e04052c7e2b0b Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 1 Jul 2015 00:10:07 +0800 Subject: [PATCH 04/13] refactor style --- .../spark/shuffle/ShuffleMemoryManager.scala | 22 ++--- .../collection/ExternalAppendOnlyMap.scala | 40 ++++----- .../util/collection/ExternalSorter.scala | 84 +++++++++---------- .../shuffle/ShuffleMemoryManagerSuite.scala | 5 ++ 4 files changed, 78 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index 727063706aa56..cff9b3aa2c05b 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -46,30 +46,30 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging { /** * release other Spillable's memory of current thread until freeMemory >= requestedMemory */ - def releaseReservedMemory(toGrant: Long, requestedAmount: Long): Long = synchronized { + def releaseReservedMemory(toGrant: Long, requestMemory: Long): Long = synchronized { val threadId = Thread.currentThread().getId - if (toGrant >= requestedAmount || !threadReservedList.contains(threadId)){ + if (toGrant >= requestMemory || !threadReservedList.contains(threadId)){ toGrant } else { - //try to spill objs in current thread to make space for new request - var addedMemory = toGrant - while(addedMemory < requestedAmount && !threadReservedList(threadId).isEmpty ) { + //try to release Spillable's memory in current thread to make space for new request + var addMemory = toGrant + while(addMemory < requestMemory && !threadReservedList(threadId).isEmpty ) { val toSpill = threadReservedList(threadId).remove(0) val spillMemory = toSpill.forceSpill() logInfo(s"Thread $threadId forceSpill $spillMemory bytes to be free") - addedMemory += spillMemory + addMemory += spillMemory } - if (addedMemory > requestedAmount) { - this.release(addedMemory - requestedAmount) - addedMemory = requestedAmount + if (addMemory > requestMemory) { + this.release(addMemory - requestMemory) + addMemory = requestMemory } - addedMemory + addMemory } } /** * add Spillable to memoryReservedList of current thread, when current thread has - * no enough memory, we can release memory of current thread's memory reserved list + * no enough memory, we can release memory of current thread's memoryReservedList */ def addSpillableToReservedList(spill: Spillable) = synchronized { val threadId = Thread.currentThread().getId diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index c7b71cc00795f..2d9cdf4a31ab6 100644 --- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -35,7 +35,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics /** * :: DeveloperApi :: - * An append-only map that spills sorted content to disk when there is insufficient space for it + * An append-only map that spills sorted content to disk when there is insufficient space for inMemory * to grow. * * This map takes two passes over the data: @@ -160,7 +160,7 @@ class ExternalAppendOnlyMap[K, V, C]( /** * spill contents of the in-memory map to a temporary file on disk. */ - private[this] def spillMemoryToDisk(it: Iterator[(K, C)]): DiskMapIterator = { + private[this] def spillMemoryToDisk(inMemory: Iterator[(K, C)]): DiskMapIterator = { val (blockId, file) = diskBlockManager.createTempLocalBlock() curWriteMetrics = new ShuffleWriteMetrics() var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics) @@ -181,8 +181,8 @@ class ExternalAppendOnlyMap[K, V, C]( var success = false try { - while (it.hasNext) { - val kv = it.next() + while (inMemory.hasNext) { + val kv = inMemory.next() writer.write(kv._1, kv._2) objectsWritten += 1 @@ -232,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C]( } /** - * spill contents of memory map to disk + * spill contents of memory map to disk and release its memory */ override def forceSpill(): Long = { var freeMemory = 0L @@ -251,24 +251,24 @@ class ExternalAppendOnlyMap[K, V, C]( freeMemory } - /** - * An iterator that read the elements from the in-memory iterator or the disk iterator after - * spilling contents of in-memory iterator to disk. + /* + * An iterator that read elements from in-memory iterator or disk iterator when in-memory + * iterator have spilled to disk. 
*/ - case class MemoryOrDiskIterator(memIt: Iterator[(K,C)]) extends Iterator[(K,C)] { + case class MemoryOrDiskIterator(memIter: Iterator[(K,C)]) extends Iterator[(K,C)] { - var currentIt = memIt + var currentIter = memIter - override def hasNext: Boolean = currentIt.hasNext + override def hasNext: Boolean = currentIter.hasNext - override def next(): (K, C) = currentIt.next() + override def next(): (K, C) = currentIter.next() def spill() = { if (hasNext) { - currentIt = spillMemoryToDisk(currentIt) + currentIter = spillMemoryToDisk(currentIter) } else { - //the memory iterator is already drained, release it by giving an empty iterator - currentIt = new Iterator[(K,C)]{ + //in-memory iterator is already drained, release it by giving an empty iterator + currentIter = new Iterator[(K,C)]{ override def hasNext: Boolean = false override def next(): (K, C) = null } @@ -283,7 +283,7 @@ class ExternalAppendOnlyMap[K, V, C]( private class ExternalIterator extends Iterator[(K, C)] { // A queue that maintains a buffer for each stream we are currently merging - // This queue maintains the invariant that it only contains non-empty buffers + // This queue maintains the invariant that inMemory only contains non-empty buffers private val mergeHeap = new mutable.PriorityQueue[StreamBuffer] // Input streams are derived both from the in-memory map and spilled maps on disk @@ -332,7 +332,7 @@ class ExternalAppendOnlyMap[K, V, C]( val pair = buffer.pairs(i) if (pair._1 == key) { // Note that there's at most one pair in the buffer with a given key, since we always - // merge stuff in a map before spilling, so it's safe to return after the first we find + // merge stuff in a map before spilling, so inMemory's safe to return after the first we find removeFromBuffer(buffer.pairs, i) return mergeCombiners(baseCombiner, pair._2) } @@ -343,7 +343,7 @@ class ExternalAppendOnlyMap[K, V, C]( /** * Remove the index'th element from an ArrayBuffer in constant time, swapping another element - * into its place. This is more efficient than the ArrayBuffer.remove method because it does + * into its place. This is more efficient than the ArrayBuffer.remove method because inMemory does * not have to shift all the elements in the array over. It works for our array buffers because * we don't care about the order of elements inside, we just want to search them for a key. */ @@ -385,7 +385,7 @@ class ExternalAppendOnlyMap[K, V, C]( mergedBuffers += newBuffer } - // Repopulate each visited stream buffer and add it back to the queue if it is non-empty + // Repopulate each visited stream buffer and add inMemory back to the queue if inMemory is non-empty mergedBuffers.foreach { buffer => if (buffer.isEmpty) { readNextHashCode(buffer.iterator, buffer.pairs) @@ -488,7 +488,7 @@ class ExternalAppendOnlyMap[K, V, C]( /** * Return the next (K, C) pair from the deserialization stream. * - * If the current batch is drained, construct a stream for the next batch and read from it. + * If the current batch is drained, construct a stream for the next batch and read from inMemory. * If no more pairs are left, return null. 
*/ private def readNextItem(): (K, C) = { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 922a1b23fa5e4..cbf5a1aa5c2a3 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -50,10 +50,10 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter} * @param ordering optional Ordering to sort keys within each partition; should be a total ordering * @param serializer serializer to use when spilling to disk * - * Note that if an Ordering is given, we'll always sort using it, so only provide it if you really + * Note that if an Ordering is given, we'll always sort using inMemory, so only provide inMemory if you really * want the output keys to be sorted. In a map task without map-side combine for example, you * probably want to pass None as the ordering to avoid extra sorting. On the other hand, if you do - * want to do combining, having an Ordering is more efficient than not having it. + * want to do combining, having an Ordering is more efficient than not having inMemory. * * Users interact with this class in the following way: * @@ -61,7 +61,7 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter} * * 2. Call insertAll() with a set of records. * - * 3. Request an iterator() back to traverse sorted/aggregated records. + * 3. Request an inMemory() back to traverse sorted/aggregated records. * - or - * Invoke writePartitionedFile() to create a file containing sorted/aggregated outputs * that can be used in Spark's sort shuffle. @@ -74,12 +74,12 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter} * To avoid calling the partitioner multiple times with each key, we store the partition ID * alongside each record. * - * - When each buffer reaches our memory limit, we spill it to a file. This file is sorted first + * - When each buffer reaches our memory limit, we spill inMemory to a file. This file is sorted first * by partition ID and possibly second by key or by hash code of the key, if we want to do * aggregation. For each file, we track how many objects were in each partition in memory, so we * don't have to write out the partition ID for every element. * - * - When the user requests an iterator or file output, the spilled files are merged, along with + * - When the user requests an inMemory or file output, the spilled files are merged, along with * any remaining in-memory data, using the same sort order defined above (unless both sorting * and aggregation are disabled). If we need to aggregate by key, we either use a total ordering * from the ordering parameter, or read the keys with the same hash code and compare them with @@ -240,7 +240,7 @@ private[spark] class ExternalSorter[K, V, C]( /** * Spill our in-memory collection to a sorted file that we can merge later. - * We add this file into `spilledFiles` to find it later. + * We add this file into `spilledFiles` to find inMemory later. * * @param collection whichever collection we're using (map or buffer) */ @@ -251,12 +251,12 @@ private[spark] class ExternalSorter[K, V, C]( } /** - * Merge a sequence of sorted files, giving an iterator over partitions and then over elements + * Merge a sequence of sorted files, giving an inMemory over partitions and then over elements * inside each partition. This can be used to either write out a new file or return data to * the user. 
* - * Returns an iterator over all the data written to this object, grouped by partition. For each - * partition we then have an iterator over its contents, and these are expected to be accessed + * Returns an inMemory over all the data written to this object, grouped by partition. For each + * partition we then have an inMemory over its contents, and these are expected to be accessed * in order (you can't "skip ahead" to one partition without reading the previous one). * Guaranteed to return a key-value pair for each partition, in order of partition ID. */ @@ -313,7 +313,7 @@ private[spark] class ExternalSorter[K, V, C]( /** * Merge a sequence of (K, C) iterators by aggregating values for each key, assuming that each - * iterator is sorted by key with a given comparator. If the comparator is not a total ordering + * inMemory is sorted by key with a given comparator. If the comparator is not a total ordering * (e.g. when we sort objects by hash code and different keys may compare as equal although * they're not), we still merge them by doing equality tests for all keys that compare as equal. */ @@ -364,8 +364,8 @@ private[spark] class ExternalSorter[K, V, C]( } } - // Note that we return an iterator of elements since we could've had many keys marked - // equal by the partial order; we flatten this below to get a flat iterator of (K, C). + // Note that we return an inMemory of elements since we could've had many keys marked + // equal by the partial order; we flatten this below to get a flat inMemory of (K, C). keys.iterator.zip(combiners.iterator) } }.flatMap(i => i) @@ -468,7 +468,7 @@ private[spark] class ExternalSorter[K, V, C]( * Return the next (K, C) pair from the deserialization stream and update partitionId, * indexInPartition, indexInBatch and such to match its location. * - * If the current batch is drained, construct a stream for the next batch and read from it. + * If the current batch is drained, construct a stream for the next batch and read from inMemory. * If no more pairs are left, return null. */ private def readNextItem(): (K, C) = { @@ -539,9 +539,9 @@ private[spark] class ExternalSorter[K, V, C]( } /** - * spill contents of the in-memory map to a temporary file on disk. + * spill contents of in-memory iterator to a temporary file on disk. */ - private def spillMemoryToDisk(it: WritablePartitionedIterator): SpilledFile = { + private def spillMemoryToDisk(inMemory: WritablePartitionedIterator): SpilledFile = { // Because these files may be read during shuffle, their compression must be controlled by // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more context. @@ -579,9 +579,9 @@ private[spark] class ExternalSorter[K, V, C]( var success = false try { - while (it.hasNext) { - val partitionId = it.nextPartition() - it.writeNext(writer) + while (inMemory.hasNext) { + val partitionId = inMemory.nextPartition() + inMemory.writeNext(writer) elementsPerPartition(partitionId) += 1 objectsWritten += 1 @@ -614,16 +614,16 @@ private[spark] class ExternalSorter[K, V, C]( } /** - * Spill in-memory iterator to a temporary file on disk. - * Return an iterator over a temporary file on disk. + * Spill in-memory inMemory to a temporary file on disk. + * Return on-disk iterator over a temporary file. 
*/ - private[this] def spillMemoryToDisk(currentIt: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = { + private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = { val it = new WritablePartitionedIterator { - private[this] var cur = if (currentIt.hasNext) currentIt.next() else null + private[this] var cur = if (iterator.hasNext) iterator.next() else null def writeNext(writer: BlockObjectWriter): Unit = { writer.write(cur._1._2, cur._2) - cur = if (currentIt.hasNext) currentIt.next() else null + cur = if (iterator.hasNext) iterator.next() else null } def hasNext(): Boolean = cur != null @@ -635,32 +635,32 @@ private[spark] class ExternalSorter[K, V, C]( (0 until numPartitions).iterator.flatMap { p => val iterator = spillReader.readNextPartition() - iterator.map{cur => ((p, cur._1), cur._2)} + iterator.map(cur => ((p, cur._1), cur._2)) } } /** - * An iterator that read the elements from the in-memory iterator or the disk iterator after - * spilling contents of in-memory iterator to disk. + * An iterator that read elements from in-memory iterator or disk iterator when in-memory + * iterator have spilled to disk. */ - case class MemoryOrDiskIterator(memIt: Iterator[((Int, K), C)]) extends Iterator[((Int, K), C)] { + case class MemoryOrDiskIterator(memIter: Iterator[((Int, K), C)]) extends Iterator[((Int, K), C)] { - var currentIt = memIt + var currentIter = memIter - override def hasNext: Boolean = currentIt.hasNext + override def hasNext: Boolean = currentIter.hasNext - override def next(): ((Int, K), C) = currentIt.next() + override def next(): ((Int, K), C) = currentIter.next() def spill() = { if (hasNext) { - currentIt = spillMemoryToDisk(currentIt) + currentIter = spillMemoryToDisk(currentIter) } else { - //the memory iterator is already drained, release it by giving an empty iterator - currentIt = new Iterator[((Int, K), C)]{ + //in-memory iterator is already drained, release it by giving an empty iterator + currentIter = new Iterator[((Int, K), C)]{ override def hasNext: Boolean = false override def next(): ((Int, K), C) = null } - logInfo("nothing in memory iterator, do nothing") + logInfo("nothing in memory inMemory, do nothing") } } } @@ -696,8 +696,8 @@ private[spark] class ExternalSorter[K, V, C]( } /** - * Return an iterator over all the data written to this object, grouped by partition and - * aggregated by the requested aggregator. For each partition we then have an iterator over its + * Return an inMemory over all the data written to this object, grouped by partition and + * aggregated by the requested aggregator. For each partition we then have an inMemory over its * contents, and these are expected to be accessed in order (you can't "skip ahead" to one * partition without reading the previous one). Guaranteed to return a key-value pair for each * partition, in order of partition ID. @@ -738,7 +738,7 @@ private[spark] class ExternalSorter[K, V, C]( } /** - * Return an iterator over all the data written to this object, aggregated by our aggregator. + * Return an inMemory over all the data written to this object, aggregated by our aggregator. */ def iterator: Iterator[Product2[K, C]] = { isShuffleSort = false @@ -778,7 +778,7 @@ private[spark] class ExternalSorter[K, V, C]( lengths(partitionId) = segment.length } } else { - // We must perform merge-sort; get an iterator by partition and write everything directly. + // We must perform merge-sort; get an inMemory by partition and write everything directly. 
for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, @@ -806,9 +806,9 @@ private[spark] class ExternalSorter[K, V, C]( /** * Given a stream of ((partition, key), combiner) pairs *assumed to be sorted by partition ID*, - * group together the pairs for each partition into a sub-iterator. + * group together the pairs for each partition into a sub-inMemory. * - * @param data an iterator of elements, assumed to already be sorted by partition ID + * @param data an inMemory of elements, assumed to already be sorted by partition ID */ private def groupByPartition(data: Iterator[((Int, K), C)]) : Iterator[(Int, Iterator[Product2[K, C]])] = @@ -818,8 +818,8 @@ private[spark] class ExternalSorter[K, V, C]( } /** - * An iterator that reads only the elements for a given partition ID from an underlying buffered - * stream, assuming this partition is the next one to be read. Used to make it easier to return + * An inMemory that reads only the elements for a given partition ID from an underlying buffered + * stream, assuming this partition is the next one to be read. Used to make inMemory easier to return * partitioned iterators from our in-memory collection. */ private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)]) diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala index c31294b4233bc..0b1bb733d44da 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala @@ -26,7 +26,9 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.Spillable class FakeSpillable extends Spillable { + var myMemoryThreshold: Long = 0L + def addMemory(currentMemory: Long) = { myMemoryThreshold += currentMemory } @@ -322,11 +324,14 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts { test("latter spillable grab full memory of previous spillable") { val manager = new ShuffleMemoryManager(1000L) + val spill1 = new FakeSpillable() val spill2 = new FakeSpillable() + spill1.addMemory(manager.tryToAcquire(700L)) spill1.addMemory(manager.tryToAcquire(300L)) manager.addSpillableToReservedList(spill1) + val granted1 = manager.tryToAcquire(300L) assert(300L === granted1, "granted memory") val granted2 = manager.tryToAcquire(800L) From 67969d1ca63c1f7aba162bada304d7f00fcfb91f Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 1 Jul 2015 00:37:08 +0800 Subject: [PATCH 05/13] set initialMemory=0 --- .../org/apache/spark/util/collection/CollectionSpillable.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala b/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala index 671a41bde13a7..f7ddb5360339c 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala @@ -46,7 +46,7 @@ private[spark] trait CollectionSpillable[C] extends Logging with Spillable{ // Initial threshold for the size of a collection before we start tracking its memory usage // Exposed for testing protected val initialMemoryThreshold: Long = - SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024) + 
SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 0L) // Threshold for this collection's size in bytes before we start tracking its memory usage // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0 From 5280ef05df50eb72b2df0f90f2066c5a366c9ef3 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 1 Jul 2015 00:40:11 +0800 Subject: [PATCH 06/13] remove initialMemoryThreshold --- .../spark/util/collection/CollectionSpillable.scala | 12 +++--------- .../util/collection/ExternalAppendOnlyMap.scala | 4 ++-- .../spark/util/collection/ExternalSorter.scala | 4 ++-- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala b/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala index f7ddb5360339c..d3b03e002515c 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala @@ -43,14 +43,8 @@ private[spark] trait CollectionSpillable[C] extends Logging with Spillable{ // Memory manager that can be used to acquire/release memory protected val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager - // Initial threshold for the size of a collection before we start tracking its memory usage - // Exposed for testing - protected val initialMemoryThreshold: Long = - SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 0L) - // Threshold for this collection's size in bytes before we start tracking its memory usage - // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0 - protected var myMemoryThreshold = initialMemoryThreshold + protected var myMemoryThreshold = 0L // Number of elements read from input since last spill protected var _elementsRead = 0L @@ -103,8 +97,8 @@ private[spark] trait CollectionSpillable[C] extends Logging with Spillable{ */ private def releaseMemoryForThisThread(): Unit = { // The amount we requested does not include the initial memory tracking threshold - shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold) - myMemoryThreshold = initialMemoryThreshold + shuffleMemoryManager.release(myMemoryThreshold) + myMemoryThreshold = 0L } /** diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 2d9cdf4a31ab6..55523978d5518 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -244,8 +244,8 @@ class ExternalAppendOnlyMap[K, V, C]( _elementsRead = 0 _memoryBytesSpilled += currentMap.estimateSize() - freeMemory = myMemoryThreshold - initialMemoryThreshold - myMemoryThreshold = initialMemoryThreshold + freeMemory = myMemoryThreshold + myMemoryThreshold = 0L } freeMemory diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index cbf5a1aa5c2a3..6b14ef6d7ebef 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -689,8 +689,8 @@ private[spark] class ExternalSorter[K, V, C]( } else { _memoryBytesSpilled += buffer.estimateSize() } - freeMemory = myMemoryThreshold 
- initialMemoryThreshold - myMemoryThreshold = initialMemoryThreshold + freeMemory = myMemoryThreshold + myMemoryThreshold = 0L } freeMemory } From c17c0475708099cc613fb03c6b87b62963c8cd3a Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 1 Jul 2015 00:57:21 +0800 Subject: [PATCH 07/13] refactor style --- .../spark/shuffle/ShuffleMemoryManager.scala | 7 ++-- .../util/collection/CollectionSpillable.scala | 2 - .../collection/ExternalAppendOnlyMap.scala | 12 +++--- .../util/collection/ExternalSorter.scala | 38 +++++++++---------- 4 files changed, 29 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index cff9b3aa2c05b..8a4c5923ba13d 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -46,12 +46,13 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging { /** * release other Spillable's memory of current thread until freeMemory >= requestedMemory */ - def releaseReservedMemory(toGrant: Long, requestMemory: Long): Long = synchronized { + private[spark] def releaseReservedMemory(toGrant: Long, requestMemory: Long): Long = + synchronized { val threadId = Thread.currentThread().getId if (toGrant >= requestMemory || !threadReservedList.contains(threadId)){ toGrant } else { - //try to release Spillable's memory in current thread to make space for new request + // try to release Spillable's memory in current thread to make space for new request var addMemory = toGrant while(addMemory < requestMemory && !threadReservedList(threadId).isEmpty ) { val toSpill = threadReservedList(threadId).remove(0) @@ -71,7 +72,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging { * add Spillable to memoryReservedList of current thread, when current thread has * no enough memory, we can release memory of current thread's memoryReservedList */ - def addSpillableToReservedList(spill: Spillable) = synchronized { + private[spark] def addSpillableToReservedList(spill: Spillable) = synchronized { val threadId = Thread.currentThread().getId if (!threadReservedList.contains(threadId)) { threadReservedList(threadId) = new mutable.ListBuffer[Spillable]() diff --git a/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala b/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala index d3b03e002515c..dd8da7007749e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala @@ -19,8 +19,6 @@ package org.apache.spark.util.collection import org.apache.spark.{Logging, SparkEnv, Spillable} -import scala.reflect.ClassTag - /** * Spills contents of an in-memory collection to disk when the memory threshold * has been exceeded. 
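Before the remaining two file diffs, a self-contained sketch of the protocol that the ShuffleMemoryManager API above (tryToAcquire, addSpillableToReservedList, releaseReservedMemory, forceSpill) implements. ToyMemoryManager and everything inside it are illustrative stand-ins, not Spark code:

    import scala.collection.mutable

    trait Spillable { def forceSpill(): Long }

    // Toy single-threaded model of the reservation protocol (illustrative only).
    class ToyMemoryManager(maxMemory: Long) {
      private var used = 0L
      private val reserved = mutable.ListBuffer[Spillable]()

      // Mirror of addSpillableToReservedList: remember who can give memory back.
      def addSpillableToReservedList(s: Spillable): Unit = reserved += s

      // Mirror of tryToAcquire + releaseReservedMemory: grant what is free, then
      // force-spill earlier reservations until the request is satisfied or the
      // list is drained; anything freed beyond the request stays in the pool.
      def tryToAcquire(numBytes: Long): Long = {
        var toGrant = math.min(numBytes, maxMemory - used)
        while (toGrant < numBytes && reserved.nonEmpty) {
          val freed = reserved.remove(0).forceSpill()
          used -= freed
          toGrant += freed
        }
        toGrant = math.min(toGrant, numBytes)
        used += toGrant
        toGrant
      }
    }

This is the sequence the "latter spillable grab full memory of previous spillable" test above exercises: a first Spillable reserves the whole 1000-byte pool, a later tryToAcquire(300L) force-spills it and is still granted 300, and the 700 bytes freed beyond that satisfy the next request.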
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 55523978d5518..69efd81b5a181 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -35,7 +35,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics /** * :: DeveloperApi :: - * An append-only map that spills sorted content to disk when there is insufficient space for inMemory + * An append-only map that spills sorted content to disk when there is insufficient space for it * to grow. * * This map takes two passes over the data: @@ -283,7 +283,7 @@ class ExternalAppendOnlyMap[K, V, C]( private class ExternalIterator extends Iterator[(K, C)] { // A queue that maintains a buffer for each stream we are currently merging - // This queue maintains the invariant that inMemory only contains non-empty buffers + // This queue maintains the invariant that it only contains non-empty buffers private val mergeHeap = new mutable.PriorityQueue[StreamBuffer] // Input streams are derived both from the in-memory map and spilled maps on disk @@ -332,7 +332,7 @@ class ExternalAppendOnlyMap[K, V, C]( val pair = buffer.pairs(i) if (pair._1 == key) { // Note that there's at most one pair in the buffer with a given key, since we always - // merge stuff in a map before spilling, so inMemory's safe to return after the first we find + // merge stuff in a map before spilling, so it's safe to return after the first we find removeFromBuffer(buffer.pairs, i) return mergeCombiners(baseCombiner, pair._2) } @@ -343,7 +343,7 @@ class ExternalAppendOnlyMap[K, V, C]( /** * Remove the index'th element from an ArrayBuffer in constant time, swapping another element - * into its place. This is more efficient than the ArrayBuffer.remove method because inMemory does + * into its place. This is more efficient than the ArrayBuffer.remove method because it does * not have to shift all the elements in the array over. It works for our array buffers because * we don't care about the order of elements inside, we just want to search them for a key. */ @@ -385,7 +385,7 @@ class ExternalAppendOnlyMap[K, V, C]( mergedBuffers += newBuffer } - // Repopulate each visited stream buffer and add inMemory back to the queue if inMemory is non-empty + // Repopulate each visited stream buffer and add it back to the queue if it is non-empty mergedBuffers.foreach { buffer => if (buffer.isEmpty) { readNextHashCode(buffer.iterator, buffer.pairs) @@ -488,7 +488,7 @@ class ExternalAppendOnlyMap[K, V, C]( /** * Return the next (K, C) pair from the deserialization stream. * - * If the current batch is drained, construct a stream for the next batch and read from inMemory. + * If the current batch is drained, construct a stream for the next batch and read from it. * If no more pairs are left, return null. 
*/ private def readNextItem(): (K, C) = { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 6b14ef6d7ebef..7e473449d76d6 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -50,10 +50,10 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter} * @param ordering optional Ordering to sort keys within each partition; should be a total ordering * @param serializer serializer to use when spilling to disk * - * Note that if an Ordering is given, we'll always sort using inMemory, so only provide inMemory if you really + * Note that if an Ordering is given, we'll always sort using it, so only provide it if you really * want the output keys to be sorted. In a map task without map-side combine for example, you * probably want to pass None as the ordering to avoid extra sorting. On the other hand, if you do - * want to do combining, having an Ordering is more efficient than not having inMemory. + * want to do combining, having an Ordering is more efficient than not having it. * * Users interact with this class in the following way: * @@ -61,7 +61,7 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter} * * 2. Call insertAll() with a set of records. * - * 3. Request an inMemory() back to traverse sorted/aggregated records. + * 3. Request an iterator() back to traverse sorted/aggregated records. * - or - * Invoke writePartitionedFile() to create a file containing sorted/aggregated outputs * that can be used in Spark's sort shuffle. @@ -74,12 +74,12 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter} * To avoid calling the partitioner multiple times with each key, we store the partition ID * alongside each record. * - * - When each buffer reaches our memory limit, we spill inMemory to a file. This file is sorted first + * - When each buffer reaches our memory limit, we spill it to a file. This file is sorted first * by partition ID and possibly second by key or by hash code of the key, if we want to do * aggregation. For each file, we track how many objects were in each partition in memory, so we * don't have to write out the partition ID for every element. * - * - When the user requests an inMemory or file output, the spilled files are merged, along with + * - When the user requests an iterator or file output, the spilled files are merged, along with * any remaining in-memory data, using the same sort order defined above (unless both sorting * and aggregation are disabled). If we need to aggregate by key, we either use a total ordering * from the ordering parameter, or read the keys with the same hash code and compare them with @@ -240,7 +240,7 @@ private[spark] class ExternalSorter[K, V, C]( /** * Spill our in-memory collection to a sorted file that we can merge later. - * We add this file into `spilledFiles` to find inMemory later. + * We add this file into `spilledFiles` to find it later. * * @param collection whichever collection we're using (map or buffer) */ @@ -251,12 +251,12 @@ private[spark] class ExternalSorter[K, V, C]( } /** - * Merge a sequence of sorted files, giving an inMemory over partitions and then over elements + * Merge a sequence of sorted files, giving an iterator over partitions and then over elements * inside each partition. This can be used to either write out a new file or return data to * the user. 
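The merge described above is a standard k-way merge of already-sorted streams. A self-contained sketch of that shape, assuming a total ordering (the real code merges ((partition, key), combiner) records read back from spill files):

import scala.collection.mutable

def mergeSorted[T](streams: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
  // PriorityQueue is a max-heap, so reverse the ordering to pop the stream
  // with the smallest head element first.
  val heap = new mutable.PriorityQueue[BufferedIterator[T]]()(
    Ordering.by[BufferedIterator[T], T](_.head)(ord.reverse))
  heap ++= streams.map(_.buffered).filter(_.hasNext)
  new Iterator[T] {
    override def hasNext: Boolean = heap.nonEmpty
    override def next(): T = {
      val stream = heap.dequeue()
      val elem = stream.next()
      if (stream.hasNext) heap.enqueue(stream) // re-queue under its new head
      elem
    }
  }
}

// e.g. mergeSorted(Seq(Iterator(1, 3, 5), Iterator(2, 4))) yields 1, 2, 3, 4, 5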
* - * Returns an inMemory over all the data written to this object, grouped by partition. For each - * partition we then have an inMemory over its contents, and these are expected to be accessed + * Returns an iterator over all the data written to this object, grouped by partition. For each + * partition we then have an iterator over its contents, and these are expected to be accessed * in order (you can't "skip ahead" to one partition without reading the previous one). * Guaranteed to return a key-value pair for each partition, in order of partition ID. */ @@ -313,7 +313,7 @@ private[spark] class ExternalSorter[K, V, C]( /** * Merge a sequence of (K, C) iterators by aggregating values for each key, assuming that each - * inMemory is sorted by key with a given comparator. If the comparator is not a total ordering + * iterator is sorted by key with a given comparator. If the comparator is not a total ordering * (e.g. when we sort objects by hash code and different keys may compare as equal although * they're not), we still merge them by doing equality tests for all keys that compare as equal. */ @@ -364,8 +364,8 @@ private[spark] class ExternalSorter[K, V, C]( } } - // Note that we return an inMemory of elements since we could've had many keys marked - // equal by the partial order; we flatten this below to get a flat inMemory of (K, C). + // Note that we return an iterator of elements since we could've had many keys marked + // equal by the partial order; we flatten this below to get a flat iterator of (K, C). keys.iterator.zip(combiners.iterator) } }.flatMap(i => i) @@ -468,7 +468,7 @@ private[spark] class ExternalSorter[K, V, C]( * Return the next (K, C) pair from the deserialization stream and update partitionId, * indexInPartition, indexInBatch and such to match its location. * - * If the current batch is drained, construct a stream for the next batch and read from inMemory. + * If the current batch is drained, construct a stream for the next batch and read from it. * If no more pairs are left, return null. */ private def readNextItem(): (K, C) = { @@ -738,7 +738,7 @@ private[spark] class ExternalSorter[K, V, C]( } /** - * Return an inMemory over all the data written to this object, aggregated by our aggregator. + * Return an iterator over all the data written to this object, aggregated by our aggregator. */ def iterator: Iterator[Product2[K, C]] = { isShuffleSort = false @@ -778,7 +778,7 @@ private[spark] class ExternalSorter[K, V, C]( lengths(partitionId) = segment.length } } else { - // We must perform merge-sort; get an inMemory by partition and write everything directly. + // We must perform merge-sort; get an iterator by partition and write everything directly. for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, @@ -806,9 +806,9 @@ private[spark] class ExternalSorter[K, V, C]( /** * Given a stream of ((partition, key), combiner) pairs *assumed to be sorted by partition ID*, - * group together the pairs for each partition into a sub-inMemory. + * group together the pairs for each partition into a sub-iterator. 
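For the non-total (hash-based) comparator case above, each run of keys that compare as equal still has to be resolved with true equality before combiners are merged. A hedged sketch of that step (combineEqualHashRun is an invented helper; it assumes its input is one run of same-hash keys):

import scala.collection.mutable.ArrayBuffer

// All pairs in `run` have keys with the same hash code, but may still be
// distinct keys, so match them with real equality before merging combiners.
def combineEqualHashRun[K, C](run: Seq[(K, C)], mergeCombiners: (C, C) => C): Seq[(K, C)] = {
  val keys = new ArrayBuffer[K]
  val combiners = new ArrayBuffer[C]
  for ((k, c) <- run) {
    val i = keys.indexOf(k) // linear scan with ==, not hashing
    if (i >= 0) combiners(i) = mergeCombiners(combiners(i), c)
    else { keys += k; combiners += c }
  }
  keys.zip(combiners).toSeq
}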
* - * @param data an inMemory of elements, assumed to already be sorted by partition ID + * @param data an iterator of elements, assumed to already be sorted by partition ID */ private def groupByPartition(data: Iterator[((Int, K), C)]) : Iterator[(Int, Iterator[Product2[K, C]])] = @@ -818,8 +818,8 @@ private[spark] class ExternalSorter[K, V, C]( } /** - * An inMemory that reads only the elements for a given partition ID from an underlying buffered - * stream, assuming this partition is the next one to be read. Used to make inMemory easier to return + * An iterator that reads only the elements for a given partition ID from an underlying buffered + * stream, assuming this partition is the next one to be read. Used to make it easier to return * partitioned iterators from our in-memory collection. */ private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)]) From 2e0b237b692388cef620bdcf6b66cbf47e628be7 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 1 Jul 2015 01:12:03 +0800 Subject: [PATCH 08/13] fix style --- .../spark/util/collection/ExternalAppendOnlyMap.scala | 10 +++++----- .../apache/spark/util/collection/ExternalSorter.scala | 6 +++--- .../spark/shuffle/ShuffleMemoryManagerSuite.scala | 1 - 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 69efd81b5a181..5ad26520a7459 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -234,7 +234,7 @@ class ExternalAppendOnlyMap[K, V, C]( /** * spill contents of memory map to disk and release its memory */ - override def forceSpill(): Long = { + override def forceSpill(): Long = { var freeMemory = 0L if (memoryOrDiskIter.isDefined) { _spillCount += 1 @@ -255,7 +255,7 @@ class ExternalAppendOnlyMap[K, V, C]( * An iterator that read elements from in-memory iterator or disk iterator when in-memory * iterator have spilled to disk. 
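At its core, the MemoryOrDiskIterator pattern above is an iterator whose backing can be swapped mid-iteration. A minimal sketch, with the disk side simulated by materializing the remainder in memory (the real spill writes a file and reads it back partition by partition):

class SwappableIterator[T](initial: Iterator[T]) extends Iterator[T] {
  private var current: Iterator[T] = initial

  override def hasNext: Boolean = current.hasNext
  override def next(): T = current.next()

  // Called on a forced spill: move whatever has not been consumed yet out of
  // memory, and keep serving elements from the new backing transparently.
  def moveToDisk(): Unit = {
    current = if (current.hasNext) current.toVector.iterator else Iterator.empty
  }
}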
*/ - case class MemoryOrDiskIterator(memIter: Iterator[(K,C)]) extends Iterator[(K,C)] { + case class MemoryOrDiskIterator(memIter: Iterator[(K, C)]) extends Iterator[(K, C)] { var currentIter = memIter @@ -263,12 +263,12 @@ class ExternalAppendOnlyMap[K, V, C]( override def next(): (K, C) = currentIter.next() - def spill() = { + private[spark] def spill() = { if (hasNext) { currentIter = spillMemoryToDisk(currentIter) } else { - //in-memory iterator is already drained, release it by giving an empty iterator - currentIter = new Iterator[(K,C)]{ + // in-memory iterator is already drained, release it by giving an empty iterator + currentIter = new Iterator[(K, C)]{ override def hasNext: Boolean = false override def next(): (K, C) = null } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 7e473449d76d6..06492bae841ba 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -651,11 +651,11 @@ private[spark] class ExternalSorter[K, V, C]( override def next(): ((Int, K), C) = currentIter.next() - def spill() = { + private[spark] def spill() = { if (hasNext) { currentIter = spillMemoryToDisk(currentIter) } else { - //in-memory iterator is already drained, release it by giving an empty iterator + // in-memory iterator is already drained, release it by giving an empty iterator currentIter = new Iterator[((Int, K), C)]{ override def hasNext: Boolean = false override def next(): ((Int, K), C) = null @@ -671,7 +671,7 @@ private[spark] class ExternalSorter[K, V, C]( override def forceSpill(): Long = { var freeMemory = 0L if (memoryOrDiskIter.isDefined) { - //has memory buffer that can be spilled + // has memory buffer that can be spilled _spillCount += 1 val shouldCombine = aggregator.isDefined diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala index 0b1bb733d44da..93eff7fcd5718 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala @@ -326,7 +326,6 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts { val manager = new ShuffleMemoryManager(1000L) val spill1 = new FakeSpillable() - val spill2 = new FakeSpillable() spill1.addMemory(manager.tryToAcquire(700L)) spill1.addMemory(manager.tryToAcquire(300L)) From e70f79f13c39591e5d33653e4de8e51d25e079dd Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 1 Jul 2015 01:39:01 +0800 Subject: [PATCH 09/13] fix style --- .../spark/shuffle/ShuffleMemoryManager.scala | 2 +- .../util/collection/CollectionSpillable.scala | 22 +++++++++++++---- .../collection/ExternalAppendOnlyMap.scala | 10 +------- .../util/collection/ExternalSorter.scala | 24 ++++++------------- .../shuffle/ShuffleMemoryManagerSuite.scala | 2 +- 5 files changed, 28 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index 8a4c5923ba13d..f3466454d8fbf 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -46,7 +46,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: 
Long) extends Logging { /** * Release other Spillables' memory held by the current thread until freeMemory >= requestMemory */ - private[spark] def releaseReservedMemory(toGrant: Long, requestMemory: Long): Long = + private[this] def releaseReservedMemory(toGrant: Long, requestMemory: Long): Long = synchronized { val threadId = Thread.currentThread().getId if (toGrant >= requestMemory || !threadReservedList.contains(threadId)){ diff --git a/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala b/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala index dd8da7007749e..e7dbd35356995 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala @@ -42,16 +42,16 @@ private[spark] trait CollectionSpillable[C] extends Logging with Spillable{ protected val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager // Threshold for this collection's size in bytes before we start tracking its memory usage - protected var myMemoryThreshold = 0L + private[this] var myMemoryThreshold = 0L // Number of elements read from input since last spill - protected var _elementsRead = 0L + private[this] var _elementsRead = 0L // Number of bytes spilled in total - protected var _memoryBytesSpilled = 0L + private[this] var _memoryBytesSpilled = 0L // Number of spills - protected var _spillCount = 0 + private[this] var _spillCount = 0 /** * Spills the current in-memory collection to disk if needed. Attempts to acquire more @@ -110,4 +110,18 @@ private[spark] trait CollectionSpillable[C] extends Logging with Spillable{ .format(threadId, org.apache.spark.util.Utils.bytesToString(size), _spillCount, if (_spillCount > 1) "s" else "")) } + + /** + * Log a forced spill and return the collection's size that was freed + */ + protected def logForceSpill(currentMemory: Long): Long = { + _spillCount += 1 + logSpillage(currentMemory) + + _elementsRead = 0 + _memoryBytesSpilled += currentMemory + val freeMemory = myMemoryThreshold + myMemoryThreshold = 0L + freeMemory + } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 5ad26520a7459..eccd1353abe15 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -237,17 +237,9 @@ class ExternalAppendOnlyMap[K, V, C]( override def forceSpill(): Long = { var freeMemory = 0L if (memoryOrDiskIter.isDefined) { - _spillCount += 1 - logSpillage(currentMap.estimateSize()) - + freeMemory = logForceSpill(currentMap.estimateSize()) memoryOrDiskIter.get.spill() - - _elementsRead = 0 - _memoryBytesSpilled += currentMap.estimateSize() - freeMemory = myMemoryThreshold - myMemoryThreshold = 0L } - freeMemory } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 06492bae841ba..6856f8619aa11 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -617,7 +617,9 @@ private[spark] class ExternalSorter[K, V, C]( * Spill the in-memory iterator to a temporary file on disk. * Return on-disk iterator over a temporary file.
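Putting the logForceSpill bookkeeping together with the iterator swap, a forceSpill() implementation has roughly this shape (a standalone sketch with invented names; the size arithmetic is a stand-in for estimateSize(), and the "disk" side is again simulated by materializing the remainder):

class ForceSpillDemo[T](data: Vector[T]) {
  private var myMemoryThreshold: Long = data.size * 8L // stand-in for estimateSize()
  private var memoryBytesSpilled = 0L
  private var spillCount = 0
  private var currentIter: Iterator[T] = data.iterator

  def iterator: Iterator[T] = new Iterator[T] {
    override def hasNext: Boolean = currentIter.hasNext
    override def next(): T = currentIter.next()
  }

  // Reset the bookkeeping, push the un-consumed remainder out of memory
  // (a spill file in the real code), and report the bytes freed.
  def forceSpill(): Long = {
    spillCount += 1
    memoryBytesSpilled += myMemoryThreshold
    if (currentIter.hasNext) currentIter = currentIter.toVector.iterator
    val freed = myMemoryThreshold
    myMemoryThreshold = 0L
    freed
  }
}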
*/ - private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = { + private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]) + : Iterator[((Int, K), C)] = { + val it = new WritablePartitionedIterator { private[this] var cur = if (iterator.hasNext) iterator.next() else null @@ -643,7 +645,8 @@ private[spark] class ExternalSorter[K, V, C]( * An iterator that read elements from in-memory iterator or disk iterator when in-memory * iterator have spilled to disk. */ - case class MemoryOrDiskIterator(memIter: Iterator[((Int, K), C)]) extends Iterator[((Int, K), C)] { + case class MemoryOrDiskIterator(memIter: Iterator[((Int, K), C)]) + extends Iterator[((Int, K), C)] { var currentIter = memIter @@ -671,26 +674,13 @@ private[spark] class ExternalSorter[K, V, C]( override def forceSpill(): Long = { var freeMemory = 0L if (memoryOrDiskIter.isDefined) { - // has memory buffer that can be spilled - _spillCount += 1 - val shouldCombine = aggregator.isDefined if (shouldCombine) { - logSpillage(map.estimateSize()) + freeMemory = logForceSpill(map.estimateSize()) } else { - logSpillage(buffer.estimateSize()) + freeMemory = logForceSpill(buffer.estimateSize()) } - memoryOrDiskIter.get.spill() - - _elementsRead = 0 - if (shouldCombine) { - _memoryBytesSpilled += map.estimateSize() - } else { - _memoryBytesSpilled += buffer.estimateSize() - } - freeMemory = myMemoryThreshold - myMemoryThreshold = 0L } freeMemory } diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala index 93eff7fcd5718..0d71b5e929f77 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala @@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLatch import org.apache.spark.SparkFunSuite import org.apache.spark.Spillable -class FakeSpillable extends Spillable { +private[this] class FakeSpillable extends Spillable { var myMemoryThreshold: Long = 0L From 72e3be87158dde301f6347aab5d8ca773927757b Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 1 Jul 2015 01:43:05 +0800 Subject: [PATCH 10/13] fix style --- .../org/apache/spark/util/collection/CollectionSpillable.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala b/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala index e7dbd35356995..00cdf35cbcfde 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/CollectionSpillable.scala @@ -104,7 +104,7 @@ private[spark] trait CollectionSpillable[C] extends Logging with Spillable{ * * @param size number of bytes spilled */ - @inline protected def logSpillage(size: Long) { + @inline private def logSpillage(size: Long) { val threadId = Thread.currentThread().getId logInfo("Thread %d spilling in-memory map of %s to disk (%d time%s so far)" .format(threadId, org.apache.spark.util.Utils.bytesToString(size), From 83040cfcc583c1ef64dcc486baf34bdc04d7e250 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 1 Jul 2015 01:57:00 +0800 Subject: [PATCH 11/13] fix style --- .../org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala index 0d71b5e929f77..17071d1eed2b4 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala @@ -29,7 +29,7 @@ private[this] class FakeSpillable extends Spillable { var myMemoryThreshold: Long = 0L - def addMemory(currentMemory: Long) = { + private[spark] def addMemory(currentMemory: Long) = { myMemoryThreshold += currentMemory } From f4737bbafc495f56fef2246f41c964c7295a7eb2 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Wed, 1 Jul 2015 22:56:46 +0800 Subject: [PATCH 12/13] fix spill file leak --- .../util/collection/ExternalSorter.scala | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 6856f8619aa11..be0aeb5b22b58 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -615,10 +615,8 @@ private[spark] class ExternalSorter[K, V, C]( /** * Spill the in-memory iterator to a temporary file on disk. - * Return on-disk iterator over a temporary file. */ - private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]) - : Iterator[((Int, K), C)] = { + private[this] def spillMemoryToDisk(iterator: Iterator[((Int, K), C)]): SpilledFile = { val it = new WritablePartitionedIterator { private[this] var cur = if (iterator.hasNext) iterator.next() else null @@ -633,12 +631,7 @@ private[spark] class ExternalSorter[K, V, C]( def nextPartition(): Int = cur._1._1 } - val spillReader = new SpillReader(spillMemoryToDisk(it)) - - (0 until numPartitions).iterator.flatMap { p => - val iterator = spillReader.readNextPartition() - iterator.map(cur => ((p, cur._1), cur._2)) - } + spillMemoryToDisk(it) } /** @@ -649,6 +642,7 @@ private[spark] class ExternalSorter[K, V, C]( extends Iterator[((Int, K), C)] { var currentIter = memIter + var spillFile: Option[SpilledFile] = None override def hasNext: Boolean = currentIter.hasNext @@ -656,7 +650,13 @@ private[spark] class ExternalSorter[K, V, C]( private[spark] def spill() = { if (hasNext) { - currentIter = spillMemoryToDisk(currentIter) + spillFile = Some(spillMemoryToDisk(currentIter)) + val spillReader = new SpillReader(spillFile.get) + + currentIter = (0 until numPartitions).iterator.flatMap { p => + val iterator = spillReader.readNextPartition() + iterator.map(cur => ((p, cur._1), cur._2)) + } } else { // in-memory iterator is already drained, release it by giving an empty iterator currentIter = new Iterator[((Int, K), C)]{ @@ -666,6 +666,10 @@ private[spark] class ExternalSorter[K, V, C]( } } + + def cleanup(): Unit = { + spillFile.foreach(_.file.delete()) + } } /** @@ -792,6 +796,7 @@ private[spark] class ExternalSorter[K, V, C]( def stop(): Unit = { spills.foreach(s => s.file.delete()) spills.clear() + memoryOrDiskIter.foreach(_.cleanup()) } /** From 907366b04af0731e480b15804d3420ba39f6108e Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Thu, 2 Jul 2015 00:40:19 +0800 Subject: [PATCH 13/13] fix style --- .../org/apache/spark/util/collection/ExternalSorter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git
a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index be0aeb5b22b58..70dbb501a0e9d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -180,7 +180,7 @@ private[spark] class ExternalSorter[K, V, C]( // Information about a spilled file. Includes sizes in bytes of "batches" written by the // serializer as we periodically reset its stream, as well as number of elements in each // partition, used to efficiently keep track of partitions when merging. - private[this] case class SpilledFile( + private[spark] case class SpilledFile( file: File, blockId: BlockId, serializerBatchSizes: Array[Long], @@ -667,7 +667,7 @@ private[spark] class ExternalSorter[K, V, C]( } } - def cleanup(): Unit = { + private[spark] def cleanup(): Unit = { spillFile.foreach(_.file.delete()) } }
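The leak fixed in PATCH 12 comes down to remembering the spill file that spill() creates, so a later stop() can delete it even if the iterator is never fully consumed. A compact sketch of that pairing (plain java.io here; the real code deletes BlockId-named files through the block manager):

import java.io.{File, FileOutputStream}

class TrackedSpill {
  private var spillFile: Option[File] = None

  def spill(contents: Array[Byte]): File = {
    val file = File.createTempFile("spill", ".tmp")
    val out = new FileOutputStream(file)
    try out.write(contents) finally out.close()
    spillFile = Some(file) // remembered so cleanup() can find it
    file
  }

  // Mirrors the stop()/cleanup() pairing above: safe to call whether or not
  // a spill ever happened, and idempotent.
  def cleanup(): Unit = {
    spillFile.foreach(_.delete())
    spillFile = None
  }
}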