From 39e339afe72ab1c861aef960ad3a5f76bc349153 Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Tue, 4 Nov 2014 23:30:46 -0800
Subject: [PATCH 1/3] Don't spill more blocks than we need to

---
 .../shuffle/sort/SortShuffleReader.scala      | 96 +++++++++++--------
 1 file changed, 58 insertions(+), 38 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala
index 8772b9740e12c..63c1dab1cd348 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala
@@ -20,7 +20,7 @@ package org.apache.spark.shuffle.sort
 import java.io.{BufferedOutputStream, FileOutputStream}
 import java.util.Comparator
 
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, Queue}
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark._
@@ -59,6 +59,9 @@ private[spark] class SortShuffleReader[K, C](
   /** Shuffle block fetcher iterator */
   private var shuffleRawBlockFetcherItr: ShuffleRawBlockFetcherIterator = _
 
+  /** Number of bytes left to fetch */
+  private var unfetchedBytes: Long = _
+
   private val dep = handle.dependency
   private val conf = SparkEnv.get.conf
   private val blockManager = SparkEnv.get.blockManager
@@ -68,7 +71,7 @@ private[spark] class SortShuffleReader[K, C](
   private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
 
   /** ArrayBuffer to store in-memory shuffle blocks */
-  private val inMemoryBlocks = new ArrayBuffer[MemoryShuffleBlock]()
+  private val inMemoryBlocks = new Queue[MemoryShuffleBlock]()
 
   /** Manage the BlockManagerId and related shuffle blocks */
   private var statuses: Array[(BlockManagerId, Long)] = _
@@ -104,55 +107,26 @@ private[spark] class SortShuffleReader[K, C](
         }
       }
 
-      inMemoryBlocks += MemoryShuffleBlock(blockId, blockData)
-
       // Try to fit block in memory. If this fails, merge in-memory blocks to disk.
       val blockSize = blockData.size
       val granted = shuffleMemoryManager.tryToAcquire(blockSize)
-
       if (granted < blockSize) {
         logInfo(s"Granted $granted memory is not enough to store shuffle block ($blockSize), " +
-          s"try to consolidate in-memory blocks to release the memory")
+          s"spilling in-memory blocks to release the memory")
 
         shuffleMemoryManager.release(granted)
-
-        // Write merged blocks to disk
-        val (tmpBlockId, file) = blockManager.diskBlockManager.createTempShuffleBlock()
-        val fos = new FileOutputStream(file)
-        val bos = new BufferedOutputStream(fos, fileBufferSize)
-
-        if (inMemoryBlocks.size > 1) {
-          val itrGroup = inMemoryBlocksToIterators()
-          val partialMergedItr =
-            MergeUtil.mergeSort(itrGroup, keyComparator, dep.keyOrdering, dep.aggregator)
-          blockManager.dataSerializeStream(tmpBlockId, bos, partialMergedItr, ser)
-        } else {
-          val buffer = inMemoryBlocks.map(_.blockData.nioByteBuffer()).head
-          val channel = fos.getChannel
-          while (buffer.hasRemaining) {
-            channel.write(buffer)
-          }
-          channel.close()
-        }
-
-        tieredMerger.registerOnDiskBlock(tmpBlockId, file)
-
-        logInfo(s"Merge ${inMemoryBlocks.size} in-memory blocks into file ${file.getName}")
-
-        for (block <- inMemoryBlocks) {
-          block.blockData.release()
-          shuffleMemoryManager.release(block.blockData.size)
-        }
-        inMemoryBlocks.clear()
+        spillInMemoryBlocks(MemoryShuffleBlock(blockId, blockData))
       }
 
+      unfetchedBytes -= blockData.size()
       shuffleRawBlockFetcherItr.currentResult = null
     }
+    assert(unfetchedBytes == 0)
 
     tieredMerger.doneRegisteringOnDiskBlocks()
 
     // Merge on-disk blocks with in-memory blocks to directly feed to the reducer.
-    val finalItrGroup = inMemoryBlocksToIterators() ++ Seq(tieredMerger.readMerged())
+    val finalItrGroup = inMemoryBlocksToIterators(inMemoryBlocks) ++ Seq(tieredMerger.readMerged())
     val mergedItr =
       MergeUtil.mergeSort(finalItrGroup, keyComparator, dep.keyOrdering, dep.aggregator)
 
@@ -169,8 +143,53 @@ private[spark] class SortShuffleReader[K, C](
     new InterruptibleIterator(context, completionItr.map(p => (p._1, p._2)))
   }
 
-  private def inMemoryBlocksToIterators(): Seq[Iterator[Product2[K, C]]] = {
-    inMemoryBlocks.map{ case MemoryShuffleBlock(id, buf) =>
+  def spillInMemoryBlocks(tippingBlock: MemoryShuffleBlock): Unit = {
+    // Write merged blocks to disk
+    val (tmpBlockId, file) = blockManager.diskBlockManager.createTempShuffleBlock()
+    val fos = new FileOutputStream(file)
+    val bos = new BufferedOutputStream(fos, fileBufferSize)
+
+    // If the remaining unfetched data would fit inside our current allocation, we don't want to
+    // waste time spilling blocks beyond the space needed for it.
+    var bytesToSpill = unfetchedBytes
+    val blocksToSpill = new ArrayBuffer[MemoryShuffleBlock]()
+    blocksToSpill += tippingBlock
+    bytesToSpill -= tippingBlock.blockData.size
+    while (bytesToSpill > 0 && inMemoryBlocks.isEmpty) {
+      val block = inMemoryBlocks.dequeue()
+      blocksToSpill += block
+      bytesToSpill -= block.blockData.size
+    }
+
+    if (blocksToSpill.size > 1) {
+      val itrGroup = inMemoryBlocksToIterators(blocksToSpill)
+      val partialMergedItr =
+        MergeUtil.mergeSort(itrGroup, keyComparator, dep.keyOrdering, dep.aggregator)
+      blockManager.dataSerializeStream(tmpBlockId, bos, partialMergedItr, ser)
+    } else {
+      val buffer = blocksToSpill.map(_.blockData.nioByteBuffer()).head
+      val channel = fos.getChannel
+      while (buffer.hasRemaining) {
+        channel.write(buffer)
+      }
+      channel.close()
+    }
+
+    tieredMerger.registerOnDiskBlock(tmpBlockId, file)
+
+    logInfo(s"Merged ${blocksToSpill.size} in-memory blocks into file ${file.getName}")
+
+    for (block <- blocksToSpill) {
+      block.blockData.release()
+      if (block != tippingBlock) {
+        shuffleMemoryManager.release(block.blockData.size)
+      }
+    }
+  }
+
+  private def inMemoryBlocksToIterators(blocks: Seq[MemoryShuffleBlock])
+    : Seq[Iterator[Product2[K, C]]] = {
+    blocks.map{ case MemoryShuffleBlock(id, buf) =>
       blockManager.dataDeserialize(id, buf.nioByteBuffer(), ser)
         .asInstanceOf[Iterator[Product2[K, C]]]
     }
@@ -190,6 +209,7 @@ private[spark] class SortShuffleReader[K, C](
       }
       (address, blocks.toSeq)
     }
+    unfetchedBytes = blocksByAddress.flatMap(a => a._2.map(b => b._2)).sum
 
     shuffleRawBlockFetcherItr = new ShuffleRawBlockFetcherIterator(
       context,
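
The policy change in PATCH 1/3 is easiest to see in isolation: once the bytes still in flight would fit inside the memory the reader already holds, spilling further in-memory blocks buys nothing. The following is a minimal standalone sketch of that selection step, with a hypothetical Block case class standing in for MemoryShuffleBlock; it illustrates the policy rather than reproducing the patch code, and it uses the corrected loop guard that PATCH 3/3 below arrives at.

import scala.collection.mutable.{ArrayBuffer, Queue}

object SpillPolicySketch {
  case class Block(id: String, size: Long)

  // Spill the tipping block plus just enough of the oldest queued blocks to
  // cover the bytes still unfetched, instead of unconditionally spilling
  // every resident block as the pre-patch code did.
  def selectBlocksToSpill(
      tippingBlock: Block,
      inMemoryBlocks: Queue[Block],
      unfetchedBytes: Long): Seq[Block] = {
    val blocksToSpill = new ArrayBuffer[Block]()
    var bytesToSpill = unfetchedBytes - tippingBlock.size
    blocksToSpill += tippingBlock
    while (bytesToSpill > 0 && inMemoryBlocks.nonEmpty) {
      val block = inMemoryBlocks.dequeue()
      blocksToSpill += block
      bytesToSpill -= block.size
    }
    blocksToSpill
  }

  def main(args: Array[String]): Unit = {
    // 100 bytes still unfetched and a 40-byte tipping block: the first two
    // queued blocks (30 + 50 bytes) cover the remaining 60; "c" stays put.
    val queued = Queue(Block("a", 30), Block("b", 50), Block("c", 80))
    val spilled = selectBlocksToSpill(Block("tip", 40), queued, unfetchedBytes = 100)
    assert(spilled.map(_.id) == Seq("tip", "a", "b"))
    assert(queued.map(_.id) == Seq("c"))
  }
}
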
From f410f1f3b77fc56d72260eb8c75e1546058d3fd8 Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Wed, 5 Nov 2014 00:57:16 -0800
Subject: [PATCH 2/3] Fix bug: add to inMemoryBlocks

---
 .../org/apache/spark/shuffle/sort/SortShuffleReader.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala
index 63c1dab1cd348..6ab6da52c0151 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala
@@ -110,12 +110,15 @@ private[spark] class SortShuffleReader[K, C](
       // Try to fit block in memory. If this fails, merge in-memory blocks to disk.
       val blockSize = blockData.size
       val granted = shuffleMemoryManager.tryToAcquire(blockSize)
+      val block = MemoryShuffleBlock(blockId, blockData)
       if (granted < blockSize) {
         logInfo(s"Granted $granted memory is not enough to store shuffle block ($blockSize), " +
           s"spilling in-memory blocks to release the memory")
 
         shuffleMemoryManager.release(granted)
-        spillInMemoryBlocks(MemoryShuffleBlock(blockId, blockData))
+        spillInMemoryBlocks(block)
+      } else {
+        inMemoryBlocks += block
       }
 
       unfetchedBytes -= blockData.size()
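
PATCH 2/3 repairs the other half of the accept-or-spill decision: PATCH 1/3 deleted the old unconditional `inMemoryBlocks += ...` but never re-added the block on the success path, so blocks that did fit in memory silently dropped out of the final merge. Below is a rough sketch of the corrected flow; ToyMemoryManager and Block are hypothetical stand-ins for ShuffleMemoryManager and MemoryShuffleBlock, assumed here for illustration only.

import scala.collection.mutable.Queue

object AcceptOrSpillSketch {
  case class Block(id: String, size: Long)

  class ToyMemoryManager(var free: Long) {
    // Grant up to the requested number of bytes, in the spirit of tryToAcquire.
    def tryToAcquire(requested: Long): Long = {
      val granted = math.min(requested, free)
      free -= granted
      granted
    }
    def release(bytes: Long): Unit = free += bytes
  }

  val memory = new ToyMemoryManager(free = 100)
  val inMemoryBlocks = new Queue[Block]()
  var spilledBlocks = List.empty[Block]

  def onBlockFetched(block: Block): Unit = {
    val granted = memory.tryToAcquire(block.size)
    if (granted < block.size) {
      memory.release(granted)   // a partial grant is useless; hand it back
      spilledBlocks ::= block   // stands in for spillInMemoryBlocks(block)
    } else {
      inMemoryBlocks += block   // the PATCH 2/3 fix: keep it for the final merge
    }
  }

  def main(args: Array[String]): Unit = {
    onBlockFetched(Block("a", 60)) // fits: queued, 40 bytes of budget left
    onBlockFetched(Block("b", 60)) // does not fit: spilled
    assert(inMemoryBlocks.map(_.id) == Seq("a"))
    assert(spilledBlocks.map(_.id) == Seq("b"))
  }
}
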
From 9dc1af9df4cad5c79484b732739319e25f7b3235 Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Wed, 5 Nov 2014 10:39:12 -0800
Subject: [PATCH 3/3] Fix another bug

---
 .../scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala
index 6ab6da52c0151..df1a3fbfc4b9f 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleReader.scala
@@ -158,7 +158,7 @@ private[spark] class SortShuffleReader[K, C](
     var bytesToSpill = unfetchedBytes
     val blocksToSpill = new ArrayBuffer[MemoryShuffleBlock]()
     blocksToSpill += tippingBlock
     bytesToSpill -= tippingBlock.blockData.size
-    while (bytesToSpill > 0 && inMemoryBlocks.isEmpty) {
+    while (bytesToSpill > 0 && !inMemoryBlocks.isEmpty) {
       val block = inMemoryBlocks.dequeue()
       blocksToSpill += block
       bytesToSpill -= block.blockData.size
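
Both the spill path (partialMergedItr) and the final read (mergedItr) in PATCH 1/3 lean on MergeUtil.mergeSort to combine several already-sorted streams into one, which is why a multi-block spill can be written out as a single sorted file. MergeUtil itself is not shown in this series; the sketch below is only a rough standalone illustration of such a k-way merge over sorted iterators, built on a priority queue keyed by each iterator's head element, and it deliberately ignores the keyComparator, keyOrdering, and aggregator plumbing the real call threads through.

import scala.collection.mutable

object KWayMergeSketch {
  // Merge sorted iterators into one sorted iterator. Reversing the ordering
  // turns Scala's max-heap PriorityQueue into a min-heap, so the iterator
  // whose head element is smallest is always dequeued first.
  def mergeSorted[T](iters: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
    val heap = mutable.PriorityQueue.empty[BufferedIterator[T]](
      Ordering.by((it: BufferedIterator[T]) => it.head).reverse)
    iters.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))
    new Iterator[T] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): T = {
        val it = heap.dequeue()
        val elem = it.next()
        if (it.hasNext) heap.enqueue(it) // re-queue under its new head element
        elem
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val merged = mergeSorted(Seq(Iterator(1, 4, 7), Iterator(2, 5), Iterator(3, 6)))
    assert(merged.toList == List(1, 2, 3, 4, 5, 6, 7))
  }
}
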