Skip to content

Commit

Permalink
Rebase with sort-based shuffle
Browse files Browse the repository at this point in the history
  • Loading branch information
aarondav committed Jul 31, 2014
1 parent 9160149 commit 701d045
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
for (elem <- elements) {
writer.write(elem)
}
writer.commit()
writer.close()
writer.commitAndClose()
val segment = writer.fileSegment()
offsets(id + 1) = segment.offset + segment.length
lengths(id) = segment.length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,10 @@ private[spark] class ExternalSorter[K, V, C](
// How many elements we have in each partition
val elementsPerPartition = new Array[Long](numPartitions)

// Flush the disk writer's contents to disk, and update relevant variables
// Flush the disk writer's contents to disk, and update relevant variables.
// The writer is closed at the end of this process, and cannot be reused.
def flush() = {
writer.commit()
writer.commitAndClose()
val bytesWritten = writer.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
Expand All @@ -293,7 +294,6 @@ private[spark] class ExternalSorter[K, V, C](

if (objectsWritten == serializerBatchSize) {
flush()
writer.close()
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
}
}
Expand Down

0 comments on commit 701d045

Please sign in to comment.