Retry GCS Resumable Upload on Error 410 #45963

Merged
GoogleCloudStorageBlobStore.java
@@ -31,6 +31,10 @@
 import com.google.cloud.storage.Storage.BlobListOption;
 import com.google.cloud.storage.StorageBatch;
 import com.google.cloud.storage.StorageException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
@@ -60,16 +64,19 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;

+import static java.net.HttpURLConnection.HTTP_GONE;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;

 class GoogleCloudStorageBlobStore implements BlobStore {

+    private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class);
+
     // The recommended maximum size of a blob that should be uploaded in a single
     // request. Larger files should be uploaded over multiple requests (this is
     // called "resumable upload")
     // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
-    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    public static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;

     private final String bucketName;
     private final String clientName;
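
For context, this constant is what routes a write onto the resumable path changed below. A minimal sketch of that dispatch, assuming the writeBlob entry point whose body is collapsed out of this diff (writeBlobMultipart is an assumed name for the single-request path):

    // Sketch only, not part of this diff: how the threshold above is assumed to be used.
    void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
        final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
        if (blobSize > LARGE_BLOB_THRESHOLD_BYTE_SIZE) {
            writeBlobResumable(blobInfo, inputStream, failIfAlreadyExists); // multi-request upload, retried on 410 below
        } else {
            writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists); // assumed single-request path
        }
    }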
@@ -224,35 +231,53 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean
      * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
      */
     private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
-        try {
-            final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
-                new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } :
-                new Storage.BlobWriteOption[0];
-            final WriteChannel writeChannel = SocketAccess
-                .doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
-            Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
-                @Override
-                public boolean isOpen() {
-                    return writeChannel.isOpen();
-                }
-
-                @Override
-                public void close() throws IOException {
-                    SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
-                }
-
-                @SuppressForbidden(reason = "Channel is based on a socket, not a file")
-                @Override
-                public int write(ByteBuffer src) throws IOException {
-                    return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
-                }
-            }));
-        } catch (final StorageException se) {
-            if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
-                throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
-            }
-            throw se;
-        }
+        // We retry 410 GONE errors to cover the unlikely but possible scenario where a resumable upload session becomes
+        // broken and needs to be restarted from scratch. Given how unlikely a 410 error should be according to SLAs, we
+        // retry only twice.
+        assert inputStream.markSupported();
+        inputStream.mark(Integer.MAX_VALUE);
+        StorageException storageException = null;
+        final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
+            new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
+        for (int retry = 0; retry < 3; ++retry) {
+            try {
+                final WriteChannel writeChannel = SocketAccess
+                    .doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
+                Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
+                    @Override
+                    public boolean isOpen() {
+                        return writeChannel.isOpen();
+                    }
+
+                    @Override
+                    public void close() throws IOException {
+                        SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
+                    }
+
+                    @SuppressForbidden(reason = "Channel is based on a socket, not a file")
+                    @Override
+                    public int write(ByteBuffer src) throws IOException {
+                        return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
+                    }
+                }));
+                return;
+            } catch (final StorageException se) {
+                final int errorCode = se.getCode();
+                if (errorCode == HTTP_GONE) {
+                    logger.warn(() -> new ParameterizedMessage("Retrying broken resumable upload session for blob {}", blobInfo), se);
+                    storageException = ExceptionsHelper.useOrSuppress(storageException, se);
+                    inputStream.reset();
+                    continue;
+                } else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
+                    throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
+                }
+                if (storageException != null) {
+                    se.addSuppressed(storageException);
+                }
+                throw se;
+            }
+        }
+        assert storageException != null;
+        throw storageException;
     }

Review thread on the inputStream.reset() line:

  Contributor: where is inputStream.mark() called?

  Member Author: Nowhere, which was fine for all the streams we seem to be using. Added an explicit call now though :)
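
For context on the fix discussed above: InputStream.mark(readlimit) records the current position and reset() rewinds to it, so each retry re-reads the blob from the start. A stripped-down, self-contained sketch of the pattern (hypothetical names, not code from this PR):

import java.io.IOException;
import java.io.InputStream;

class MarkResetRetryDemo {

    // Stand-in for the upload: consumes the stream and may fail part-way through.
    interface Upload {
        void accept(InputStream in) throws IOException;
    }

    // Runs the upload, rewinding the stream and retrying up to maxRetries extra attempts.
    static void uploadWithRetries(InputStream in, Upload upload, int maxRetries) throws IOException {
        assert in.markSupported(); // true for e.g. ByteArrayInputStream and BufferedInputStream
        in.mark(Integer.MAX_VALUE); // remember the current position, with an effectively unlimited read limit
        for (int attempt = 0; ; ++attempt) {
            try {
                upload.accept(in);
                return;
            } catch (IOException e) {
                if (attempt == maxRetries) {
                    throw e;
                }
                in.reset(); // rewind to the mark so the next attempt re-reads the blob from the start
            }
        }
    }
}

This also explains why the missing mark() went unnoticed: ByteArrayInputStream rewinds to its start on reset() even without a prior mark(), whereas a BufferedInputStream would throw an IOException instead.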

GoogleCloudStorageBlobStoreContainerTests.java
@@ -19,12 +19,21 @@

 package org.elasticsearch.repositories.gcs;

+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;

+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
 import java.util.Locale;
 import java.util.concurrent.ConcurrentHashMap;

+import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -37,10 +46,35 @@ protected BlobStore newBlobStore() {
         final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
         final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
         try {
-            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>()));
+            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>(), random()));
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
         return new GoogleCloudStorageBlobStore(bucketName, clientName, storageService);
     }

+    public void testWriteReadLarge() throws IOException {
+        try (BlobStore store = newBlobStore()) {
+            final BlobContainer container = store.blobContainer(new BlobPath());
+            byte[] data = randomBytes(GoogleCloudStorageBlobStore.LARGE_BLOB_THRESHOLD_BYTE_SIZE + 1);
+            writeBlob(container, "foobar", new BytesArray(data), randomBoolean());
+            if (randomBoolean()) {
+                // override the file, to check that we get the latest contents
+                random().nextBytes(data);
+                writeBlob(container, "foobar", new BytesArray(data), false);
+            }
+            try (InputStream stream = container.readBlob("foobar")) {
+                BytesRefBuilder target = new BytesRefBuilder();
+                while (target.length() < data.length) {
+                    byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
+                    int offset = scaledRandomIntBetween(0, buffer.length - 1);
+                    int read = stream.read(buffer, offset, buffer.length - offset);
+                    target.append(new BytesRef(buffer, offset, read));
+                }
+                assertEquals(data.length, target.length());
+                assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
+            }
+        }
+    }
+
 }
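
Note that writeBlob(container, ...) above is a helper inherited from the test base class, not something added in this PR; its assumed shape (see ESBlobStoreContainerTestCase) is roughly:

    // Assumed shape of the inherited test helper, shown for readability only.
    protected void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray,
                             boolean failIfAlreadyExists) throws IOException {
        try (InputStream stream = bytesArray.streamInput()) {
            container.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists);
        }
    }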
GoogleCloudStorageBlobStoreRepositoryTests.java
@@ -124,7 +124,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
         final MockSecureSettings secureSettings = new MockSecureSettings();
         secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace("test").getKey(), serviceAccount);
         settings.setSecureSettings(secureSettings);
-
         return settings.build();
     }
GoogleCloudStorageBlobStoreTests.java
@@ -37,7 +37,7 @@ protected BlobStore newBlobStore() {
         final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
         final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
         try {
-            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>()));
+            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>(), random()));
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
MockStorage.java
@@ -40,13 +40,15 @@
 import com.google.cloud.storage.StorageOptions;
 import com.google.cloud.storage.StorageRpcOptionUtils;
 import com.google.cloud.storage.StorageTestUtils;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.mockito.stubbing.Answer;

 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
@@ -55,6 +57,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -71,10 +75,12 @@
  */
 class MockStorage implements Storage {

+    private final Random random;
     private final String bucketName;
     private final ConcurrentMap<String, byte[]> blobs;

-    MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs) {
+    MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs, final Random random) {
+        this.random = random;
         this.bucketName = Objects.requireNonNull(bucket);
         this.blobs = Objects.requireNonNull(blobs);
     }
@@ -236,12 +242,16 @@ public boolean isOpen() {
         return null;
     }

+    private final Set<BlobInfo> simulated410s = ConcurrentCollections.newConcurrentSet();
+
     @Override
     public WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
         if (bucketName.equals(blobInfo.getBucket())) {
             final ByteArrayOutputStream output = new ByteArrayOutputStream();
             return new WriteChannel() {

+                private volatile boolean failed;
+
                 final WritableByteChannel writableByteChannel = Channels.newChannel(output);

                 @Override
@@ -256,6 +266,11 @@ public RestorableState<WriteChannel> capture() {

                 @Override
                 public int write(ByteBuffer src) throws IOException {
+                    // Only fail a blob once on a 410 error since the error is so unlikely in practice
+                    if (simulated410s.add(blobInfo) && random.nextBoolean()) {
+                        failed = true;
+                        throw new StorageException(HttpURLConnection.HTTP_GONE, "Simulated lost resumable upload session");
+                    }
                     return writableByteChannel.write(src);
                 }

Review thread on the simulated 410 above:

  Contributor: can we test this with @tlrx's recent HTTP mock of GCS?

  Member: That's the goal, but for now repository integration tests do not support GCS resumable uploads (as we don't generate large enough segment files). I can add a GCS retries test at the blob store level, like I did for S3 (see S3BlobContainerRetriesTests), and exercise this retry logic with it.

  Member Author: Left this for @tlrx now if that's ok :) No need for two people to mess with the API mock code concurrently I guess.

  Member: It took some time but I finally opened #46968, which adds random 410 Gone errors when testing resumable uploads (see the testWriteLargeBlob test).
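
For reference, the HTTP-level test the thread converges on needs little more than an endpoint that serves a 410 on the first upload attempt and accepts the retry. A self-contained sketch of that idea (illustrative only; not the code from #46968):

import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal endpoint that fails the first upload request with 410 Gone and accepts
// every subsequent one, which is the failure shape the retry loop in
// writeBlobResumable has to survive.
public class Gone410MockServer {
    public static void main(String[] args) throws IOException {
        final AtomicBoolean failedOnce = new AtomicBoolean();
        final HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 0), 0);
        server.createContext("/upload", exchange -> {
            exchange.getRequestBody().readAllBytes(); // drain the upload payload
            if (failedOnce.compareAndSet(false, true)) {
                exchange.sendResponseHeaders(410, -1); // simulate a lost resumable upload session
            } else {
                exchange.sendResponseHeaders(200, -1); // the retried attempt succeeds
            }
            exchange.close();
        });
        server.start();
        System.out.println("listening on port " + server.getAddress().getPort());
    }
}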
@@ -267,13 +282,15 @@ public boolean isOpen() {
                 @Override
                 public void close() {
                     IOUtils.closeWhileHandlingException(writableByteChannel);
-                    if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
-                        byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
-                        if (existingBytes != null) {
-                            throw new StorageException(412, "Blob already exists");
-                        }
-                    } else {
-                        blobs.put(blobInfo.getName(), output.toByteArray());
+                    if (failed == false) {
+                        if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
+                            byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
+                            if (existingBytes != null) {
+                                throw new StorageException(412, "Blob already exists");
+                            }
+                        } else {
+                            blobs.put(blobInfo.getName(), output.toByteArray());
+                        }
                     }
                 }
             };