Skip to content

Commit

Permalink
Retry S3 task log fetch in case of transient S3 exceptions (#14714)
Browse files Browse the repository at this point in the history
  • Loading branch information
YongGang authored Aug 3, 2023
1 parent b27d281 commit 20c48b6
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.storage.s3;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
Expand Down Expand Up @@ -67,57 +66,66 @@ public S3TaskLogs(
/**
 * Streams the "log" file for the given task from S3, retrying on transient S3 failures.
 *
 * @param taskid task identifier used to build the S3 key
 * @param offset byte offset to start streaming from (see {@code streamTaskFile} for semantics)
 * @return the log stream, or absent if the object does not exist
 * @throws IOException if the fetch still fails after retries
 */
public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
{
  final String taskKey = getTaskLogKey(taskid, "log");
  // Route through the retry wrapper so transient S3 errors (503s, throttling) are retried.
  return streamTaskFileWithRetry(offset, taskKey);
}

@Override
public Optional<InputStream> streamTaskReports(String taskid) throws IOException
{
  // Reports are always read from the beginning (offset 0).
  final String taskKey = getTaskLogKey(taskid, "report.json");
  return streamTaskFileWithRetry(0, taskKey);
}

@Override
public Optional<InputStream> streamTaskStatus(String taskid) throws IOException
{
  // Status files are always read from the beginning (offset 0).
  final String taskKey = getTaskLogKey(taskid, "status.json");
  return streamTaskFileWithRetry(0, taskKey);
}

private Optional<InputStream> streamTaskFile(final long offset, String taskKey) throws IOException
/**
* Using the retry conditions defined in {@link S3Utils#S3RETRY}.
*/
private Optional<InputStream> streamTaskFileWithRetry(final long offset, String taskKey) throws IOException
{
try {
return S3Utils.retryS3Operation(() -> streamTaskFile(offset, taskKey));
}
catch (Exception e) {
throw new IOE(e, "Failed to stream logs for task[%s] starting at offset[%d]", taskKey, offset);
}
}

/**
 * Fetches a single task file object from S3 as a stream.
 *
 * Offset semantics: a positive offset within the object streams from that byte; a negative
 * offset within range streams the last |offset| bytes; any out-of-range offset falls back to
 * streaming from the start. The read is pinned to the observed ETag so the range request fails
 * rather than returning bytes from a concurrently-replaced object.
 *
 * Note: exceptions are deliberately rethrown raw (not wrapped) so the retry policy in
 * {@link #streamTaskFileWithRetry} can inspect them.
 *
 * @param offset  signed byte offset (see above)
 * @param taskKey fully-resolved S3 key of the task file
 * @return the object content stream, or absent when the key or bucket does not exist
 */
private Optional<InputStream> streamTaskFile(final long offset, String taskKey)
{
  try {
    final ObjectMetadata objectMetadata = service.getObjectMetadata(config.getS3Bucket(), taskKey);

    final long start;
    final long end = objectMetadata.getContentLength() - 1;

    long contentLength = objectMetadata.getContentLength();
    if (offset >= contentLength || offset <= -contentLength) {
      // Out-of-range offset (either direction): stream the whole object.
      start = 0;
    } else if (offset >= 0) {
      start = offset;
    } else {
      // Negative offset: stream the trailing |offset| bytes.
      start = contentLength + offset;
    }

    final GetObjectRequest request = new GetObjectRequest(config.getS3Bucket(), taskKey)
        .withMatchingETagConstraint(ensureQuotated(objectMetadata.getETag()))
        .withRange(start, end);

    return Optional.of(service.getObject(request).getObjectContent());
  }
  catch (AmazonS3Exception e) {
    // Missing object/bucket is an expected "no logs" outcome, not an error.
    if (404 == e.getStatusCode()
        || "NoSuchKey".equals(e.getErrorCode())
        || "NoSuchBucket".equals(e.getErrorCode())) {
      return Optional.absent();
    } else {
      // Propagate unchanged so the caller's retry policy can classify it.
      throw e;
    }
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.Grant;
Expand Down Expand Up @@ -487,6 +488,45 @@ public void test_status_fetch() throws IOException
Assert.assertEquals(STATUS_CONTENTS, report);
}

@Test
public void test_retryStatusFetch_whenExceptionThrown() throws IOException
{
// Verifies that a transient S3 failure on the first getObjectMetadata call is retried
// and the status fetch ultimately succeeds.
EasyMock.reset(s3Client);
// throw exception on first call
AmazonS3Exception awsError = new AmazonS3Exception("AWS Error");
// NOTE(review): "503" is set as the error *code* as well as the status code here —
// real S3 error codes are strings like "ServiceUnavailable"; the retry predicate
// presumably keys off the status code. Verify against S3Utils#S3RETRY.
awsError.setErrorCode("503");
awsError.setStatusCode(503);
EasyMock.expect(s3Client.getObjectMetadata(EasyMock.anyString(), EasyMock.anyString())).andThrow(awsError);
EasyMock.expectLastCall().once();

// Second call (the retry) succeeds: return metadata, then serve the object content.
String logPath = TEST_PREFIX + "/" + KEY_1 + "/status.json";
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentLength(STATUS_CONTENTS.length());
EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
S3Object s3Object = new S3Object();
s3Object.setObjectContent(new ByteArrayInputStream(STATUS_CONTENTS.getBytes(StandardCharsets.UTF_8)));
// The expected GetObjectRequest must match what streamTaskFile builds: full range + ETag pin.
GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, logPath);
getObjectRequest.setRange(0, STATUS_CONTENTS.length() - 1);
getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
EasyMock.expectLastCall().once();

replayAll();

S3TaskLogs s3TaskLogs = getS3TaskLogs();

Optional<InputStream> inputStreamOptional = s3TaskLogs.streamTaskStatus(KEY_1);
String report;
try (BufferedReader reader = new BufferedReader(new InputStreamReader(
inputStreamOptional.get(),
StandardCharsets.UTF_8
))) {
report = reader.lines().collect(Collectors.joining("\n"));
}

// The retried fetch must return the full status contents unchanged.
Assert.assertEquals(STATUS_CONTENTS, report);
}

@Nonnull
private S3TaskLogs getS3TaskLogs()
{
Expand Down

0 comments on commit 20c48b6

Please sign in to comment.