HADOOP-18657. Tune ABFS create() retry logic #5462
base: trunk
@@ -55,6 +55,7 @@
 import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.services.AbfsErrors;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
@@ -611,7 +612,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
     final String umask,
     final boolean isAppendBlob,
     TracingContext tracingContext) throws AzureBlobFileSystemException {
-    AbfsRestOperation op;
+    AbfsRestOperation op = null;

     try {
       // Trigger a create with overwrite=false first so that eTag fetch can be

@@ -621,38 +622,79 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
       isAppendBlob, null, tracingContext);
    } catch (AbfsRestOperationException e) {
      LOG.debug("Failed to create {}", relativePath, e);
      if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
        // File pre-exists, fetch eTag
        LOG.debug("Fetching etag of {}", relativePath);
        try {
          op = client.getPathStatus(relativePath, false, tracingContext);
        } catch (AbfsRestOperationException ex) {
          LOG.debug("Failed to getPathStatus {}", relativePath, ex);
          if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            // Is a parallel access case, as file which was found to be
            // present went missing by this request.
            throw new ConcurrentWriteOperationDetectedException(
                "Parallel access to the create path detected. Failing request "
                    + "to honor single writer semantics");
            // this means the other thread deleted it and the conflict
Review comment: There is a race condition in the job, and the developer should be informed about it. @snvijaya @anmolanmol1234 @sreeb-msft, what do you feel?

Reply: will do; the text will indicate this may be due to a lease on the parent dir too.
            // has implicitly been resolved.
            LOG.debug("File at {} has been deleted; creation can continue", relativePath);
          } else {
            throw ex;
          }
        }
        String eTag = op.getResult()
            .getResponseHeader(HttpHeaderConfigurations.ETAG);

        // get the etag of the file at the destination; this will be made
        // the condition of the second createPath call.
        String eTag = op != null
            ? op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG)
            : null;

        final boolean overwrite = eTag != null;
        final String action = overwrite ? "overwrite" : "create";
        LOG.debug("Attempting to {} file {} with etag of {}",
            action, relativePath, eTag);
        try {
          // overwrite only if eTag matches with the file properties fetched before
          op = client.createPath(relativePath, true, true, permission, umask,
          // overwrite only if eTag matches with the file properties fetched or the file
          // was deleted and there is no etag.
          // if the etag was not retrieved, overwrite is still false, so will fail
          // if another process has just created the file

          op = client.createPath(relativePath, true, overwrite, permission, umask,
              isAppendBlob, eTag, tracingContext);
        } catch (AbfsRestOperationException ex) {
          if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
          final int sc = ex.getStatusCode();

          // Create a detailed error message.
          final String details = "Path =\"" + relativePath + "\""
              + "; Status code =" + sc
              + "; etag = \"" + eTag + "\""
              + "; operation = \"" + action + "\""
              + "; error =" + ex.getErrorMessage();
          if (sc == HttpURLConnection.HTTP_PRECON_FAILED
              || sc == HttpURLConnection.HTTP_CONFLICT) {
Review comment: Good that we have taken care of the 409 which can come up here; it would be awesome if we can put that in comments, and also have a log accordingly.

Suggestion, add a second log for this case: when we searched for the etag there was no file, so now when we try createPath with overwrite = false, it may give 409 if some other process created a file on the same path. Also, in case of 409 it is similar to the case we started with in this method. Should we get into the 409 handling as at Line 624 in 7f9ca10?
            // Is a parallel access case, as file with eTag was just queried
            // and precondition failure can happen only when another file with
            // different etag got created.
            throw new ConcurrentWriteOperationDetectedException(
                "Parallel access to the create path detected. Failing request "
                    + "to honor single writer semantics");
            // OR leasing is enabled on the directory and this client
            // does not have the lease.

            final String errorText = AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED + " " + details;

            // Add a message to the log, including causes
            LOG.warn("{}.", errorText);
            LOG.warn("This is a race condition or another process has a lease on"
                + " the parent directory.");
            // log full stack trace at debug
            LOG.debug("{}", errorText, ex);
            // then throw a specific exception class
            throw new ConcurrentWriteOperationDetectedException(errorText, ex);
          } else {
            // another cause. warn
            LOG.warn("Failed {}", details);
            // print the stack at debug
            LOG.debug("{}", details, ex);
            // throw without wrapping
            throw ex;
          }
        }
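The retry logic in this hunk reduces to a small decision table: after the first createPath(overwrite=false) hits 409, a getPathStatus probe decides whether to retry with the etag, retry without overwrite, or fail fast. The sketch below is not ABFS code; the class, enum, and method names (CreateRetryDecision, Outcome, afterProbe, afterSecondCreate) are invented for illustration, and only the HTTP status constants come from the JDK.

```java
import java.net.HttpURLConnection;

// Illustrative sketch (not ABFS code) of the decisions made by
// conditionalCreateOverwriteFile after the initial create fails with 409.
public class CreateRetryDecision {

  enum Outcome {
    OVERWRITE_WITH_ETAG,       // probe returned an etag: retry with overwrite=true
    RETRY_CREATE_NO_OVERWRITE, // file vanished (404): retry with overwrite=false
    CONCURRENT_WRITE_DETECTED, // second attempt hit 412/409: raise the specific error
    RETHROW                    // anything else: propagate the exception as-is
  }

  /** Decision after the getPathStatus probe; statusCode < 0 means the probe succeeded. */
  static Outcome afterProbe(int statusCode) {
    if (statusCode < 0) {
      return Outcome.OVERWRITE_WITH_ETAG;
    }
    if (statusCode == HttpURLConnection.HTTP_NOT_FOUND) {
      return Outcome.RETRY_CREATE_NO_OVERWRITE;
    }
    return Outcome.RETHROW;
  }

  /** Decision after the second createPath attempt fails with the given status. */
  static Outcome afterSecondCreate(int statusCode) {
    if (statusCode == HttpURLConnection.HTTP_PRECON_FAILED
        || statusCode == HttpURLConnection.HTTP_CONFLICT) {
      return Outcome.CONCURRENT_WRITE_DETECTED;
    }
    return Outcome.RETHROW;
  }

  public static void main(String[] args) {
    System.out.println(afterProbe(404));        // RETRY_CREATE_NO_OVERWRITE
    System.out.println(afterSecondCreate(412)); // CONCURRENT_WRITE_DETECTED
  }
}
```

Note that, unlike before this patch, a 404 from the probe no longer throws: the conflict has implicitly been resolved, so the retry proceeds with overwrite=false and still fails cleanly if yet another writer recreates the file first.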
Review comment: Hi @steveloughran, given that Hadoop has single-writer semantics, would it be correct to expect that, as part of job parallelization, only one worker process should try to create a file? As this check for FileNotFound comes after an attempt to create the file with overwrite=false, which in turn failed with a conflict indicating the file was just present, a concurrent operation on the file is indeed confirmed.

It is quite possible that, if we let this create proceed, some other operation such as delete can kick in later as well. Would code that throws an exception at the first indication of parallel activity be the right thing to do?

As the workload pattern is not honoring the single-writer semantics, I feel we should retain the logic to throw ConcurrentWriteOperationDetectedException.
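The single-writer discussion above boils down to the fact that an etag precondition turns overwrite into a single-winner race: both writers read the same etag, but the first conditional overwrite bumps it, so the second fails with a precondition error. A toy in-memory model makes this concrete; nothing below is ABFS or Hadoop API, and integer etags stand in for server-generated etags.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model (not ABFS/Hadoop code) of etag-conditional create/overwrite.
public class EtagConditionalStore {
  private final Map<String, Integer> etags = new HashMap<>(); // path -> current etag
  private int nextEtag = 1;

  /** Analogue of createPath(overwrite=false): fails if the path exists (HTTP 409). */
  synchronized boolean createNoOverwrite(String path) {
    if (etags.containsKey(path)) {
      return false;
    }
    etags.put(path, nextEtag++);
    return true;
  }

  /** Analogue of createPath(overwrite=true, etag): fails unless the etag still matches (HTTP 412). */
  synchronized boolean overwriteIfMatch(String path, int expectedEtag) {
    Integer current = etags.get(path);
    if (current == null || current != expectedEtag) {
      return false;
    }
    etags.put(path, nextEtag++); // successful write changes the etag
    return true;
  }

  /** Analogue of getPathStatus: returns the current etag, or null if absent. */
  synchronized Integer getEtag(String path) {
    return etags.get(path);
  }

  public static void main(String[] args) {
    EtagConditionalStore store = new EtagConditionalStore();
    store.createNoOverwrite("/a");                           // file now exists
    int etag = store.getEtag("/a");                          // both writers fetch the same etag
    System.out.println(store.overwriteIfMatch("/a", etag));  // true: first writer wins
    System.out.println(store.overwriteIfMatch("/a", etag));  // false: etag moved, loser gets 412
  }
}
```

The losing writer's failed overwrite is exactly the HTTP_PRECON_FAILED branch in the patch, which is why raising ConcurrentWriteOperationDetectedException there, rather than retrying blindly, preserves single-writer semantics.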