Move existing logic for writing partitioned files into ExternalSorter
Also renamed ExternalSorter.write(Iterator) to insertAll, to match ExternalAppendOnlyMap.
mateiz committed Aug 7, 2014
1 parent a263a7e commit 31e5d7c
Showing 4 changed files with 102 additions and 69 deletions.
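
For readers skimming the diff, the reworked SortShuffleWriter.write path condenses to roughly the sketch below. This is an illustration reconstructed from the SortShuffleWriter and ExternalSorter changes that follow, not the literal file contents, and it relies on the class's existing fields (dep, mapId, context, blockManager, numPartitions, mapStatus):

  // Condensed sketch: the sorter now both buffers the records (insertAll,
  // formerly write) and writes the single partitioned data file plus its
  // .index file (writePartitionedFile), so SortShuffleWriter no longer loops
  // over partitions itself.
  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    val sorter: ExternalSorter[K, V, _] =
      if (dep.mapSideCombine) {
        val s = new ExternalSorter[K, V, C](
          dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
        s.insertAll(records)
        s
      } else {
        // No aggregator or ordering here; keys are sorted on the reduce side
        // if the operation being run is sortByKey.
        val s = new ExternalSorter[K, V, V](
          None, Some(dep.partitioner), None, dep.serializer)
        s.insertAll(records)
        s
      }

    val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0)
    val partitionLengths = sorter.writePartitionedFile(blockId, context)
    blockManager.shuffleBlockManager.addCompletedMap(dep.shuffleId, mapId, numPartitions)

    mapStatus = new MapStatus(blockManager.blockManagerId,
      partitionLengths.map(MapOutputTracker.compressSize))
  }
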
@@ -58,7 +58,7 @@ private[spark] class HashShuffleReader[K, C](
// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
sorter.write(aggregatedIter)
sorter.insertAll(aggregatedIter)
context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
sorter.iterator
@@ -44,6 +44,7 @@ private[spark] class SortShuffleWriter[K, V, C](

private var sorter: ExternalSorter[K, V, _] = null
private var outputFile: File = null
private var indexFile: File = null

// Are we in the process of stopping? Because map tasks can call stop() with success = true
// and then call stop() with success = false if they get an exception, we want to make sure
@@ -58,77 +59,40 @@ private[spark] class SortShuffleWriter[K, V, C](
/** Write a bunch of records to this task's output */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
// Get an iterator with the elements for each partition ID
val partitions: Iterator[(Int, Iterator[Product2[K, _]])] = {
val externalSorter: ExternalSorter[K, V, _] = {
if (dep.mapSideCombine) {
if (!dep.aggregator.isDefined) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
}
sorter = new ExternalSorter[K, V, C](
val sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.write(records)
sorter.partitionedIterator
sorter.insertAll(records)
sorter
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we
// don't care whether the keys get sorted in each partition; that will be done on the
// reduce side if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](
val sorter = new ExternalSorter[K, V, V](
None, Some(dep.partitioner), None, dep.serializer)
sorter.write(records)
sorter.partitionedIterator
sorter.insertAll(records)
sorter
}
}

// Create a single shuffle file with reduce ID 0 that we'll write all results to. We'll later
// serve different ranges of this file using an index file that we create at the end.
val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0)
outputFile = blockManager.diskBlockManager.getFile(blockId)

// Track location of each range in the output file
val offsets = new Array[Long](numPartitions + 1)
val lengths = new Array[Long](numPartitions)

for ((id, elements) <- partitions) {
if (elements.hasNext) {
val writer = blockManager.getDiskWriter(blockId, outputFile, ser, fileBufferSize,
writeMetrics)
for (elem <- elements) {
writer.write(elem)
}
writer.commitAndClose()
val segment = writer.fileSegment()
offsets(id + 1) = segment.offset + segment.length
lengths(id) = segment.length
} else {
// The partition is empty; don't create a new writer to avoid writing headers, etc
offsets(id + 1) = offsets(id)
}
}

context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
outputFile = blockManager.diskBlockManager.getFile(blockId)
indexFile = blockManager.diskBlockManager.getFile(blockId.name + ".index")

// Write an index file with the offsets of each block, plus a final offset at the end for the
// end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure
// out where each block begins and ends.

val diskBlockManager = blockManager.diskBlockManager
val indexFile = diskBlockManager.getFile(blockId.name + ".index")
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
try {
var i = 0
while (i < numPartitions + 1) {
out.writeLong(offsets(i))
i += 1
}
} finally {
out.close()
}
val partitionLengths = sorter.writePartitionedFile(blockId, context)

// Register our map output with the ShuffleBlockManager, which handles cleaning it over time
blockManager.shuffleBlockManager.addCompletedMap(dep.shuffleId, mapId, numPartitions)

mapStatus = new MapStatus(blockManager.blockManagerId,
lengths.map(MapOutputTracker.compressSize))
partitionLengths.map(MapOutputTracker.compressSize))
}

/** Close this writer, passing along whether the map completed */
@@ -145,6 +109,9 @@ private[spark] class SortShuffleWriter[K, V, C](
if (outputFile != null) {
outputFile.delete()
}
if (indexFile != null) {
indexFile.delete()
}
return None
}
} finally {
@@ -25,7 +25,7 @@ import scala.collection.mutable

import com.google.common.io.ByteStreams

import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
import org.apache.spark._
import org.apache.spark.serializer.{DeserializationStream, Serializer}
import org.apache.spark.storage.BlockId
import org.apache.spark.executor.ShuffleWriteMetrics
@@ -171,7 +171,7 @@ private[spark] class ExternalSorter[K, V, C](
elementsPerPartition: Array[Long])
private val spills = new ArrayBuffer[SpilledFile]

def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit = {
// TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined

@@ -645,6 +645,72 @@ private[spark] class ExternalSorter[K, V, C](
*/
def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)

/**
* Write all the data added into this ExternalSorter into a file in the disk store, creating
* an .index file for it as well with the offsets of each partition. This is called by the
* SortShuffleWriter and can go through an efficient path of just concatenating binary files
* if we decided to avoid merge-sorting.
*
* @param blockId block ID to write to. The index file will be blockId.name + ".index".
* @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
* @return array of lengths, in bytes, for each partition of the file (for map output tracker)
*/
def writePartitionedFile(blockId: BlockId, context: TaskContext): Array[Long] = {
val outputFile = blockManager.diskBlockManager.getFile(blockId)

// Track location of each range in the output file
val offsets = new Array[Long](numPartitions + 1)
val lengths = new Array[Long](numPartitions)

// Statistics
var totalBytes = 0L
var totalTime = 0L

for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
val writer = blockManager.getDiskWriter(blockId, outputFile, ser, fileBufferSize)
for (elem <- elements) {
writer.write(elem)
}
writer.commitAndClose()
val segment = writer.fileSegment()
offsets(id + 1) = segment.offset + segment.length
lengths(id) = segment.length
totalTime += writer.timeWriting()
totalBytes += segment.length
} else {
// The partition is empty; don't create a new writer to avoid writing headers, etc
offsets(id + 1) = offsets(id)
}
}

val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
shuffleMetrics.shuffleWriteTime = totalTime
context.taskMetrics.shuffleWriteMetrics = Some(shuffleMetrics)
context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += diskBytesSpilled

// Write an index file with the offsets of each block, plus a final offset at the end for the
// end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure
// out where each block begins and ends.

val diskBlockManager = blockManager.diskBlockManager
val indexFile = diskBlockManager.getFile(blockId.name + ".index")
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
try {
var i = 0
while (i < numPartitions + 1) {
out.writeLong(offsets(i))
i += 1
}
} finally {
out.close()
}

lengths
}

def stop(): Unit = {
spills.foreach(s => s.file.delete())
spills.clear()
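
The comment in writePartitionedFile notes that SortShuffleManager.getBlockLocation consumes the .index file. As a purely illustrative companion (a hypothetical helper, not the actual SortShuffleManager code), here is how the on-disk index format written above can be read back: the file holds numPartitions + 1 longs, so partition i occupies the byte range [offsets(i), offsets(i + 1)) of the data file.

  import java.io.{DataInputStream, File, FileInputStream}

  // Hypothetical helper: reads the (offset, length) of one partition from the
  // .index file written by writePartitionedFile. Simplified for illustration;
  // it assumes skipBytes skips the full requested amount.
  object IndexFileReader {
    def readPartitionSegment(indexFile: File, partitionId: Int): (Long, Long) = {
      val in = new DataInputStream(new FileInputStream(indexFile))
      try {
        in.skipBytes(partitionId * 8)   // each offset is an 8-byte long
        val offset = in.readLong()      // where this partition's segment starts
        val nextOffset = in.readLong()  // where the next segment starts
        (offset, nextOffset - offset)   // (offset, length) within the data file
      } finally {
        in.close()
      }
    }
  }
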
@@ -86,28 +86,28 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
// Both aggregator and ordering
val sorter = new ExternalSorter[Int, Int, Int](
Some(agg), Some(new HashPartitioner(7)), Some(ord), None)
sorter.write(elements.iterator)
sorter.insertAll(elements.iterator)
assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
sorter.stop()

// Only aggregator
val sorter2 = new ExternalSorter[Int, Int, Int](
Some(agg), Some(new HashPartitioner(7)), None, None)
sorter2.write(elements.iterator)
sorter2.insertAll(elements.iterator)
assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
sorter2.stop()

// Only ordering
val sorter3 = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(7)), Some(ord), None)
sorter3.write(elements.iterator)
sorter3.insertAll(elements.iterator)
assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
sorter3.stop()

// Neither aggregator nor ordering
val sorter4 = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(7)), None, None)
sorter4.write(elements.iterator)
sorter4.insertAll(elements.iterator)
assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
sorter4.stop()
}
@@ -124,7 +124,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {

val sorter = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(7)), None, None)
sorter.write(elements)
sorter.insertAll(elements)
assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled
val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList))
assert(iter.next() === (0, Nil))
@@ -287,13 +287,13 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager

val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
sorter.write((0 until 100000).iterator.map(i => (i, i)))
sorter.insertAll((0 until 100000).iterator.map(i => (i, i)))
assert(diskBlockManager.getAllFiles().length > 0)
sorter.stop()
assert(diskBlockManager.getAllBlocks().length === 0)

val sorter2 = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
sorter2.write((0 until 100000).iterator.map(i => (i, i)))
sorter2.insertAll((0 until 100000).iterator.map(i => (i, i)))
assert(diskBlockManager.getAllFiles().length > 0)
assert(sorter2.iterator.toSet === (0 until 100000).map(i => (i, i)).toSet)
sorter2.stop()
@@ -309,7 +309,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {

val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
intercept[SparkException] {
sorter.write((0 until 100000).iterator.map(i => {
sorter.insertAll((0 until 100000).iterator.map(i => {
if (i == 99990) {
throw new SparkException("Intentional failure")
}
@@ -365,7 +365,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test", conf)

val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
sorter.write((0 until 100000).iterator.map(i => (i / 4, i)))
sorter.insertAll((0 until 100000).iterator.map(i => (i / 4, i)))
val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
val expected = (0 until 3).map(p => {
(p, (0 until 100000).map(i => (i / 4, i)).filter(_._1 % 3 == p).toSet)
@@ -381,7 +381,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {

val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None)
sorter.write((0 until 100).iterator.map(i => (i / 2, i)))
sorter.insertAll((0 until 100).iterator.map(i => (i / 2, i)))
val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
val expected = (0 until 3).map(p => {
(p, (0 until 50).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
@@ -397,7 +397,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {

val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None)
sorter.write((0 until 100000).iterator.map(i => (i / 2, i)))
sorter.insertAll((0 until 100000).iterator.map(i => (i / 2, i)))
val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
val expected = (0 until 3).map(p => {
(p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
@@ -414,7 +414,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
val ord = implicitly[Ordering[Int]]
val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
sorter.write((0 until 100000).iterator.map(i => (i / 2, i)))
sorter.insertAll((0 until 100000).iterator.map(i => (i / 2, i)))
val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
val expected = (0 until 3).map(p => {
(p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
@@ -431,7 +431,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
val ord = implicitly[Ordering[Int]]
val sorter = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(3)), Some(ord), None)
sorter.write((0 until 100).iterator.map(i => (i, i)))
sorter.insertAll((0 until 100).iterator.map(i => (i, i)))
val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq
val expected = (0 until 3).map(p => {
(p, (0 until 100).map(i => (i, i)).filter(_._1 % 3 == p).toSeq)
@@ -448,7 +448,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
val ord = implicitly[Ordering[Int]]
val sorter = new ExternalSorter[Int, Int, Int](
None, Some(new HashPartitioner(3)), Some(ord), None)
sorter.write((0 until 100000).iterator.map(i => (i, i)))
sorter.insertAll((0 until 100000).iterator.map(i => (i, i)))
val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq
val expected = (0 until 3).map(p => {
(p, (0 until 100000).map(i => (i, i)).filter(_._1 % 3 == p).toSeq)
@@ -495,7 +495,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
val toInsert = (1 to 100000).iterator.map(_.toString).map(s => (s, s)) ++
collisionPairs.iterator ++ collisionPairs.iterator.map(_.swap)

sorter.write(toInsert)
sorter.insertAll(toInsert)

// A map of collision pairs in both directions
val collisionPairsMap = (collisionPairs ++ collisionPairs.map(_.swap)).toMap
@@ -524,7 +524,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
// problems if the map fails to group together the objects with the same code (SPARK-2043).
val toInsert = for (i <- 1 to 10; j <- 1 to 10000) yield (FixedHashObject(j, j % 2), 1)
sorter.write(toInsert.iterator)
sorter.insertAll(toInsert.iterator)

val it = sorter.iterator
var count = 0
@@ -548,7 +548,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
val agg = new Aggregator[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners)
val sorter = new ExternalSorter[Int, Int, ArrayBuffer[Int]](Some(agg), None, None, None)

sorter.write((1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue)))
sorter.insertAll((1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue)))

val it = sorter.iterator
while (it.hasNext) {
@@ -572,7 +572,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
val sorter = new ExternalSorter[String, String, ArrayBuffer[String]](
Some(agg), None, None, None)

sorter.write((1 to 100000).iterator.map(i => (i.toString, i.toString)) ++ Iterator(
sorter.insertAll((1 to 100000).iterator.map(i => (i.toString, i.toString)) ++ Iterator(
(null.asInstanceOf[String], "1"),
("1", null.asInstanceOf[String]),
(null.asInstanceOf[String], null.asInstanceOf[String])
