From a4122e276ad2264c6303eecc3584b63f865dd353 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 8 Mar 2023 14:35:59 +0000 Subject: [PATCH] HADOOP-18657. Tune ABFS create() retry logic Production code changes; no tests yet. Something with mockito is going to be needed here Change-Id: I430a9f0e6796461ccec8c23cd80d024258703048 --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 45 ++++++++++++++----- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index e5e7056126564..9dd252405d382 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -55,6 +55,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.services.AbfsErrors; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; @@ -611,7 +612,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa final String umask, final boolean isAppendBlob, TracingContext tracingContext) throws AzureBlobFileSystemException { - AbfsRestOperation op; + AbfsRestOperation op = null; try { // Trigger a create with overwrite=false first so that eTag fetch can be @@ -621,37 +622,57 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa isAppendBlob, null, tracingContext); } catch (AbfsRestOperationException e) { + LOG.debug("Failed to create {}", relativePath, e); if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { // File pre-exists, fetch eTag + LOG.debug("Fetching etag of {}", relativePath); try { op = client.getPathStatus(relativePath, false, tracingContext); } catch (AbfsRestOperationException ex) { + LOG.debug("Failed to to getPathStatus {}", relativePath, ex); if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { // Is a parallel access case, as file which was found to be // present went missing by this request. - throw new ConcurrentWriteOperationDetectedException( - "Parallel access to the create path detected. Failing request " - + "to honor single writer semantics"); + // this means the other thread deleted it and the conflict + // has implicitly been resolved. + LOG.debug("File at {} has been deleted; creation can continue", relativePath); } else { throw ex; } } - String eTag = op.getResult() - .getResponseHeader(HttpHeaderConfigurations.ETAG); + String eTag = op != null + ? op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG) + : null; + LOG.debug("Attempting to create file {} with etag of {}", relativePath, eTag); try { - // overwrite only if eTag matches with the file properties fetched befpre - op = client.createPath(relativePath, true, true, permission, umask, + // overwrite only if eTag matches with the file properties fetched or the file + // was deleted and there is no etag. + // if the etag was not retrieved, overwrite is still false, so will fail + // if another process has just created the file + op = client.createPath(relativePath, true, eTag != null, permission, umask, isAppendBlob, eTag, tracingContext); } catch (AbfsRestOperationException ex) { - if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) { + final int sc = ex.getStatusCode(); + LOG.debug("Failed to create file {} with etag {}; status code={}", + relativePath, eTag, sc, ex); + if (sc == HttpURLConnection.HTTP_PRECON_FAILED + || sc == HttpURLConnection.HTTP_CONFLICT) { // Is a parallel access case, as file with eTag was just queried // and precondition failure can happen only when another file with // different etag got created. - throw new ConcurrentWriteOperationDetectedException( - "Parallel access to the create path detected. Failing request " - + "to honor single writer semantics"); + // OR leasing is enabled on the directory and this client + // does not have the lease. + final ConcurrentWriteOperationDetectedException ex2 = + new ConcurrentWriteOperationDetectedException( + AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED + + " Path =\"" + relativePath+ "\"" + + "; Status code =" + sc + + "; etag = \"" + eTag + "\"" + + "; error =" + ex.getErrorMessage()); + ex2.initCause(ex); + throw ex2; } else { throw ex; }