Retry S3 task log fetch #14714
Conversation
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
@@ -87,38 +92,42 @@ public Optional<InputStream> streamTaskStatus(String taskid) throws IOException
private Optional<InputStream> streamTaskFile(final long offset, String taskKey) throws IOException
It might be nicer to just add a new method streamTaskFileWithRetry, which calls the existing streamTaskFile. Something like:
private Optional<InputStream> streamTaskFileWithRetry(final long offset, String taskKey)
{
  try {
    return S3Utils.retryOperation(() -> streamTaskFile(offset, taskKey));
  }
  catch (Exception e) {
    throw new IOE(e, "Failed to stream logs from: %s", taskKey);
  }
}
The new method can also have a javadoc to mention which failure cases are retried.
Note: the existing streamTaskFile method could throw a wrapped exception, e.g. new IOE(e, "Failed to stream logs from: %s", taskKey); in that case the outer new method streamTaskFileWithRetry won't know whether it should retry or not. That means we need to refine the exception throwing in the existing streamTaskFile as well.
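The concern above can be illustrated with a small stdlib-only sketch. IOE and the AWS exception types are Druid/SDK classes, so a plain IOException and a hypothetical TransientS3Error stand in here: once the inner method wraps the transient error in an IOException, a retry predicate that only inspects the top-level exception misses it and must walk the cause chain instead.

```java
import java.io.IOException;

public class RetryCauseCheck {
    // stand-in for a transient S3 error (e.g. a 503 "Slow Down" response)
    static class TransientS3Error extends RuntimeException {
        TransientS3Error(String msg) { super(msg); }
    }

    // naive check: only inspects the top-level exception
    static boolean retryableShallow(Throwable t) {
        return t instanceof TransientS3Error;
    }

    // robust check: walks the cause chain, so wrapping in IOException
    // does not hide the retryable signal
    static boolean retryableDeep(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof TransientS3Error) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // streamTaskFile-style wrapping: the transient error becomes
        // the cause of a generic IOException
        IOException wrapped =
            new IOException("Failed to stream logs from: key", new TransientS3Error("503"));
        System.out.println(retryableShallow(wrapped)); // false: retry would be skipped
        System.out.println(retryableDeep(wrapped));    // true
    }
}
```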
Yeah, you can modify the exception if required, as long as the overall logic remains the same. The advantage of having a separate retry method is readability and a small diff.
@@ -67,6 +67,11 @@ public S3TaskLogs(
public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
{
  final String taskKey = getTaskLogKey(taskid, "log");
  // this is to satisfy CodeQL scan
  Preconditions.checkArgument(
      offset < Long.MAX_VALUE && offset > Long.MIN_VALUE,
I think we can ignore the CodeQL scan for now, because I am not entirely sure whether Long.MIN_VALUE or Long.MAX_VALUE is being used somewhere on purpose to represent some special scenario.
The scan might also go away if we make the suggested change of adding a new method rather than updating the existing one, although I am not entirely sure that it will.
OK, though the PR can't be merged with an open CodeQL scan issue, right?
Changes look good, left some minor comments.
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
Minor suggestions, otherwise looks good.
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
EasyMock.reset(s3Client);
AmazonS3Exception awsError = new AmazonS3Exception("AWS Error");
awsError.setErrorCode("503");
awsError.setStatusCode(503);
EasyMock.expect(s3Client.getObjectMetadata(EasyMock.anyString(), EasyMock.anyString())).andThrow(awsError);
EasyMock.expectLastCall().once();
String logPath = TEST_PREFIX + "/" + KEY_1 + "/status.json";
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentLength(STATUS_CONTENTS.length());
EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
S3Object s3Object = new S3Object();
s3Object.setObjectContent(new ByteArrayInputStream(STATUS_CONTENTS.getBytes(StandardCharsets.UTF_8)));
GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, logPath);
getObjectRequest.setRange(0, STATUS_CONTENTS.length() - 1);
getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
EasyMock.expectLastCall().once();
replayAll();
Some 1-line comments might be good here, or at least a logical separation using newlines.
…torage/s3/S3TaskLogs.java Co-authored-by: Kashif Faraz <[email protected]>
…torage/s3/S3TaskLogs.java Co-authored-by: Kashif Faraz <[email protected]>
Description
Saw the following error when fetching the task status from S3. This is due to S3 rate limiting on queries; we should retry the operation in this case.
Now changed to use S3Utils.retryS3Operation for the task log fetch, the same as the task file push method.

Release note
Retry S3 task log fetch
Key changed/added classes in this PR
streamTaskFile method retries on transient S3 errors in the S3TaskLogs class
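As a rough illustration of what the retry wrapper provides, here is a self-contained sketch. The real S3Utils.retryS3Operation in Druid decides retryability from the AWS error and applies backoff; the helper below, its max-try count, and the simulated 503 message are assumptions for illustration only.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

public class RetrySketch {
    // hypothetical retry helper: re-invokes the operation until it succeeds,
    // the failure is deemed non-retryable, or maxTries is exhausted
    public static <T> T retry(Callable<T> op, Predicate<Throwable> retryable, int maxTries)
        throws Exception
    {
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            }
            catch (Exception e) {
                if (attempt >= maxTries || !retryable.test(e)) {
                    throw e; // give up: out of tries or not a transient failure
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // simulate an S3 fetch that is throttled twice before succeeding
        String result = retry(
            () -> {
                calls[0]++;
                if (calls[0] < 3) {
                    throw new RuntimeException("503 Slow Down"); // simulated throttling
                }
                return "log contents";
            },
            e -> e.getMessage() != null && e.getMessage().contains("503"),
            5
        );
        System.out.println(result + " after " + calls[0] + " tries");
    }
}
```

A production version would also sleep with exponential backoff between attempts, which this sketch omits for brevity.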