diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 1923f7c71a48f..45d3b8b9b8725 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -65,7 +65,8 @@ private[spark] class HashShuffleWriter[K, V]( } /** Close this writer, passing along whether the map completed */ - override def stop(success: Boolean): Option[MapStatus] = { + override def stop(initiallySuccess: Boolean): Option[MapStatus] = { + var success = initiallySuccess try { if (stopping) { return None @@ -73,15 +74,16 @@ private[spark] class HashShuffleWriter[K, V]( stopping = true if (success) { try { - return Some(commitWritesAndBuildStatus()) + Some(commitWritesAndBuildStatus()) } catch { case e: Exception => + success = false revertWrites() throw e } } else { revertWrites() - return None + None } } finally { // Release the writers back to the shuffle block manager. @@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V]( var totalBytes = 0L var totalTime = 0L val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter => - writer.commit() - writer.close() + writer.commitAndClose() val size = writer.fileSegment().length totalBytes += size totalTime += writer.timeWriting() @@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V]( private def revertWrites(): Unit = { if (shuffle != null && shuffle.writers != null) { for (writer <- shuffle.writers) { - writer.revertPartialWrites() - writer.close() + writer.revertPartialWritesAndClose() } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index a2687e6be4e34..c1c8ec0812f0e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -39,16 +39,16 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) { def isOpen: Boolean /** - * Flush the partial writes and commit them as a single atomic block. Return the - * number of bytes written for this commit. + * Flush the partial writes and commit them as a single atomic block. */ - def commit(): Long + def commitAndClose(): Unit /** * Reverts writes that haven't been flushed yet. Callers should invoke this function - * when there are runtime exceptions. + * when there are runtime exceptions. This method will not throw, though it may be + * unsuccessful in truncating written data. */ - def revertPartialWrites() + def revertPartialWritesAndClose() /** * Writes an object. @@ -57,6 +57,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) { /** * Returns the file segment of committed data that this Writer has written. + * This is only valid after commitAndClose() has been called. */ def fileSegment(): FileSegment @@ -108,7 +109,7 @@ private[spark] class DiskBlockObjectWriter( private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null private val initialPosition = file.length() - private var lastValidPosition = initialPosition + private var finalPosition: Long = -1 private var initialized = false private var _timeWriting = 0L @@ -116,7 +117,6 @@ private[spark] class DiskBlockObjectWriter( fos = new FileOutputStream(file, true) ts = new TimeTrackingOutputStream(fos) channel = fos.getChannel() - lastValidPosition = initialPosition bs = compressStream(new BufferedOutputStream(ts, bufferSize)) objOut = serializer.newInstance().serializeStream(bs) initialized = true @@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter( override def isOpen: Boolean = objOut != null - override def commit(): Long = { + override def commitAndClose(): Unit = { if (initialized) { // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the // serializer stream and the lower level stream. objOut.flush() bs.flush() - val prevPos = lastValidPosition - lastValidPosition = channel.position() - lastValidPosition - prevPos - } else { - // lastValidPosition is zero if stream is uninitialized - lastValidPosition + close() + finalPosition = file.length() } } - override def revertPartialWrites() { - if (initialized) { - // Discard current writes. We do this by flushing the outstanding writes and - // truncate the file to the last valid position. - objOut.flush() - bs.flush() - channel.truncate(lastValidPosition) + // Discard current writes. We do this by flushing the outstanding writes and then + // truncating the file to its initial position. + override def revertPartialWritesAndClose() { + try { + if (initialized) { + objOut.flush() + bs.flush() + close() + } + + val truncateStream = new FileOutputStream(file, true) + try { + truncateStream.getChannel.truncate(initialPosition) + } finally { + truncateStream.close() + } + } catch { + case e: Exception => + logError("Uncaught exception while reverting partial writes to file " + file, e) } } @@ -188,6 +196,7 @@ private[spark] class DiskBlockObjectWriter( // Only valid if called after commit() override def bytesWritten: Long = { - lastValidPosition - initialPosition + assert(finalPosition != -1, "bytesWritten is only valid after successful commit()") + finalPosition - initialPosition } } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 7beb55c411e71..28aa35bc7e147 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -144,7 +144,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { if (consolidateShuffleFiles) { if (success) { val offsets = writers.map(_.fileSegment().offset) - fileGroup.recordMapOutput(mapId, offsets) + val lengths = writers.map(_.fileSegment().length) + fileGroup.recordMapOutput(mapId, offsets, lengths) } recycleFileGroup(fileGroup) } else { @@ -247,6 +248,8 @@ object ShuffleBlockManager { * A particular mapper will be assigned a single ShuffleFileGroup to write its output to. */ private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) { + private var numBlocks: Int = 0 + /** * Stores the absolute index of each mapId in the files of this group. For instance, * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0. @@ -254,23 +257,27 @@ object ShuffleBlockManager { private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]() /** - * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file. - * This ordering allows us to compute block lengths by examining the following block offset. + * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by + * position in the file. * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every * reducer. */ private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) { new PrimitiveVector[Long]() } - - def numBlocks = mapIdToIndex.size + private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) { + new PrimitiveVector[Long]() + } def apply(bucketId: Int) = files(bucketId) - def recordMapOutput(mapId: Int, offsets: Array[Long]) { + def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) { + assert(offsets.length == lengths.length) mapIdToIndex(mapId) = numBlocks + numBlocks += 1 for (i <- 0 until offsets.length) { blockOffsetsByReducer(i) += offsets(i) + blockLengthsByReducer(i) += lengths(i) } } @@ -278,16 +285,11 @@ object ShuffleBlockManager { def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = { val file = files(reducerId) val blockOffsets = blockOffsetsByReducer(reducerId) + val blockLengths = blockLengthsByReducer(reducerId) val index = mapIdToIndex.getOrElse(mapId, -1) if (index >= 0) { val offset = blockOffsets(index) - val length = - if (index + 1 < numBlocks) { - blockOffsets(index + 1) - offset - } else { - file.length() - offset - } - assert(length >= 0) + val length = blockLengths(index) Some(new FileSegment(file, offset, length)) } else { None diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index b34512ef9eb60..cb67a1c039f20 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -199,7 +199,7 @@ class ExternalAppendOnlyMap[K, V, C]( // Flush the disk writer's contents to disk, and update relevant variables def flush() = { - writer.commit() + writer.commitAndClose() val bytesWritten = writer.bytesWritten batchSizes.append(bytesWritten) _diskBytesSpilled += bytesWritten diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index aaa7714049732..985ac9394738c 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -22,11 +22,14 @@ import java.io.{File, FileWriter} import scala.collection.mutable import scala.language.reflectiveCalls +import akka.actor.Props import com.google.common.io.Files import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} import org.apache.spark.SparkConf -import org.apache.spark.util.Utils +import org.apache.spark.scheduler.LiveListenerBus +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.util.{AkkaUtils, Utils} class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll { private val testConf = new SparkConf(false) @@ -121,6 +124,88 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before newFile.delete() } + private def checkSegments(segment1: FileSegment, segment2: FileSegment) { + assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath) + assert (segment1.offset === segment2.offset) + assert (segment1.length === segment2.length) + } + + test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") { + + val serializer = new JavaSerializer(testConf) + val confCopy = testConf.clone + // reset after EACH object write. This is to ensure that there are bytes appended after + // an object is written. So if the codepaths assume writeObject is end of data, this should + // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc. + confCopy.set("spark.serializer.objectStreamReset", "1") + + val securityManager = new org.apache.spark.SecurityManager(confCopy) + // Do not use the shuffleBlockManager above ! + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy, + securityManager) + val master = new BlockManagerMaster( + actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))), + confCopy) + val store = new BlockManager("", actorSystem, master , serializer, confCopy, + securityManager, null) + + try { + + val shuffleManager = store.shuffleBlockManager + + val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer) + for (writer <- shuffle1.writers) { + writer.write("test1") + writer.write("test2") + } + for (writer <- shuffle1.writers) { + writer.commitAndClose() + } + + val shuffle1Segment = shuffle1.writers(0).fileSegment() + shuffle1.releaseWriters(success = true) + + val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf)) + + for (writer <- shuffle2.writers) { + writer.write("test3") + writer.write("test4") + } + for (writer <- shuffle2.writers) { + writer.commitAndClose() + } + val shuffle2Segment = shuffle2.writers(0).fileSegment() + shuffle2.releaseWriters(success = true) + + // Now comes the test : + // Write to shuffle 3; and close it, but before registering it, check if the file lengths for + // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length + // of block based on remaining data in file : which could mess things up when there is concurrent read + // and writes happening to the same shuffle group. + + val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf)) + for (writer <- shuffle3.writers) { + writer.write("test3") + writer.write("test4") + } + for (writer <- shuffle3.writers) { + writer.commitAndClose() + } + // check before we register. + checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0))) + shuffle3.releaseWriters(success = true) + checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0))) + shuffleManager.removeShuffle(1) + } finally { + + if (store != null) { + store.stop() + } + actorSystem.shutdown() + actorSystem.awaitTermination() + } + } + def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) { val segment = diskBlockManager.getBlockLocation(blockId) assert(segment.file.getName === filename) diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala index 8e8c35615a711..8a05fcb449aa6 100644 --- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala +++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala @@ -61,10 +61,9 @@ object StoragePerfTester { for (i <- 1 to recordsPerMap) { writers(i % numOutputSplits).write(writeData) } - writers.map {w => - w.commit() + writers.map { w => + w.commitAndClose() total.addAndGet(w.fileSegment().length) - w.close() } shuffle.releaseWriters(true)