Various review comments
mateiz committed Jul 30, 2014
1 parent a611159 commit d1c137f
Showing 8 changed files with 77 additions and 67 deletions.
18 changes: 12 additions & 6 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -56,9 +56,12 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
combiners.insertAll(iter)
- // TODO: Make this non optional in a future release
- Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
- Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
+ // Update task metrics if context is not null
+ // TODO: Make context non optional in a future release
+ Option(context).foreach { c =>
+   c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
+   c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
+ }
combiners.iterator
}
}
@@ -87,9 +90,12 @@ case class Aggregator[K, V, C] (
val pair = iter.next()
combiners.insert(pair._1, pair._2)
}
- // TODO: Make this non optional in a future release
- Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
- Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
+ // Update task metrics if context is not null
+ // TODO: Make context non-optional in a future release
+ Option(context).foreach { c =>
+   c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
+   c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
+ }
combiners.iterator
}
}
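The switch from = to += above matters because a single task can run more than one spilling collection (for example, combining values and then combining combiners, or reading one shuffle while writing another), so each collection should add its spill counts to the task's totals rather than overwrite them. A minimal standalone sketch of that accumulation pattern; TaskMetricsStub and recordSpill are illustrative names, not Spark's API:

// Sketch only: a stand-in for TaskMetrics with the two spill counters used above.
class TaskMetricsStub {
  var memoryBytesSpilled: Long = 0L
  var diskBytesSpilled: Long = 0L
}

object SpillMetricsSketch {
  // Mirrors Option(context).foreach { c => ... }: a null context is skipped, a real one is updated.
  def recordSpill(metrics: TaskMetricsStub, memSpilled: Long, diskSpilled: Long): Unit = {
    Option(metrics).foreach { m =>
      m.memoryBytesSpilled += memSpilled
      m.diskBytesSpilled += diskSpilled
    }
  }

  def main(args: Array[String]): Unit = {
    val metrics = new TaskMetricsStub
    recordSpill(metrics, memSpilled = 100, diskSpilled = 40)  // first spilling collection in a task
    recordSpill(metrics, memSpilled = 50, diskSpilled = 20)   // a second one in the same task
    // += keeps the running totals (150 and 60); plain = would have discarded the first spill
    println(s"memory=${metrics.memoryBytesSpilled} disk=${metrics.diskBytesSpilled}")
  }
}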
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -158,8 +158,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
for ((it, depNum) <- rddIterators) {
map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
}
- context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
- context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
+ context.taskMetrics.memoryBytesSpilled += map.memoryBytesSpilled
+ context.taskMetrics.diskBytesSpilled += map.diskBytesSpilled
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -65,14 +65,14 @@ private[spark] class SortShuffleManager extends ShuffleManager {
def getBlockLocation(blockId: ShuffleBlockId, diskManager: DiskBlockManager): FileSegment = {
// The block is actually going to be a range of a single map output file for this map, so
// figure out the ID of the consolidated file, then the offset within that from our index
- val realId = ShuffleBlockId(blockId.shuffleId, blockId.mapId, 0)
- val indexFile = diskManager.getFile(realId.name + ".index")
+ val consolidatedId = blockId.copy(reduceId = 0)
+ val indexFile = diskManager.getFile(consolidatedId.name + ".index")
val in = new DataInputStream(new FileInputStream(indexFile))
try {
in.skip(blockId.reduceId * 8)
val offset = in.readLong()
val nextOffset = in.readLong()
- new FileSegment(diskManager.getFile(realId), offset, nextOffset - offset)
+ new FileSegment(diskManager.getFile(consolidatedId), offset, nextOffset - offset)
} finally {
in.close()
}
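For context on getBlockLocation above: sort-based shuffle writes a single data file per map task plus an .index file holding one long offset per reduce partition and a final end offset, so locating a block means skipping reduceId * 8 bytes and reading two consecutive longs. A standalone sketch of that lookup under the same assumed layout (segmentFor is an illustrative helper, not Spark API):

import java.io.{DataInputStream, FileInputStream}

object IndexLookupSketch {
  /** Returns (offset, length) of reduce partition `reduceId` within the consolidated data file. */
  def segmentFor(indexPath: String, reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new FileInputStream(indexPath))
    try {
      in.skip(reduceId * 8L)          // each index entry is one 8-byte long
      val offset = in.readLong()      // where this partition's data starts
      val nextOffset = in.readLong()  // where the next partition's data starts
      (offset, nextOffset - offset)
    } finally {
      in.close()
    }
  }
}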
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -37,19 +37,24 @@ private[spark] class SortShuffleWriter[K, V, C](
private val numPartitions = dep.partitioner.numPartitions

private val blockManager = SparkEnv.get.blockManager
- private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
+ private val ser = Serializer.getSerializer(dep.serializer.orNull)

private val conf = SparkEnv.get.conf
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024

private var sorter: ExternalSorter[K, V, _] = null
private var outputFile: File = null

+ // Are we in the process of stopping? Because map tasks can call stop() with success = true
+ // and then call stop() with success = false if they get an exception, we want to make sure
+ // we don't try deleting files, etc twice.
private var stopping = false

private var mapStatus: MapStatus = null

+ /** Write a bunch of records to this task's output */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
+ // Get an iterator with the elements for each partition ID
val partitions: Iterator[(Int, Iterator[Product2[K, _]])] = {
if (dep.mapSideCombine) {
if (!dep.aggregator.isDefined) {
@@ -97,7 +102,7 @@ private[spark] class SortShuffleWriter[K, V, C](
totalTime += writer.timeWriting()
totalBytes += segment.length
} else {
- // Don't create a new writer to avoid writing any headers and things like that
+ // The partition is empty; don't create a new writer to avoid writing headers, etc
offsets(id + 1) = offsets(id)
}
}
@@ -106,8 +111,8 @@
shuffleMetrics.shuffleBytesWritten = totalBytes
shuffleMetrics.shuffleWriteTime = totalTime
context.taskMetrics.shuffleWriteMetrics = Some(shuffleMetrics)
- context.taskMetrics.memoryBytesSpilled = sorter.memoryBytesSpilled
- context.taskMetrics.diskBytesSpilled = sorter.diskBytesSpilled
+ context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
+ context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled

// Write an index file with the offsets of each block, plus a final offset at the end for the
// end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure
Expand Down Expand Up @@ -153,6 +158,7 @@ private[spark] class SortShuffleWriter[K, V, C](
// Clean up our sorter, which may have its own intermediate files
if (sorter != null) {
sorter.stop()
+ sorter = null
}
}
}
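One detail worth spelling out from the write path above: every partition's segment is appended to a single output file and offsets(id + 1) records where it ends, so an empty partition simply repeats the previous offset, which is exactly what offsets(id + 1) = offsets(id) does. A small illustrative helper (not Spark code) showing that bookkeeping:

object OffsetsSketch {
  /** Given each partition's segment length in bytes, return the numPartitions + 1 offsets. */
  def offsetsFor(segmentLengths: Seq[Long]): Array[Long] = {
    val offsets = new Array[Long](segmentLengths.length + 1)
    for ((len, id) <- segmentLengths.zipWithIndex) {
      offsets(id + 1) = offsets(id) + len  // an empty partition just carries the offset forward
    }
    offsets
  }

  def main(args: Array[String]): Unit = {
    // Segments of 10, 0 (empty) and 5 bytes produce offsets 0, 10, 10, 15
    println(offsetsFor(Seq(10L, 0L, 5L)).mkString(", "))
  }
}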
6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -54,14 +54,12 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
}

@DeveloperApi
- case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
-   extends BlockId {
+ case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

@DeveloperApi
- case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
-   extends BlockId {
+ case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
}

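For reference, the name methods above produce strings such as shuffle_2_7_3 and shuffle_2_7_0.index; the .index variant is what SortShuffleManager.getBlockLocation reads. A self-contained sketch that drops the BlockId parent and @DeveloperApi annotation so it runs on its own:

object BlockIdNameSketch {
  case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) {
    def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
  }
  case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) {
    def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
  }

  def main(args: Array[String]): Unit = {
    println(ShuffleBlockId(2, 7, 3).name)       // shuffle_2_7_3
    println(ShuffleIndexBlockId(2, 7, 0).name)  // shuffle_2_7_0.index
  }
}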
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -35,11 +35,13 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
*
* @param rootDirs The directories to use for storing block files. Data will be hashed among these.
*/
- private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String)
+ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, rootDirs: String)
extends PathResolver with Logging {

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
- private val subDirsPerLocalDir = shuffleManager.conf.getInt("spark.diskStore.subDirectories", 64)
+
+ private val subDirsPerLocalDir =
+   shuffleBlockManager.conf.getInt("spark.diskStore.subDirectories", 64)

/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
@@ -60,12 +62,14 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
* Otherwise, we assume the Block is mapped to the whole file identified by the BlockId.
*/
def getBlockLocation(blockId: BlockId): FileSegment = {
- val env = SparkEnv.get
+ val env = SparkEnv.get // NOTE: can be null in unit tests
if (blockId.isShuffle && env != null && env.shuffleManager.isInstanceOf[SortShuffleManager]) {
- val sortShuffleManager = SparkEnv.get.shuffleManager.asInstanceOf[SortShuffleManager]
+ // For sort-based shuffle, let it figure out its blocks
+ val sortShuffleManager = env.shuffleManager.asInstanceOf[SortShuffleManager]
sortShuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId], this)
- } else if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) {
-   shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
+ } else if (blockId.isShuffle && shuffleBlockManager.consolidateShuffleFiles) {
+   // For hash-based shuffle with consolidated files, ShuffleBlockManager takes care of this
+   shuffleBlockManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
} else {
val file = getFile(blockId.name)
new FileSegment(file, 0, file.length())
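The spark.diskStore.subDirectories setting referenced above controls how many subdirectories each local directory is split into; block files are hashed first across the spark.local.dir entries and then across those subdirectories so no single directory grows too large. A generic sketch of that kind of placement, written from the description here rather than copied from DiskBlockManager, so treat the exact hash arithmetic as an assumption:

object DirHashingSketch {
  /** Pick a (localDirIndex, subDirIndex) pair for a block file name; illustrative arithmetic only. */
  def placeFile(filename: String, numLocalDirs: Int, subDirsPerLocalDir: Int): (Int, Int) = {
    val hash = filename.hashCode & Integer.MAX_VALUE           // force a non-negative hash
    val dirId = hash % numLocalDirs                            // spread across spark.local.dir entries
    val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir  // then across the subdirectories
    (dirId, subDirId)
  }

  def main(args: Array[String]): Unit = {
    println(placeFile("shuffle_2_7_3.index", numLocalDirs = 2, subDirsPerLocalDir = 64))
  }
}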
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -110,7 +110,6 @@ class ExternalAppendOnlyMap[K, V, C](
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()
- private val threadId = Thread.currentThread().getId

/**
* Insert the given key and value into the map.
Expand Down Expand Up @@ -148,6 +147,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Atomically check whether there is sufficient memory in the global pool for
// this map to grow and, if possible, allocate the required amount
shuffleMemoryMap.synchronized {
+ val threadId = Thread.currentThread().getId
val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
val availableMemory = maxMemoryThreshold -
(shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
Expand Down Expand Up @@ -187,7 +187,8 @@ class ExternalAppendOnlyMap[K, V, C](
*/
private def spill(mapSize: Long): Unit = {
spillCount += 1
logWarning("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
val threadId = Thread.currentThread().getId
logInfo("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
.format(threadId, mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
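Context for the threadId changes above: before growing its in-memory map, the insert path checks a shared map of how much shuffle memory each thread has claimed, under a lock, and spills to disk if the pool is exhausted; the patch simply looks the thread ID up where it is needed instead of caching it in a field. A simplified standalone sketch of that accounting (maxMemoryThreshold, shuffleMemoryMap and tryToAcquire are illustrative names, and the cap is hard-coded for the example):

import scala.collection.mutable

object ShuffleMemorySketch {
  val maxMemoryThreshold: Long = 1024L * 1024 * 1024    // assumed 1 GB pool for the sketch
  val shuffleMemoryMap = mutable.HashMap[Long, Long]()  // threadId -> bytes currently claimed

  /** Ask to hold `wantedTotal` bytes in memory for this thread; false means the caller should spill. */
  def tryToAcquire(wantedTotal: Long): Boolean = shuffleMemoryMap.synchronized {
    val threadId = Thread.currentThread().getId         // looked up where it is used
    val previouslyOccupied = shuffleMemoryMap.getOrElse(threadId, 0L)
    val claimedByOthers = shuffleMemoryMap.values.sum - previouslyOccupied
    if (wantedTotal <= maxMemoryThreshold - claimedByOthers) {
      shuffleMemoryMap(threadId) = wantedTotal
      true
    } else {
      false
    }
  }

  def main(args: Array[String]): Unit = {
    println(tryToAcquire(512L * 1024 * 1024))   // true: fits under the cap
    println(tryToAcquire(2048L * 1024 * 1024))  // false: time to spill to disk
  }
}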
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -84,6 +84,14 @@ private[spark] class ExternalSorter[K, V, C](
private val conf = SparkEnv.get.conf
private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024

+ // Size of object batches when reading/writing from serializers.
+ //
+ // Objects are written in batches, with each batch using its own serialization stream. This
+ // cuts down on the size of reference-tracking maps constructed when deserializing a stream.
+ //
+ // NOTE: Setting this too low can cause excessive copying when serializing, since some serializers
+ // grow internal data structures by growing + copying every time the number of objects doubles.
private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000)

private def getPartition(key: K): Int = {
@@ -93,8 +101,8 @@
// Data structures to store in-memory objects before we spill. Depending on whether we have an
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
// store them in an array buffer.
- var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
- var buffer = new SizeTrackingPairBuffer[(Int, K), C]
+ private var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+ private var buffer = new SizeTrackingPairBuffer[(Int, K), C]

// Number of pairs read from input since last spill; note that we count them even if a value is
// merged with a previous key in case we're doing something like groupBy where the result grows
@@ -118,11 +126,11 @@
// How much of the shared memory pool this collection has claimed
private var myMemoryThreshold = 0L

- // A comparator for keys K that orders them within a partition to allow partial aggregation.
+ // A comparator for keys K that orders them within a partition to allow aggregation or sorting.
// Can be a partial ordering by hash code if a total ordering is not provided through by the
// user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some
// non-equal keys also have this, so we need to do a later pass to find truly equal keys).
- // Note that we ignore this if no aggregator is given.
+ // Note that we ignore this if no aggregator and no ordering are given.
private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] {
override def compare(a: K, b: K): Int = {
val h1 = if (a == null) 0 else a.hashCode()
@@ -194,18 +202,24 @@
}
}

+ /**
+  * Spill the current in-memory collection to disk if needed.
+  *
+  * @param usingMap whether we're using a map or buffer as our current in-memory collection
+  */
private def maybeSpill(usingMap: Boolean): Unit = {
if (!spillingEnabled) {
return
}

val collection: SizeTrackingPairCollection[(Int, K), C] = if (usingMap) map else buffer

+ // TODO: factor this out of both here and ExternalAppendOnlyMap
if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
collection.estimateSize() >= myMemoryThreshold)
{
// TODO: This logic doesn't work if there are two external collections being used in the same
- // task (e.g. to read shuffle output and write it out into another shuffle).
+ // task (e.g. to read shuffle output and write it out into another shuffle) [SPARK-2711]

val currentSize = collection.estimateSize()
var shouldSpill = false
@@ -243,8 +257,9 @@
val memorySize = collection.estimateSize()

spillCount += 1
logWarning("Spilling in-memory batch of %d MB to disk (%d spill%s so far)"
.format(memorySize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val threadId = Thread.currentThread().getId
logInfo("Thread %d spilling in-memory batch of %d MB to disk (%d spill%s so far)"
.format(threadId, memorySize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
var objectsWritten = 0 // Objects written since the last flush
@@ -261,6 +276,7 @@
val bytesWritten = writer.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
+ objectsWritten = 0
}

try {
@@ -277,7 +293,6 @@

if (objectsWritten == serializerBatchSize) {
flush()
- objectsWritten = 0
writer.close()
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
}
@@ -350,9 +365,10 @@
val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
type Iter = BufferedIterator[Product2[K, C]]
val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
+ // Use the reverse of comparator.compare because PriorityQueue dequeues the max
override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)
})
- heap.enqueue(bufferedIters: _*)
+ heap.enqueue(bufferedIters: _*) // Will contain only the iterators with hasNext = true
new Iterator[Product2[K, C]] {
override def hasNext: Boolean = !heap.isEmpty

@@ -429,42 +445,21 @@
}
}.flatMap(i => i)
} else {
- // We have a total ordering. This means we can merge objects one by one as we read them
- // from the iterators, without buffering all the ones that are "equal" to a given key.
- // We do so with code similar to mergeSort, except our Iterator.next combines together all
- // the elements with the given key.
- val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
- type Iter = BufferedIterator[Product2[K, C]]
- val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
-   override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)
- })
- heap.enqueue(bufferedIters: _*)
+ // We have a total ordering, so the objects with the same key are sequential.
new Iterator[Product2[K, C]] {
- override def hasNext: Boolean = !heap.isEmpty
+ val sorted = mergeSort(iterators, comparator).buffered
+
+ override def hasNext: Boolean = sorted.hasNext

override def next(): Product2[K, C] = {
if (!hasNext) {
throw new NoSuchElementException
}
- val firstBuf = heap.dequeue()
- val firstPair = firstBuf.next()
- val k = firstPair._1
- var c = firstPair._2
- if (firstBuf.hasNext) {
-   heap.enqueue(firstBuf)
- }
- var shouldStop = false
- while (!heap.isEmpty && !shouldStop) {
-   shouldStop = true // Stop unless we find another element with the same key
-   val newBuf = heap.dequeue()
-   while (newBuf.hasNext && newBuf.head._1 == k) {
-     val elem = newBuf.next()
-     c = mergeCombiners(c, elem._2)
-     shouldStop = false
-   }
-   if (newBuf.hasNext) {
-     heap.enqueue(newBuf)
-   }
- }
+ val elem = sorted.next()
+ val k = elem._1
+ var c = elem._2
+ while (sorted.hasNext && sorted.head._1 == k) {
+   c = mergeCombiners(c, sorted.next()._2)
+ }
(k, c)
}
@@ -635,7 +630,7 @@

/**
* Given a stream of ((partition, key), combiner) pairs *assumed to be sorted by partition ID*,
-  * group together the pairs for each for each partition into a sub-iterator.
+  * group together the pairs for each partition into a sub-iterator.
*
* @param data an iterator of elements, assumed to already be sorted by partition ID
*/
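The rewritten total-ordering branch above relies on mergeSort(iterators, comparator) producing records sorted by key, so records with equal keys come out consecutively and can be folded together in one pass with no buffering of previously seen keys. A standalone sketch of that one-pass combine over plain Scala iterators (illustrative names, not Spark's types); note that the inner loop has to advance the iterator on every merge or it would never terminate:

object CombineSortedSketch {
  def combineSorted[K, C](sorted: Iterator[(K, C)], mergeCombiners: (C, C) => C): Iterator[(K, C)] =
    new Iterator[(K, C)] {
      private val buffered = sorted.buffered
      override def hasNext: Boolean = buffered.hasNext
      override def next(): (K, C) = {
        val (k, first) = buffered.next()
        var c = first
        // Consume the whole run of records sharing this key, advancing the iterator each time
        while (buffered.hasNext && buffered.head._1 == k) {
          c = mergeCombiners(c, buffered.next()._2)
        }
        (k, c)
      }
    }

  def main(args: Array[String]): Unit = {
    val input = Iterator(("a", 1), ("a", 2), ("b", 5), ("b", 1), ("c", 3))  // already key-sorted
    println(combineSorted(input, (x: Int, y: Int) => x + y).toList)         // List((a,3), (b,6), (c,3))
  }
}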
