Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry S3 task log fetch #14714

Merged
merged 10 commits into from
Aug 3, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.storage.s3;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
Expand Down Expand Up @@ -67,57 +66,64 @@ public S3TaskLogs(
public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "log");
return streamTaskFile(offset, taskKey);
return streamTaskFileWithRetry(offset, taskKey);
}

@Override
public Optional<InputStream> streamTaskReports(String taskid) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "report.json");
return streamTaskFile(0, taskKey);
return streamTaskFileWithRetry(0, taskKey);
}

@Override
public Optional<InputStream> streamTaskStatus(String taskid) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "status.json");
return streamTaskFile(0, taskKey);
return streamTaskFileWithRetry(0, taskKey);
}

private Optional<InputStream> streamTaskFile(final long offset, String taskKey) throws IOException
/**
* Using the retry conditions defined in {@link S3Utils#S3RETRY}.
*/
private Optional<InputStream> streamTaskFileWithRetry(final long offset, String taskKey) throws IOException
{
try {
return S3Utils.retryS3Operation(() -> streamTaskFile(offset, taskKey));
}
catch (Exception e) {
throw new IOE(e, "Failed to stream logs from: %s", taskKey);
YongGang marked this conversation as resolved.
Show resolved Hide resolved
}
}

private Optional<InputStream> streamTaskFile(final long offset, String taskKey)
{
try {
final ObjectMetadata objectMetadata = service.getObjectMetadata(config.getS3Bucket(), taskKey);

try {
final long start;
final long end = objectMetadata.getContentLength() - 1;
final long start;
final long end = objectMetadata.getContentLength() - 1;

if (offset > 0 && offset < objectMetadata.getContentLength()) {
start = offset;
} else if (offset < 0 && (-1 * offset) < objectMetadata.getContentLength()) {
start = objectMetadata.getContentLength() + offset;
} else {
start = 0;
}
long contentLength = objectMetadata.getContentLength();
if (offset >= contentLength || offset <= -contentLength) {
start = 0;
} else {
start = offset >= 0 ? offset : contentLength + offset;
}
YongGang marked this conversation as resolved.
Show resolved Hide resolved

final GetObjectRequest request = new GetObjectRequest(config.getS3Bucket(), taskKey)
.withMatchingETagConstraint(ensureQuotated(objectMetadata.getETag()))
.withRange(start, end);
final GetObjectRequest request = new GetObjectRequest(config.getS3Bucket(), taskKey)
.withMatchingETagConstraint(ensureQuotated(objectMetadata.getETag()))
.withRange(start, end);

return Optional.of(service.getObject(request).getObjectContent());
}
catch (AmazonServiceException e) {
throw new IOException(e);
}
return Optional.of(service.getObject(request).getObjectContent());
}
catch (AmazonS3Exception e) {
if (404 == e.getStatusCode()
|| "NoSuchKey".equals(e.getErrorCode())
|| "NoSuchBucket".equals(e.getErrorCode())) {
return Optional.absent();
} else {
throw new IOE(e, "Failed to stream logs from: %s", taskKey);
throw e;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.Grant;
Expand Down Expand Up @@ -487,6 +488,42 @@ public void test_status_fetch() throws IOException
Assert.assertEquals(STATUS_CONTENTS, report);
}

@Test
public void test_retryStatusFetch_whenExceptionThrown() throws IOException
{
EasyMock.reset(s3Client);
AmazonS3Exception awsError = new AmazonS3Exception("AWS Error");
awsError.setErrorCode("503");
awsError.setStatusCode(503);
EasyMock.expect(s3Client.getObjectMetadata(EasyMock.anyString(), EasyMock.anyString())).andThrow(awsError);
EasyMock.expectLastCall().once();
String logPath = TEST_PREFIX + "/" + KEY_1 + "/status.json";
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentLength(STATUS_CONTENTS.length());
EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
S3Object s3Object = new S3Object();
s3Object.setObjectContent(new ByteArrayInputStream(STATUS_CONTENTS.getBytes(StandardCharsets.UTF_8)));
GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, logPath);
getObjectRequest.setRange(0, STATUS_CONTENTS.length() - 1);
getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
EasyMock.expectLastCall().once();
replayAll();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some 1-line comments might be good here, or at least a logical separation using newlines.


S3TaskLogs s3TaskLogs = getS3TaskLogs();

Optional<InputStream> inputStreamOptional = s3TaskLogs.streamTaskStatus(KEY_1);
String report;
try (BufferedReader reader = new BufferedReader(new InputStreamReader(
inputStreamOptional.get(),
StandardCharsets.UTF_8
))) {
report = reader.lines().collect(Collectors.joining("\n"));
}

Assert.assertEquals(STATUS_CONTENTS, report);
}

@Nonnull
private S3TaskLogs getS3TaskLogs()
{
Expand Down