Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
TEZ-4336: ShuffleScheduler should try to report the original exception
Browse files Browse the repository at this point in the history
(when shuffle becomes unhealthy) (apache#155) (Laszlo Bodor reviewed by Rajesh Balamohan)

(cherry picked from commit 6863a2d)
(cherry picked from commit 436a790)
abstractdog authored and prabhjyotsingh committed Nov 20, 2024

Unverified

This commit is not signed, but one or more authors require that any commit attributed to them is signed.
1 parent 4255183 commit 12b2d79
Showing 3 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ public class InputAttemptFetchFailure {
private final InputAttemptIdentifier inputAttemptIdentifier;
private final boolean isLocalFetch;
private final boolean isDiskErrorAtSource;
private Throwable cause = null;

public InputAttemptFetchFailure(InputAttemptIdentifier inputAttemptIdentifier) {
this(inputAttemptIdentifier, false, false);
@@ -112,4 +113,13 @@ public String toString() {
return String.format("%s, isLocalFetch: %s, isDiskErrorAtSource: %s",
inputAttemptIdentifier.toString(), isLocalFetch, isDiskErrorAtSource);
}

/**
 * Records the given throwable as the cause of this fetch failure.
 *
 * @param throwable the throwable to store; kept as-is without validation
 * @return this same instance, so the call can be chained
 */
public InputAttemptFetchFailure withCause(Throwable throwable) {
  cause = throwable;
  return this;
}

/**
 * Returns the throwable currently stored as the cause of this fetch failure.
 *
 * @return the stored cause; {@code null} if none has been set
 */
public Throwable getCause() {
  return this.cause;
}
}
Original file line number Diff line number Diff line change
@@ -378,7 +378,7 @@ boolean setupConnection(MapHost host, Collection<InputAttemptIdentifier> attempt
for (InputAttemptIdentifier left : remaining.values()) {
// Need to be handling temporary glitches ..
// Report read error to the AM to trigger source failure heuristics
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(left), host, connectSucceeded,
scheduler.copyFailed(InputAttemptFetchFailure.fromAttempt(left).withCause(ie), host, connectSucceeded,
!connectSucceeded);
}
return false;
@@ -738,7 +738,7 @@ protected void setupLocalDiskFetch(MapHost host) throws InterruptedException {
if (!stopped) {
hasFailures = true;
ioErrs.increment(1);
scheduler.copyFailed(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttemptId),
scheduler.copyFailed(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttemptId).withCause(e),
host, true, false);
LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
host.getHostIdentifier(), e);
Original file line number Diff line number Diff line change
@@ -178,6 +178,7 @@ enum ShuffleErrors {
private final Referee referee;
@VisibleForTesting
final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier,IntWritable>();

final Set<HostPort> uniqueHosts = Sets.newHashSet();
private final Map<HostPort,IntWritable> hostFailures = new HashMap<HostPort,IntWritable>();
private final InputContext inputContext;
@@ -792,7 +793,7 @@ public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHo
}

//Restart consumer in case shuffle is not healthy
if (!isShuffleHealthy(fetchFailure.getInputAttemptIdentifier())) {
if (!isShuffleHealthy(fetchFailure)) {
return;
}

@@ -1006,8 +1007,8 @@ private boolean isFetcherHealthy(String logContext) {
return fetcherHealthy;
}

boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) {

boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
InputAttemptIdentifier srcAttempt = fetchFailure.getInputAttemptIdentifier();
if (isAbortLimitExceeedFor(srcAttempt)) {
return false;
}
@@ -1049,13 +1050,15 @@ boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) {
+ ", pendingInputs=" + (numInputs - doneMaps)
+ ", fetcherHealthy=" + fetcherHealthy
+ ", reducerProgressedEnough=" + reducerProgressedEnough
+ ", reducerStalled=" + reducerStalled);
+ ", reducerStalled=" + reducerStalled
+ ", hostFailures=" + hostFailures)
+ "]";
LOG.error(errorMsg);
if (LOG.isDebugEnabled()) {
LOG.debug("Host failures=" + hostFailures.keySet());
}
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
exceptionReporter.reportException(new IOException(errorMsg));
exceptionReporter.reportException(new IOException(errorMsg, fetchFailure.getCause()));
return false;
}
return true;

0 comments on commit 12b2d79

Please sign in to comment.