Skip to content

Commit

Permalink
Fix rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
mateiz committed Aug 7, 2014
1 parent 10233af commit 88cf26a
Showing 1 changed file with 4 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,10 @@ private[spark] class ExternalSorter[K, V, C](

// Create our file writers if we haven't done so yet
if (partitionWriters == null) {
curWriteMetrics = new ShuffleWriteMetrics()
partitionWriters = Array.fill(numPartitions) {
val (blockId, file) = diskBlockManager.createTempBlock()
blockManager.getDiskWriter(blockId, file, ser, fileBufferSize).open()
blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
}
}

Expand Down Expand Up @@ -734,10 +735,6 @@ private[spark] class ExternalSorter[K, V, C](
val offsets = new Array[Long](numPartitions + 1)
val lengths = new Array[Long](numPartitions)

// Statistics
var totalBytes = 0L
var totalTime = 0L

if (bypassMergeSort && partitionWriters != null) {
// We decided to write separate files for each partition, so just concatenate them. To keep
// this simple we spill out the current in-memory collection so that everything is in files.
Expand Down Expand Up @@ -769,27 +766,22 @@ private[spark] class ExternalSorter[K, V, C](
// partition and just write everything directly.
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
val writer = blockManager.getDiskWriter(blockId, outputFile, ser, fileBufferSize)
val writer = blockManager.getDiskWriter(
blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get)
for (elem <- elements) {
writer.write(elem)
}
writer.commitAndClose()
val segment = writer.fileSegment()
offsets(id + 1) = segment.offset + segment.length
lengths(id) = segment.length
totalTime += writer.timeWriting()
totalBytes += segment.length
} else {
// The partition is empty; don't create a new writer to avoid writing headers, etc
offsets(id + 1) = offsets(id)
}
}
}

val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
shuffleMetrics.shuffleWriteTime = totalTime
context.taskMetrics.shuffleWriteMetrics = Some(shuffleMetrics)
context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += diskBytesSpilled

Expand Down

0 comments on commit 88cf26a

Please sign in to comment.