Skip to content

Commit

Permalink
Discard changes for FetchFailedException and minor modification
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Nov 3, 2014
1 parent 4e946f7 commit f7e1faf
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,7 @@ private[spark] class FetchFailedException(
message: String)
extends Exception(message) {

def this(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, e: Throwable) {
this(bmAddress, shuffleId, mapId, reduceId, e.getMessage)
initCause(e)
}

override def getMessage: String =
"Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)

def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
Utils.exceptionString(this))
def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
}

/**
Expand All @@ -54,7 +45,4 @@ private[spark] class MetadataFetchFailedException(
shuffleId: Int,
reduceId: Int,
message: String)
extends FetchFailedException(null, shuffleId, -1, reduceId, message) {

override def getMessage: String = message
}
extends FetchFailedException(null, shuffleId, -1, reduceId, message)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark._
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.{CompletionIterator, Utils}

private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
Expand Down Expand Up @@ -64,7 +64,8 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
Utils.exceptionString(e))
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block", e)
Expand Down

0 comments on commit f7e1faf

Please sign in to comment.