Skip to content

Commit

Permalink
TEZ-4336: ShuffleScheduler should try to report the original exceptio…
Browse files Browse the repository at this point in the history
…n (when shuffle becomes unhealthy) (apache#155) (Laszlo Bodor reviewed by Rajesh Balamohan)

(cherry picked from commit 6863a2d)
  • Loading branch information
abstractdog authored and prabhjyotsingh committed Nov 11, 2024
1 parent f74dd74 commit 436a790
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class InputAttemptFetchFailure {
private final InputAttemptIdentifier inputAttemptIdentifier;
private final boolean isLocalFetch;
private final boolean isDiskErrorAtSource;
private Throwable cause = null;

public InputAttemptFetchFailure(InputAttemptIdentifier inputAttemptIdentifier) {
this(inputAttemptIdentifier, false, false);
Expand Down Expand Up @@ -112,4 +113,13 @@ public String toString() {
return String.format("%s, isLocalFetch: %s, isDiskErrorAtSource: %s",
inputAttemptIdentifier.toString(), isLocalFetch, isDiskErrorAtSource);
}

public InputAttemptFetchFailure withCause(Throwable throwable) {
this.cause = throwable;
return this;
}

public Throwable getCause() {
return cause;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ boolean setupConnection(MapHost host, Collection<InputAttemptIdentifier> attempt
for (InputAttemptIdentifier left : remaining.values()) {
// Need to be handling temporary glitches ..
// Report read error to the AM to trigger source failure heuristics
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(left), host, connectSucceeded,
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(left).withCause(ie), host, connectSucceeded,
!connectSucceeded);
}
return false;
Expand Down Expand Up @@ -738,7 +738,7 @@ protected void setupLocalDiskFetch(MapHost host) throws InterruptedException {
if (!stopped) {
hasFailures = true;
ioErrs.increment(1);
scheduler.copyFailed(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttemptId),
scheduler.copyFailed(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttemptId).withCause(e),
host, true, false);
LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
host.getHostIdentifier(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ enum ShuffleErrors {
private final Referee referee;
@VisibleForTesting
final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier,IntWritable>();

final Set<HostPort> uniqueHosts = Sets.newHashSet();
private final Map<HostPort,IntWritable> hostFailures = new HashMap<HostPort,IntWritable>();
private final InputContext inputContext;
Expand Down Expand Up @@ -792,7 +793,7 @@ public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHo
}

//Restart consumer in case shuffle is not healthy
if (!isShuffleHealthy(fetchFailure.getInputAttemptIdentifier())) {
if (!isShuffleHealthy(fetchFailure)) {
return;
}

Expand Down Expand Up @@ -1006,8 +1007,8 @@ private boolean isFetcherHealthy(String logContext) {
return fetcherHealthy;
}

boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) {

boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
InputAttemptIdentifier srcAttempt = fetchFailure.getInputAttemptIdentifier();
if (isAbortLimitExceeedFor(srcAttempt)) {
return false;
}
Expand Down Expand Up @@ -1049,13 +1050,15 @@ boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) {
+ ", pendingInputs=" + (numInputs - doneMaps)
+ ", fetcherHealthy=" + fetcherHealthy
+ ", reducerProgressedEnough=" + reducerProgressedEnough
+ ", reducerStalled=" + reducerStalled);
+ ", reducerStalled=" + reducerStalled
+ ", hostFailures=" + hostFailures)
+ "]";
LOG.error(errorMsg);
if (LOG.isDebugEnabled()) {
LOG.debug("Host failures=" + hostFailures.keySet());
}
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
exceptionReporter.reportException(new IOException(errorMsg));
exceptionReporter.reportException(new IOException(errorMsg, fetchFailure.getCause()));
return false;
}
return true;
Expand Down

0 comments on commit 436a790

Please sign in to comment.