SPARK-2532: Minimal shuffle consolidation fixes
All changes from this PR are by @mridulm and are drawn from his work in apache#1609.
This patch is intended to fix all major issues related to shuffle file consolidation
that @mridulm found, while minimizing changes to the code, with the hope that it may
be more easily merged into 1.1.

This patch is **not** intended as a replacement for apache#1609, which provides many
additional benefits, including fixes to ExternalAppendOnlyMap, improvements to
DiskBlockObjectWriter's API, and several new unit tests.

If it is feasible to merge apache#1609 for the 1.1 deadline, that is a preferable option.
aarondav committed Jul 31, 2014
1 parent e966284 commit 9160149
Showing 6 changed files with 142 additions and 47 deletions.
@@ -65,23 +65,25 @@ private[spark] class HashShuffleWriter[K, V](
}

/** Close this writer, passing along whether the map completed */
override def stop(success: Boolean): Option[MapStatus] = {
override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
var success = initiallySuccess
try {
if (stopping) {
return None
}
stopping = true
if (success) {
try {
return Some(commitWritesAndBuildStatus())
Some(commitWritesAndBuildStatus())
} catch {
case e: Exception =>
success = false
revertWrites()
throw e
}
} else {
revertWrites()
return None
None
}
} finally {
// Release the writers back to the shuffle block manager.
@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V](
var totalBytes = 0L
var totalTime = 0L
val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
writer.commitAndClose()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
private def revertWrites(): Unit = {
if (shuffle != null && shuffle.writers != null) {
for (writer <- shuffle.writers) {
writer.revertPartialWrites()
writer.close()
writer.revertPartialWritesAndClose()
}
}
}
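For orientation, here is a minimal sketch of the commit-or-revert contract that the reworked stop() above supports. The runMapTask wrapper and the write(records) call are illustrative assumptions (the write path is not shown in this diff), and the sketch assumes it lives inside Spark's org.apache.spark package tree, since these classes are private[spark]; only stop(success: Boolean): Option[MapStatus] is taken from the change above.

// Hypothetical caller (not part of this commit): drive the writer so that a
// failure reverts partial shuffle output instead of committing it.
def runMapTask[K, V](writer: HashShuffleWriter[K, V],
                     records: Iterator[Product2[K, V]]): Option[MapStatus] = {
  try {
    writer.write(records)   // assumed write API; not shown in this diff
    writer.stop(true)       // commits the writes and returns Some(MapStatus)
  } catch {
    case e: Exception =>
      writer.stop(false)    // reverts partial writes and returns None
      throw e
  }
}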
@@ -39,16 +39,16 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
def isOpen: Boolean

/**
* Flush the partial writes and commit them as a single atomic block. Return the
* number of bytes written for this commit.
* Flush the partial writes and commit them as a single atomic block.
*/
def commit(): Long
def commitAndClose(): Unit

/**
* Reverts writes that haven't been flushed yet. Callers should invoke this function
* when there are runtime exceptions.
* when there are runtime exceptions. This method will not throw, though it may be
* unsuccessful in truncating written data.
*/
def revertPartialWrites()
def revertPartialWritesAndClose()

/**
* Writes an object.
@@ -57,6 +57,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {

/**
* Returns the file segment of committed data that this Writer has written.
* This is only valid after commitAndClose() has been called.
*/
def fileSegment(): FileSegment
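A minimal lifecycle sketch for the revised abstract API, assuming write(value: Any) is the object-writing method whose signature is elided above: commit and close on success, after which fileSegment() is valid; revert and close on failure.

// Illustrative usage of the revised BlockObjectWriter contract (a sketch, not Spark code).
def writeAll(writer: BlockObjectWriter, values: Iterator[Any]): FileSegment = {
  try {
    values.foreach(writer.write)
    writer.commitAndClose()                // flush and close; fileSegment() is now valid
    writer.fileSegment()                   // (file, offset, length) of the committed data
  } catch {
    case e: Exception =>
      writer.revertPartialWritesAndClose() // best-effort truncation; does not throw
      throw e
  }
}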

@@ -108,15 +109,14 @@ private[spark] class DiskBlockObjectWriter(
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private val initialPosition = file.length()
private var lastValidPosition = initialPosition
private var finalPosition: Long = -1
private var initialized = false
private var _timeWriting = 0L

override def open(): BlockObjectWriter = {
fos = new FileOutputStream(file, true)
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
lastValidPosition = initialPosition
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
@@ -147,28 +147,36 @@

override def isOpen: Boolean = objOut != null

override def commit(): Long = {
override def commitAndClose(): Unit = {
if (initialized) {
// NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
// serializer stream and the lower level stream.
objOut.flush()
bs.flush()
val prevPos = lastValidPosition
lastValidPosition = channel.position()
lastValidPosition - prevPos
} else {
// lastValidPosition is zero if stream is uninitialized
lastValidPosition
close()
finalPosition = file.length()
}
}

override def revertPartialWrites() {
if (initialized) {
// Discard current writes. We do this by flushing the outstanding writes and
// truncate the file to the last valid position.
objOut.flush()
bs.flush()
channel.truncate(lastValidPosition)
// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
override def revertPartialWritesAndClose() {
try {
if (initialized) {
objOut.flush()
bs.flush()
close()
}

val truncateStream = new FileOutputStream(file, true)
try {
truncateStream.getChannel.truncate(initialPosition)
} finally {
truncateStream.close()
}
} catch {
case e: Exception =>
logError("Uncaught exception while reverting partial writes to file " + file, e)
}
}

@@ -188,6 +196,7 @@ private[spark] class DiskBlockObjectWriter(

// Only valid if called after commit()
override def bytesWritten: Long = {
lastValidPosition - initialPosition
assert(finalPosition != -1, "bytesWritten is only valid after successful commit()")
finalPosition - initialPosition
}
}
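Under consolidation the writer appends to a file that may already hold other map outputs, so initialPosition is the pre-existing file length and, after commitAndClose(), bytesWritten is finalPosition - initialPosition. A tiny worked example with illustrative numbers:

// Illustrative numbers only: appending a map output to an already-populated reducer file.
val initialPosition = 120L                             // file.length() when the writer was created
val finalPosition   = 180L                             // file.length() after commitAndClose()
val bytesWritten    = finalPosition - initialPosition  // 60 bytes belong to this map output
assert(bytesWritten == 60L)
// The corresponding FileSegment is (offset = 120, length = 60), which is what
// ShuffleFileGroup.recordMapOutput() below now stores explicitly per reducer file.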
@@ -144,7 +144,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
if (consolidateShuffleFiles) {
if (success) {
val offsets = writers.map(_.fileSegment().offset)
fileGroup.recordMapOutput(mapId, offsets)
val lengths = writers.map(_.fileSegment().length)
fileGroup.recordMapOutput(mapId, offsets, lengths)
}
recycleFileGroup(fileGroup)
} else {
@@ -247,47 +248,48 @@ object ShuffleBlockManager {
* A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
*/
private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
private var numBlocks: Int = 0

/**
* Stores the absolute index of each mapId in the files of this group. For instance,
* if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
*/
private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()

/**
* Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
* This ordering allows us to compute block lengths by examining the following block offset.
* Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
* position in the file.
* Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
* reducer.
*/
private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
new PrimitiveVector[Long]()
}

def numBlocks = mapIdToIndex.size
private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
new PrimitiveVector[Long]()
}

def apply(bucketId: Int) = files(bucketId)

def recordMapOutput(mapId: Int, offsets: Array[Long]) {
def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
assert(offsets.length == lengths.length)
mapIdToIndex(mapId) = numBlocks
numBlocks += 1
for (i <- 0 until offsets.length) {
blockOffsetsByReducer(i) += offsets(i)
blockLengthsByReducer(i) += lengths(i)
}
}

/** Returns the FileSegment associated with the given map task, or None if no entry exists. */
def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
val file = files(reducerId)
val blockOffsets = blockOffsetsByReducer(reducerId)
val blockLengths = blockLengthsByReducer(reducerId)
val index = mapIdToIndex.getOrElse(mapId, -1)
if (index >= 0) {
val offset = blockOffsets(index)
val length =
if (index + 1 < numBlocks) {
blockOffsets(index + 1) - offset
} else {
file.length() - offset
}
assert(length >= 0)
val length = blockLengths(index)
Some(new FileSegment(file, offset, length))
} else {
None
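The motivation for storing lengths explicitly: previously the last block's length in a consolidated reducer file was inferred as file.length() - offset, so a map task concurrently appending to the same file could inflate the inferred length of an already-committed block. A simplified sketch of the difference, with ArrayBuffer standing in for Spark's PrimitiveVector and purely illustrative numbers:

import scala.collection.mutable.ArrayBuffer

// Two committed map outputs in one reducer file (illustrative numbers).
val offsets = ArrayBuffer(0L, 120L)   // map 1 starts at offset 0, map 2 at offset 120
val lengths = ArrayBuffer(120L, 60L)  // lengths recorded at commitAndClose() time

// Old scheme: the last block's length was inferred from the current file size.
def inferredLength(index: Int, fileLength: Long): Long =
  if (index + 1 < offsets.length) offsets(index + 1) - offsets(index)
  else fileLength - offsets(index)

// If a third map task is concurrently appending, file.length() may already be 200:
assert(inferredLength(1, fileLength = 200L) == 80L)  // wrong: includes the in-flight write
assert(lengths(1) == 60L)                            // new scheme: stable regardless of later writes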
@@ -199,7 +199,7 @@ class ExternalAppendOnlyMap[K, V, C](

// Flush the disk writer's contents to disk, and update relevant variables
def flush() = {
writer.commit()
writer.commitAndClose()
val bytesWritten = writer.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
@@ -22,11 +22,14 @@ import java.io.{File, FileWriter}
import scala.collection.mutable
import scala.language.reflectiveCalls

import akka.actor.Props
import com.google.common.io.Files
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.{AkkaUtils, Utils}

class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
private val testConf = new SparkConf(false)
@@ -121,6 +124,88 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
newFile.delete()
}

private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
assert (segment1.offset === segment2.offset)
assert (segment1.length === segment2.length)
}

test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") {

val serializer = new JavaSerializer(testConf)
val confCopy = testConf.clone
// Reset the serializer stream after EACH object write. This ensures that bytes are appended
// after every object is written, so code paths that assume writeObject marks the end of the
// data get flushed out. This was a common bug in ExternalAppendOnlyMap, etc.
confCopy.set("spark.serializer.objectStreamReset", "1")

val securityManager = new org.apache.spark.SecurityManager(confCopy)
// Do not use the shuffleBlockManager above !
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy,
securityManager)
val master = new BlockManagerMaster(
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))),
confCopy)
val store = new BlockManager("<driver>", actorSystem, master , serializer, confCopy,
securityManager, null)

try {

val shuffleManager = store.shuffleBlockManager

val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer)
for (writer <- shuffle1.writers) {
writer.write("test1")
writer.write("test2")
}
for (writer <- shuffle1.writers) {
writer.commitAndClose()
}

val shuffle1Segment = shuffle1.writers(0).fileSegment()
shuffle1.releaseWriters(success = true)

val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf))

for (writer <- shuffle2.writers) {
writer.write("test3")
writer.write("test4")
}
for (writer <- shuffle2.writers) {
writer.commitAndClose()
}
val shuffle2Segment = shuffle2.writers(0).fileSegment()
shuffle2.releaseWriters(success = true)

// Now comes the test:
// Write to shuffle 3 and close it, but before registering it, check that the file segments
// recorded for the earlier tasks are unchanged. Previously we inferred a block's length from
// the remaining data in the file, which could mess things up when concurrent reads and writes
// happen to the same shuffle group.

val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf))
for (writer <- shuffle3.writers) {
writer.write("test3")
writer.write("test4")
}
for (writer <- shuffle3.writers) {
writer.commitAndClose()
}
// check before we register.
checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
shuffle3.releaseWriters(success = true)
checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
shuffleManager.removeShuffle(1)
} finally {

if (store != null) {
store.stop()
}
actorSystem.shutdown()
actorSystem.awaitTermination()
}
}

def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
val segment = diskBlockManager.getBlockLocation(blockId)
assert(segment.file.getName === filename)
@@ -61,10 +61,9 @@ object StoragePerfTester {
for (i <- 1 to recordsPerMap) {
writers(i % numOutputSplits).write(writeData)
}
writers.map {w =>
w.commit()
writers.map { w =>
w.commitAndClose()
total.addAndGet(w.fileSegment().length)
w.close()
}

shuffle.releaseWriters(true)
