Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-2532] Consolidated shuffle fixes #1609

Closed
wants to merge 10 commits into from
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ object SparkEnv extends Logging {
// Listener bus is only used on the driver
if (isDriver) {
assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
// When running tests in local mode, previous shutdown of sc could have marked it as
// VM shutdown. recheck and disable shutdown flag.
// Utils.doShutdownCheck()
}

val securityManager = new SecurityManager(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
*/
def writeObject[T: ClassTag](t: T): SerializationStream = {
objOut.writeObject(t)
if (counterReset > 0 && counter >= counterReset) {
if (counterReset >= 0 && counter >= counterReset) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done only to support adding marker after each object has been written.
Only practical reason to do this is to test that part.

objOut.reset()
counter = 0
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class HashShuffleWriter[K, V](
}

/** Close this writer, passing along whether the map completed */
override def stop(success: Boolean): Option[MapStatus] = {
override def stop(successInput: Boolean): Option[MapStatus] = {
var success = successInput
try {
if (stopping) {
return None
Expand All @@ -71,7 +72,8 @@ class HashShuffleWriter[K, V](
try {
return Some(commitWritesAndBuildStatus())
} catch {
case e: Exception =>
case e: Throwable =>
success = false // for finally block
revertWrites()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If success != false, then release writers will attempt to register.

throw e
}
Expand All @@ -96,9 +98,9 @@ class HashShuffleWriter[K, V](
var totalBytes = 0L
var totalTime = 0L
val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
writer.commitAndClose()
val size = writer.fileSegment().length
assert(size >= 0)
totalBytes += size
totalTime += writer.timeWriting()
MapOutputTracker.compressSize(size)
Expand All @@ -116,8 +118,13 @@ class HashShuffleWriter[K, V](
private def revertWrites(): Unit = {
if (shuffle != null && shuffle.writers != null) {
for (writer <- shuffle.writers) {
writer.revertPartialWrites()
writer.close()
try {
writer.revertPartialWritesAndClose()
} catch {
// Ensure that all revert's get done - log exception and continue
case ex: Exception =>
logError("Exception reverting/closing writers", ex)
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert/close can throw exception - causing other writers to be left hanging.
Hence, log and continue

}
}
Expand Down
Loading