Skip to content

Commit

Permalink
HADOOP-18657. Tune ABFS create() retry logic
Browse files Browse the repository at this point in the history
Production code changes; no tests yet.

Something with mockito is going to be needed here

Change-Id: I430a9f0e6796461ccec8c23cd80d024258703048
  • Loading branch information
steveloughran committed Mar 8, 2023
1 parent 9274018 commit a4122e2
Showing 1 changed file with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.services.AbfsErrors;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -611,7 +612,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
final String umask,
final boolean isAppendBlob,
TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsRestOperation op;
AbfsRestOperation op = null;

try {
// Trigger a create with overwrite=false first so that eTag fetch can be
Expand All @@ -621,37 +622,57 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
isAppendBlob, null, tracingContext);

} catch (AbfsRestOperationException e) {
LOG.debug("Failed to create {}", relativePath, e);
if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
// File pre-exists, fetch eTag
LOG.debug("Fetching etag of {}", relativePath);
try {
op = client.getPathStatus(relativePath, false, tracingContext);
} catch (AbfsRestOperationException ex) {
LOG.debug("Failed to to getPathStatus {}", relativePath, ex);
if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
// Is a parallel access case, as file which was found to be
// present went missing by this request.
throw new ConcurrentWriteOperationDetectedException(
"Parallel access to the create path detected. Failing request "
+ "to honor single writer semantics");
// this means the other thread deleted it and the conflict
// has implicitly been resolved.
LOG.debug("File at {} has been deleted; creation can continue", relativePath);
} else {
throw ex;
}
}

String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);
String eTag = op != null
? op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG)
: null;

LOG.debug("Attempting to create file {} with etag of {}", relativePath, eTag);
try {
// overwrite only if eTag matches with the file properties fetched befpre
op = client.createPath(relativePath, true, true, permission, umask,
// overwrite only if eTag matches with the file properties fetched or the file
// was deleted and there is no etag.
// if the etag was not retrieved, overwrite is still false, so will fail
// if another process has just created the file
op = client.createPath(relativePath, true, eTag != null, permission, umask,
isAppendBlob, eTag, tracingContext);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
final int sc = ex.getStatusCode();
LOG.debug("Failed to create file {} with etag {}; status code={}",
relativePath, eTag, sc, ex);
if (sc == HttpURLConnection.HTTP_PRECON_FAILED
|| sc == HttpURLConnection.HTTP_CONFLICT) {
// Is a parallel access case, as file with eTag was just queried
// and precondition failure can happen only when another file with
// different etag got created.
throw new ConcurrentWriteOperationDetectedException(
"Parallel access to the create path detected. Failing request "
+ "to honor single writer semantics");
// OR leasing is enabled on the directory and this client
// does not have the lease.
final ConcurrentWriteOperationDetectedException ex2 =
new ConcurrentWriteOperationDetectedException(
AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED
+ " Path =\"" + relativePath+ "\""
+ "; Status code =" + sc
+ "; etag = \"" + eTag + "\""
+ "; error =" + ex.getErrorMessage());
ex2.initCause(ex);
throw ex2;
} else {
throw ex;
}
Expand Down

0 comments on commit a4122e2

Please sign in to comment.