HADOOP-18657. Tune ABFS create() retry logic #5462

Open. Wants to merge 2 commits into base: trunk.
@@ -55,6 +55,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.services.AbfsErrors;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
@@ -611,7 +612,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath
final String umask,
final boolean isAppendBlob,
TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsRestOperation op;
AbfsRestOperation op = null;

try {
// Trigger a create with overwrite=false first so that eTag fetch can be
@@ -621,38 +622,79 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath
isAppendBlob, null, tracingContext);

} catch (AbfsRestOperationException e) {
LOG.debug("Failed to create {}", relativePath, e);
if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
// File pre-exists, fetch eTag
LOG.debug("Fetching etag of {}", relativePath);
try {
op = client.getPathStatus(relativePath, false, tracingContext);
} catch (AbfsRestOperationException ex) {
LOG.debug("Failed to getPathStatus {}", relativePath, ex);
if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
Contributor comment:
Hi @steveloughran, given that Hadoop has single-writer semantics, would it be correct to expect that only one worker process should try to create a file during job parallelization? Since this FileNotFound check happens after an attempt to create the file with overwrite=false, which in turn failed with a conflict indicating the file was just present, a concurrent operation on the file is indeed confirmed.

It is quite possible that if we let this create proceed, some other operation such as a delete could kick in later as well. Wouldn't the code below, which throws an exception at the first indication of parallel activity, be the right thing to do?

Since the workload pattern is not honoring single-writer semantics, I feel we should retain the logic that throws ConcurrentWriteOperationDetectedException.

// Is a parallel access case, as file which was found to be
// present went missing by this request.
throw new ConcurrentWriteOperationDetectedException(
"Parallel access to the create path detected. Failing request "
+ "to honor single writer semantics");
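For context, the flow under discussion can be sketched as a plain-Java simulation. `Store` and `pickEtagForRetry` are hypothetical names for illustration only, not the real AbfsClient API; the sketch models the PR's behaviour, where a file that vanished between the failed create and the etag probe lets creation continue with a null etag instead of failing fast.

```java
import java.util.Optional;

public class ConditionalCreateSketch {

  // Simulated store: returns the etag if the path currently exists.
  interface Store {
    Optional<String> etagOf(String path);
  }

  /**
   * Mimics the decision after create(overwrite=false) fails with 409:
   * probe for the etag; if the file pre-exists, the second create will
   * overwrite only when that etag still matches; if the file has been
   * deleted in between, retry with no etag (plain create).
   *
   * @return the etag for the second, conditional create, or null.
   */
  static String pickEtagForRetry(Store store, String path) {
    Optional<String> etag = store.etagOf(path);
    if (etag.isPresent()) {
      // File pre-exists: overwrite only if the etag matches.
      return etag.get();
    }
    // File was deleted between the failed create and the probe; the
    // conflict has implicitly been resolved, so retry without an etag.
    return null;
  }

  public static void main(String[] args) {
    Store present = p -> Optional.of("0x1");
    Store deleted = p -> Optional.empty();
    System.out.println(pickEtagForRetry(present, "/a")); // 0x1
    System.out.println(pickEtagForRetry(deleted, "/a")); // null
  }
}
```

The alternative the comment argues for would replace the `return null` branch with a thrown ConcurrentWriteOperationDetectedException.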
// this means the other thread deleted it and the conflict
Contributor comment:
There is a race condition in the job, and the developer should be informed about it. @snvijaya @anmolanmol1234 @sreeb-msft, what do you think?

Contributor (Author) comment:

Will do; the text will indicate this may be due to a lease on the parent dir too.

// has implicitly been resolved.
LOG.debug("File at {} has been deleted; creation can continue", relativePath);
} else {
throw ex;
}
}

String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);

// get the etag of the file at the destination; this will be made
// the condition of the second createPath call.
String eTag = op != null
? op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG)
: null;

final boolean overwrite = eTag != null;
final String action = overwrite ? "overwrite" : "create";
LOG.debug("Attempting to {} file {} with etag of {}",
action,
relativePath, eTag);
try {
// overwrite only if eTag matches with the file properties fetched before
op = client.createPath(relativePath, true, true, permission, umask,
// overwrite only if eTag matches with the file properties fetched or the file
// was deleted and there is no etag.
// if the etag was not retrieved, overwrite is still false, so will fail
// if another process has just created the file

op = client.createPath(relativePath, true, overwrite, permission, umask,
isAppendBlob, eTag, tracingContext);

} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
final int sc = ex.getStatusCode();

// Create a detailed error message.
final String details = "Path =\"" + relativePath + "\""
+ "; Status code =" + sc
+ "; etag = \"" + eTag + "\""
+ "; operation = \"" + action + "\""
+ "; error =" + ex.getErrorMessage();
if (sc == HttpURLConnection.HTTP_PRECON_FAILED
|| sc == HttpURLConnection.HTTP_CONFLICT) {
Contributor comment:
Good that we have taken care of the 409, which can arise when eTag == null makes the overwrite argument to client.createPath false.

It would be awesome if we could capture that in comments, and log accordingly.
Log 1: a file was present whose eTag our process fetched. When we went back to createPath with that same eTag, some other process had replaced the file, which leads to the 412 handled in the added code:

 final ConcurrentWriteOperationDetectedException ex2 =
                new ConcurrentWriteOperationDetectedException(
                    AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED
                        + " Path =\"" + relativePath+ "\""
                        + "; Status code =" + sc
                        + "; etag = \"" + eTag + "\""
                        + "; error =" + ex.getErrorMessage());

Suggested log 2: when we searched for the etag there was no file, so the retry calls createPath with overwrite = false; that returns a 409 if some other process has created a file on the same path in the meantime.

Also, a 409 here is the same situation this method started with. Should we re-enter the 409 handling a bounded number of times? For example, with a threshold of 2, a 409 at this line would be retried once more, and after that we fail. @snvijaya @anmolanmol1234 @sreeb-msft, what do you think?
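The bounded-retry idea suggested above can be sketched in plain Java. `CreateAttempt`, the numeric status codes, and `createWithRetry` are illustrative only, not Hadoop APIs; with a threshold of 2, a 409 is retried once more before the method gives up:

```java
public class BoundedCreateRetry {

  // One attempt at the create; returns an HTTP-like status code.
  interface CreateAttempt {
    /** @return e.g. 201 created, 409 conflict. */
    int attempt();
  }

  /**
   * Retry the create up to {@code threshold} times while it keeps
   * returning 409 (the situation the method started with).
   *
   * @return the final status, or -1 once the threshold is exhausted,
   *         at which point the caller would report parallel access.
   */
  static int createWithRetry(CreateAttempt op, int threshold) {
    for (int i = 0; i < threshold; i++) {
      int sc = op.attempt();
      if (sc != 409) {
        return sc; // success or a non-retryable failure
      }
      // 409: loop and try the conditional create cycle again.
    }
    return -1;     // give up: treat as concurrent write detected
  }

  public static void main(String[] args) {
    int[] replies = {409, 201};
    int[] i = {0};
    // First attempt conflicts, second succeeds within the threshold.
    System.out.println(createWithRetry(() -> replies[i[0]++], 2)); // 201
  }
}
```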

// Is a parallel access case, as file with eTag was just queried
// and precondition failure can happen only when another file with
// different etag got created.
throw new ConcurrentWriteOperationDetectedException(
"Parallel access to the create path detected. Failing request "
+ "to honor single writer semantics");
// OR leasing is enabled on the directory and this client
// does not have the lease.


final String errorText = AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED + " " + details;

// Add a message to the log, including causes
LOG.warn("{}.", errorText);
LOG.warn("This is a race condition or another process has a lease on"
+ " the parent directory.");
// log full stack trace at debug
LOG.debug("{}", errorText, ex);
// then throw a specific exception class
throw new ConcurrentWriteOperationDetectedException(errorText, ex);
} else {
// another cause. warn
LOG.warn("Failed {}", details);
// print the stack at debug
LOG.debug("{}", details, ex);
// throw without wrapping
throw ex;
}
}
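To summarize the two failure modes the new code distinguishes, here is a minimal, self-contained model of the etag precondition on the second createPath call. All names and codes here are hypothetical, for illustration; the real behaviour is enforced by the ABFS service, not this sketch.

```java
public class EtagPrecondition {

  /**
   * Simulated conditional create against a store whose current state
   * is described by {@code currentEtag} (null means no file exists).
   *
   * @return 201 on success, 412 when the supplied etag no longer
   *         matches (file replaced concurrently), 409 when
   *         overwrite=false and the file already exists
   *         (file created concurrently).
   */
  static int createPath(String currentEtag, boolean overwrite,
      String ifMatch) {
    if (currentEtag == null) {
      return 201; // no file present: plain create succeeds
    }
    if (!overwrite) {
      return 409; // file exists and overwrite was not requested
    }
    return currentEtag.equals(ifMatch) ? 201 : 412;
  }

  public static void main(String[] args) {
    System.out.println(createPath("0x1", true, "0x1")); // 201
    System.out.println(createPath("0x2", true, "0x1")); // 412
    System.out.println(createPath("0x1", false, null)); // 409
  }
}
```

Both non-success codes signal parallel activity, which is why the patch maps 412 and 409 alike to ConcurrentWriteOperationDetectedException.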
@@ -29,4 +29,9 @@ public class ConcurrentWriteOperationDetectedException
public ConcurrentWriteOperationDetectedException(String message) {
super(message);
}

public ConcurrentWriteOperationDetectedException(final String message,
final Throwable innerThrowable) {
super(message, innerThrowable);
}
}