diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 57295b44d3234..79c9c451d273d 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -56,9 +56,12 @@ case class Aggregator[K, V, C] (
     } else {
       val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
       combiners.insertAll(iter)
-      // TODO: Make this non optional in a future release
-      Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
-      Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
+      // Update task metrics if context is not null
+      // TODO: Make context non-optional in a future release
+      Option(context).foreach { c =>
+        c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
+        c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
+      }
       combiners.iterator
     }
   }
@@ -87,9 +90,12 @@ case class Aggregator[K, V, C] (
         val pair = iter.next()
         combiners.insert(pair._1, pair._2)
       }
-      // TODO: Make this non optional in a future release
-      Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
-      Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
+      // Update task metrics if context is not null
+      // TODO: Make context non-optional in a future release
+      Option(context).foreach { c =>
+        c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
+        c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
+      }
       combiners.iterator
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index a7342cfef7dd7..fabb882cdd4b3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -158,8 +158,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       for ((it, depNum) <- rddIterators) {
         map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
       }
-      context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
-      context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
+      context.taskMetrics.memoryBytesSpilled += map.memoryBytesSpilled
+      context.taskMetrics.diskBytesSpilled += map.diskBytesSpilled
       new InterruptibleIterator(context,
         map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
     }
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 4039b8f94d291..6dcca47ea7c0c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -65,14 +65,14 @@ private[spark] class SortShuffleManager extends ShuffleManager {
   def getBlockLocation(blockId: ShuffleBlockId, diskManager: DiskBlockManager): FileSegment = {
     // The block is actually going to be a range of a single map output file for this map, so
     // figure out the ID of the consolidated file, then the offset within that from our index
-    val realId = ShuffleBlockId(blockId.shuffleId, blockId.mapId, 0)
-    val indexFile = diskManager.getFile(realId.name + ".index")
+    val consolidatedId = blockId.copy(reduceId = 0)
+    val indexFile = diskManager.getFile(consolidatedId.name + ".index")
     val in = new DataInputStream(new FileInputStream(indexFile))
     try {
       in.skip(blockId.reduceId * 8)
       val offset = in.readLong()
       val nextOffset = in.readLong()
-      new FileSegment(diskManager.getFile(realId), offset, nextOffset - offset)
+      new FileSegment(diskManager.getFile(consolidatedId), offset, nextOffset - offset)
     } finally {
       in.close()
     }
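A note on the lookup above: the ".index" file written by the sort-based shuffle is a flat sequence of 8-byte offsets, one per reduce partition plus a final end-of-file offset, which is why skipping `reduceId * 8` bytes and reading two longs yields one block's byte range. A minimal standalone sketch of that layout (helper names such as `writeIndex`/`readSegment` are illustrative, not Spark APIs):

```scala
import java.io._

object IndexFileSketch {
  // Write offsets(0..numPartitions) as consecutive longs, mirroring the index file's
  // contents: a start offset per partition plus one final offset marking the end of
  // the data file.
  def writeIndex(file: File, offsets: Array[Long]): Unit = {
    val out = new DataOutputStream(new FileOutputStream(file))
    try {
      offsets.foreach(out.writeLong)
    } finally {
      out.close()
    }
  }

  // Recover (offset, length) for one reduce partition the same way getBlockLocation
  // does above: skip reduceId * 8 bytes, then read that partition's start offset and
  // the next partition's start offset.
  def readSegment(file: File, reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new FileInputStream(file))
    try {
      in.skip(reduceId * 8)
      val offset = in.readLong()
      val nextOffset = in.readLong()
      (offset, nextOffset - offset)
    } finally {
      in.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val f = File.createTempFile("shuffle", ".index")
    writeIndex(f, Array(0L, 100L, 250L, 400L))   // three partitions
    assert(readSegment(f, 1) == (100L, 150L))    // partition 1 spans bytes [100, 250)
    f.delete()
  }
}
```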
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 85b4d647fe31a..42fcd07fa18bc 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -37,7 +37,7 @@ private[spark] class SortShuffleWriter[K, V, C](
   private val numPartitions = dep.partitioner.numPartitions

   private val blockManager = SparkEnv.get.blockManager
-  private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
+  private val ser = Serializer.getSerializer(dep.serializer.orNull)

   private val conf = SparkEnv.get.conf
   private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
@@ -45,11 +45,16 @@ private[spark] class SortShuffleWriter[K, V, C](

   private var sorter: ExternalSorter[K, V, _] = null
   private var outputFile: File = null
+  // Are we in the process of stopping? Because map tasks can call stop() with success = true
+  // and then call stop() with success = false if they get an exception, we want to make sure
+  // we don't try deleting files, etc twice.
   private var stopping = false
+
   private var mapStatus: MapStatus = null

   /** Write a bunch of records to this task's output */
   override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
+    // Get an iterator with the elements for each partition ID
     val partitions: Iterator[(Int, Iterator[Product2[K, _]])] = {
       if (dep.mapSideCombine) {
         if (!dep.aggregator.isDefined) {
@@ -97,7 +102,7 @@ private[spark] class SortShuffleWriter[K, V, C](
         totalTime += writer.timeWriting()
         totalBytes += segment.length
       } else {
-        // Don't create a new writer to avoid writing any headers and things like that
+        // The partition is empty; don't create a new writer to avoid writing headers, etc
         offsets(id + 1) = offsets(id)
       }
     }
@@ -106,8 +111,8 @@ private[spark] class SortShuffleWriter[K, V, C](
     shuffleMetrics.shuffleBytesWritten = totalBytes
     shuffleMetrics.shuffleWriteTime = totalTime
     context.taskMetrics.shuffleWriteMetrics = Some(shuffleMetrics)
-    context.taskMetrics.memoryBytesSpilled = sorter.memoryBytesSpilled
-    context.taskMetrics.diskBytesSpilled = sorter.diskBytesSpilled
+    context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
+    context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled

     // Write an index file with the offsets of each block, plus a final offset at the end for the
     // end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure
@@ -153,6 +158,7 @@ private[spark] class SortShuffleWriter[K, V, C](
       // Clean up our sorter, which may have its own intermediate files
       if (sorter != null) {
         sorter.stop()
+        sorter = null
       }
     }
   }
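A note on the recurring `=` → `+=` change (here and in Aggregator.scala and CoGroupedRDD.scala above): a single task can run more than one spilling collection, for example an ExternalAppendOnlyMap on the read side feeding an ExternalSorter on the write side, so each collection should add to the task's spill counters rather than overwrite them. A tiny illustration using a hypothetical stand-in for TaskMetrics:

```scala
// Hypothetical stand-in for TaskMetrics, just to show the accumulation issue.
class SpillMetrics {
  var memoryBytesSpilled: Long = 0L
  var diskBytesSpilled: Long = 0L
}

object MetricsAccumulation {
  def main(args: Array[String]): Unit = {
    val metrics = new SpillMetrics

    // Imagine two collections spilling in the same task: one while reading shuffle
    // input, another while writing the next shuffle's output.
    val spills = Seq((512L, 2048L), (256L, 1024L))

    for ((memory, disk) <- spills) {
      metrics.memoryBytesSpilled += memory   // '=' here would keep only the last spill
      metrics.diskBytesSpilled += disk
    }

    assert(metrics.memoryBytesSpilled == 768L && metrics.diskBytesSpilled == 3072L)
  }
}
```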
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 2a2ca0d5e9364..c1756ac905417 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -54,14 +54,12 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
 }

 @DeveloperApi
-case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
-  extends BlockId {
+case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
   def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
 }

 @DeveloperApi
-case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
-  extends BlockId {
+case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
   def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
 }

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index a9dadb8d306a0..4d66ccea211fa 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -35,11 +35,13 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
  *
  * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
  */
-private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String)
+private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, rootDirs: String)
   extends PathResolver with Logging {

   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
-  private val subDirsPerLocalDir = shuffleManager.conf.getInt("spark.diskStore.subDirectories", 64)
+
+  private val subDirsPerLocalDir =
+    shuffleBlockManager.conf.getInt("spark.diskStore.subDirectories", 64)

   /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
    * directory, create multiple subdirectories that we will hash files into, in order to avoid
@@ -60,12 +62,14 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
    * Otherwise, we assume the Block is mapped to the whole file identified by the BlockId.
    */
   def getBlockLocation(blockId: BlockId): FileSegment = {
-    val env = SparkEnv.get
+    val env = SparkEnv.get  // NOTE: can be null in unit tests
     if (blockId.isShuffle && env != null && env.shuffleManager.isInstanceOf[SortShuffleManager]) {
-      val sortShuffleManager = SparkEnv.get.shuffleManager.asInstanceOf[SortShuffleManager]
+      // For sort-based shuffle, let it figure out its blocks
+      val sortShuffleManager = env.shuffleManager.asInstanceOf[SortShuffleManager]
       sortShuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId], this)
-    } else if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) {
-      shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
+    } else if (blockId.isShuffle && shuffleBlockManager.consolidateShuffleFiles) {
+      // For hash-based shuffle with consolidated files, ShuffleBlockManager takes care of this
+      shuffleBlockManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
     } else {
       val file = getFile(blockId.name)
       new FileSegment(file, 0, file.length())
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 00019ef35c9e7..b34512ef9eb60 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -110,7 +110,6 @@ class ExternalAppendOnlyMap[K, V, C](
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()
-  private val threadId = Thread.currentThread().getId

   /**
    * Insert the given key and value into the map.
@@ -148,6 +147,7 @@ class ExternalAppendOnlyMap[K, V, C](
         // Atomically check whether there is sufficient memory in the global pool for
         // this map to grow and, if possible, allocate the required amount
         shuffleMemoryMap.synchronized {
+          val threadId = Thread.currentThread().getId
           val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
           val availableMemory = maxMemoryThreshold -
             (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
@@ -187,7 +187,8 @@ class ExternalAppendOnlyMap[K, V, C](
    */
   private def spill(mapSize: Long): Unit = {
     spillCount += 1
-    logWarning("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
+    val threadId = Thread.currentThread().getId
+    logInfo("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
       .format(threadId, mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
     var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index ffedb3b947d42..54c3310744136 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -84,6 +84,14 @@ private[spark] class ExternalSorter[K, V, C](
   private val conf = SparkEnv.get.conf
   private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
   private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+
+  // Size of object batches when reading/writing from serializers.
+  //
+  // Objects are written in batches, with each batch using its own serialization stream. This
+  // cuts down on the size of reference-tracking maps constructed when deserializing a stream.
+  //
+  // NOTE: Setting this too low can cause excessive copying when serializing, since some serializers
+  // grow internal data structures by growing + copying every time the number of objects doubles.
   private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000)

   private def getPartition(key: K): Int = {
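A note on the new serializerBatchSize comment: writing each batch through its own serialization stream bounds the serializer's reference-tracking state, at the cost of recording each batch's byte length so a reader can reopen a fresh stream per batch. A rough standalone sketch of the idea using plain JDK serialization (the batch size and helper names are made up for illustration):

```scala
import java.io._

object BatchedSpillSketch {
  val batchSize = 3  // tiny batch size, just for illustration

  // Write items in batches; each batch gets its own ObjectOutputStream so the
  // serializer's reference-tracking state is dropped between batches. Returns the
  // byte length of each batch, analogous to the sorter's recorded batch sizes.
  def writeBatches(file: File, items: Seq[String]): Seq[Long] = {
    val lengths = scala.collection.mutable.ArrayBuffer[Long]()
    val fileOut = new FileOutputStream(file)
    try {
      for (batch <- items.grouped(batchSize)) {
        val buffer = new ByteArrayOutputStream()
        val objOut = new ObjectOutputStream(buffer)
        batch.foreach(objOut.writeObject)
        objOut.close()
        val bytes = buffer.toByteArray
        fileOut.write(bytes)
        lengths += bytes.length.toLong
      }
    } finally {
      fileOut.close()
    }
    lengths
  }

  def main(args: Array[String]): Unit = {
    val f = File.createTempFile("spill", ".tmp")
    val lengths = writeBatches(f, (1 to 10).map(i => s"record-$i"))
    // Knowing each batch's length lets a reader open a fresh deserialization stream
    // per batch instead of one stream over the whole file.
    assert(lengths.size == 4)
    f.delete()
  }
}
```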
@@ -93,8 +101,8 @@ private[spark] class ExternalSorter[K, V, C](
   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
   // store them in an array buffer.
-  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
-  var buffer = new SizeTrackingPairBuffer[(Int, K), C]
+  private var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  private var buffer = new SizeTrackingPairBuffer[(Int, K), C]

   // Number of pairs read from input since last spill; note that we count them even if a value is
   // merged with a previous key in case we're doing something like groupBy where the result grows
@@ -118,11 +126,11 @@ private[spark] class ExternalSorter[K, V, C](
   // How much of the shared memory pool this collection has claimed
   private var myMemoryThreshold = 0L

-  // A comparator for keys K that orders them within a partition to allow partial aggregation.
+  // A comparator for keys K that orders them within a partition to allow aggregation or sorting.
   // Can be a partial ordering by hash code if a total ordering is not provided through by the
   // user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some
   // non-equal keys also have this, so we need to do a later pass to find truly equal keys).
-  // Note that we ignore this if no aggregator is given.
+  // Note that we ignore this if no aggregator and no ordering are given.
   private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] {
     override def compare(a: K, b: K): Int = {
       val h1 = if (a == null) 0 else a.hashCode()
@@ -194,6 +202,11 @@ private[spark] class ExternalSorter[K, V, C](
     }
   }

+  /**
+   * Spill the current in-memory collection to disk if needed.
+   *
+   * @param usingMap whether we're using a map or buffer as our current in-memory collection
+   */
   private def maybeSpill(usingMap: Boolean): Unit = {
     if (!spillingEnabled) {
       return
@@ -201,11 +214,12 @@ private[spark] class ExternalSorter[K, V, C](

     val collection: SizeTrackingPairCollection[(Int, K), C] = if (usingMap) map else buffer

+    // TODO: factor this out of both here and ExternalAppendOnlyMap
     if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
         collection.estimateSize() >= myMemoryThreshold) {
       // TODO: This logic doesn't work if there are two external collections being used in the same
-      // task (e.g. to read shuffle output and write it out into another shuffle).
+      // task (e.g. to read shuffle output and write it out into another shuffle) [SPARK-2711]
       val currentSize = collection.estimateSize()
       var shouldSpill = false
@@ -243,8 +257,9 @@ private[spark] class ExternalSorter[K, V, C](

     val memorySize = collection.estimateSize()
     spillCount += 1
-    logWarning("Spilling in-memory batch of %d MB to disk (%d spill%s so far)"
-      .format(memorySize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
+    val threadId = Thread.currentThread().getId
+    logInfo("Thread %d spilling in-memory batch of %d MB to disk (%d spill%s so far)"
+      .format(threadId, memorySize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
     var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
     var objectsWritten = 0   // Objects written since the last flush
@@ -261,6 +276,7 @@ private[spark] class ExternalSorter[K, V, C](
       val bytesWritten = writer.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
+      objectsWritten = 0
     }

     try {
@@ -277,7 +293,6 @@ private[spark] class ExternalSorter[K, V, C](

         if (objectsWritten == serializerBatchSize) {
           flush()
-          objectsWritten = 0
           writer.close()
           writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
         }
@@ -350,9 +365,10 @@ private[spark] class ExternalSorter[K, V, C](
     val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
     type Iter = BufferedIterator[Product2[K, C]]
     val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
+      // Use the reverse of comparator.compare because PriorityQueue dequeues the max
       override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)
     })
-    heap.enqueue(bufferedIters: _*)
+    heap.enqueue(bufferedIters: _*)  // Will contain only the iterators with hasNext = true
     new Iterator[Product2[K, C]] {
       override def hasNext: Boolean = !heap.isEmpty
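A note on the comment added in mergeSort above: scala.collection.mutable.PriorityQueue always dequeues the largest element under its Ordering, so negating compare is what makes the smallest head come out first. A tiny self-contained demonstration of the trick, separate from the sorter code:

```scala
import scala.collection.mutable

object MinHeapViaNegation {
  def main(args: Array[String]): Unit = {
    // PriorityQueue dequeues the "largest" element under the given Ordering,
    // so negating compare turns it into a min-heap.
    val minFirst = new mutable.PriorityQueue[Int]()(new Ordering[Int] {
      override def compare(x: Int, y: Int): Int = -Ordering.Int.compare(x, y)
    })
    minFirst.enqueue(5, 1, 4, 2, 3)
    assert((1 to 5).map(_ => minFirst.dequeue()) == Seq(1, 2, 3, 4, 5))
  }
}
```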
@@ -429,42 +445,21 @@ private[spark] class ExternalSorter[K, V, C](
         }
       }.flatMap(i => i)
     } else {
-      // We have a total ordering. This means we can merge objects one by one as we read them
-      // from the iterators, without buffering all the ones that are "equal" to a given key.
-      // We do so with code similar to mergeSort, except our Iterator.next combines together all
-      // the elements with the given key.
-      val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
-      type Iter = BufferedIterator[Product2[K, C]]
-      val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
-        override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)
-      })
-      heap.enqueue(bufferedIters: _*)
+      // We have a total ordering, so the objects with the same key are sequential.
       new Iterator[Product2[K, C]] {
-        override def hasNext: Boolean = !heap.isEmpty
+        val sorted = mergeSort(iterators, comparator).buffered
+
+        override def hasNext: Boolean = sorted.hasNext

         override def next(): Product2[K, C] = {
           if (!hasNext) {
             throw new NoSuchElementException
           }
-          val firstBuf = heap.dequeue()
-          val firstPair = firstBuf.next()
-          val k = firstPair._1
-          var c = firstPair._2
-          if (firstBuf.hasNext) {
-            heap.enqueue(firstBuf)
-          }
-          var shouldStop = false
-          while (!heap.isEmpty && !shouldStop) {
-            shouldStop = true // Stop unless we find another element with the same key
-            val newBuf = heap.dequeue()
-            while (newBuf.hasNext && newBuf.head._1 == k) {
-              val elem = newBuf.next()
-              c = mergeCombiners(c, elem._2)
-              shouldStop = false
-            }
-            if (newBuf.hasNext) {
-              heap.enqueue(newBuf)
-            }
+          val elem = sorted.next()
+          val k = elem._1
+          var c = elem._2
+          while (sorted.hasNext && sorted.head._1 == k) {
+            c = mergeCombiners(c, sorted.next()._2)
           }
           (k, c)
         }
@@ -635,7 +630,7 @@ private[spark] class ExternalSorter[K, V, C](

   /**
    * Given a stream of ((partition, key), combiner) pairs *assumed to be sorted by partition ID*,
-   * group together the pairs for each for each partition into a sub-iterator.
+   * group together the pairs for each partition into a sub-iterator.
    *
    * @param data an iterator of elements, assumed to already be sorted by partition ID
    */
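A note on the simplified total-ordering branch above: after mergeSort, all pairs with equal keys are adjacent, so combining them only needs a buffered iterator and a short inner loop that consumes each duplicate. The same pattern in isolation, over a plain sorted input (a generic helper written for this note, not part of the patch):

```scala
object CombineSortedPairs {
  // Collapse consecutive pairs that share a key, assuming the input is already
  // sorted by key -- the invariant that mergeSort provides above.
  def combine[K, C](sorted: Iterator[(K, C)])(mergeCombiners: (C, C) => C): Iterator[(K, C)] = {
    val buffered = sorted.buffered
    new Iterator[(K, C)] {
      override def hasNext: Boolean = buffered.hasNext
      override def next(): (K, C) = {
        val (k, first) = buffered.next()
        var c = first
        while (buffered.hasNext && buffered.head._1 == k) {
          c = mergeCombiners(c, buffered.next()._2)
        }
        (k, c)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Iterator(("a", 1), ("a", 2), ("b", 5), ("c", 3), ("c", 4))
    assert(combine(input)(_ + _).toList == List(("a", 3), ("b", 5), ("c", 7)))
  }
}
```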