From c10ef3579718b961228be0a90dce28696d8f1f6b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 17 Sep 2019 16:20:14 +0200 Subject: [PATCH] Retry GCS Resumable Upload on Error 410 (#45963) A resumable upload session can fail with a 410 error and should be retried in that case. I added retrying twice using resetting of the given `InputStream` as the retry mechanism since the same approach is used by the AWS S3 SDK already as well and relied upon by the S3 repository implementation. Related GCS documentation: https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone --- .../gcs/GoogleCloudStorageBlobStore.java | 73 +++++++++++++------ ...leCloudStorageBlobStoreContainerTests.java | 36 ++++++++- ...eCloudStorageBlobStoreRepositoryTests.java | 2 +- .../gcs/GoogleCloudStorageBlobStoreTests.java | 2 +- ...loudStorageRepositoryDeprecationTests.java | 2 +- .../repositories/gcs/MockStorage.java | 31 ++++++-- 6 files changed, 111 insertions(+), 35 deletions(-) diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 7894f9fc7df63..4e757bac403e0 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -28,6 +28,10 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobListOption; import com.google.cloud.storage.StorageException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; 
@@ -53,16 +57,19 @@ import java.util.Map; import java.util.stream.Collectors; +import static java.net.HttpURLConnection.HTTP_GONE; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore { + private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class); + // The recommended maximum size of a blob that should be uploaded in a single // request. Larger files should be uploaded over multiple requests (this is // called "resumable upload") // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload - private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024; + public static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024; private final String bucketName; private final String clientName; @@ -213,35 +220,53 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists */ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException { - try { - final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ? - new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } : - new Storage.BlobWriteOption[0]; - final WriteChannel writeChannel = SocketAccess + // We retry 410 GONE errors to cover the unlikely but possible scenario where a resumable upload session becomes broken and + // needs to be restarted from scratch. Given how unlikely a 410 error should be according to SLAs we retry only twice. + assert inputStream.markSupported(); + inputStream.mark(Integer.MAX_VALUE); + StorageException storageException = null; + final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ? 
+ new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0]; + for (int retry = 0; retry < 3; ++retry) { + try { + final WriteChannel writeChannel = SocketAccess .doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions)); - Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { - @Override - public boolean isOpen() { - return writeChannel.isOpen(); - } + Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { + @Override + public boolean isOpen() { + return writeChannel.isOpen(); + } - @Override - public void close() throws IOException { - SocketAccess.doPrivilegedVoidIOException(writeChannel::close); - } + @Override + public void close() throws IOException { + SocketAccess.doPrivilegedVoidIOException(writeChannel::close); + } - @SuppressForbidden(reason = "Channel is based of a socket not a file") - @Override - public int write(ByteBuffer src) throws IOException { - return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src)); + @SuppressForbidden(reason = "Channel is based of a socket not a file") + @Override + public int write(ByteBuffer src) throws IOException { + return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src)); + } + })); + return; + } catch (final StorageException se) { + final int errorCode = se.getCode(); + if (errorCode == HTTP_GONE) { + logger.warn(() -> new ParameterizedMessage("Retrying broken resumable upload session for blob {}", blobInfo), se); + storageException = ExceptionsHelper.useOrSuppress(storageException, se); + inputStream.reset(); + continue; + } else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) { + throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); } - })); - } catch (final StorageException se) { - if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) { - throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, 
se.getMessage()); + if (storageException != null) { + se.addSuppressed(storageException); + } + throw se; } - throw se; } + assert storageException != null; + throw storageException; } /** diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java index 2f23011d4d9b7..5f5eaff85a766 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java @@ -19,12 +19,21 @@ package org.elasticsearch.repositories.gcs; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefBuilder; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.repositories.ESBlobStoreContainerTestCase; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; import java.util.Locale; import java.util.concurrent.ConcurrentHashMap; +import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -37,10 +46,35 @@ protected BlobStore newBlobStore() { final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT); final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class); try { - when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>())); + when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new 
ConcurrentHashMap<>(), random())); } catch (final Exception e) { throw new RuntimeException(e); } return new GoogleCloudStorageBlobStore(bucketName, clientName, storageService); } + + public void testWriteReadLarge() throws IOException { + try(BlobStore store = newBlobStore()) { + final BlobContainer container = store.blobContainer(new BlobPath()); + byte[] data = randomBytes(GoogleCloudStorageBlobStore.LARGE_BLOB_THRESHOLD_BYTE_SIZE + 1); + writeBlob(container, "foobar", new BytesArray(data), randomBoolean()); + if (randomBoolean()) { + // override file, to check if we get latest contents + random().nextBytes(data); + writeBlob(container, "foobar", new BytesArray(data), false); + } + try (InputStream stream = container.readBlob("foobar")) { + BytesRefBuilder target = new BytesRefBuilder(); + while (target.length() < data.length) { + byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())]; + int offset = scaledRandomIntBetween(0, buffer.length - 1); + int read = stream.read(buffer, offset, buffer.length - offset); + target.append(new BytesRef(buffer, offset, read)); + } + assertEquals(data.length, target.length()); + assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length())); + } + } + } + } diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 0e3ecde69c4f0..d1c3ffbedaea2 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -87,7 +87,7 @@ protected GoogleCloudStorageService createStorageService() { public static class MockGoogleCloudStorageService extends GoogleCloudStorageService { @Override public Storage 
client(String clientName) { - return new MockStorage(BUCKET, blobs); + return new MockStorage(BUCKET, blobs, random()); } } diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreTests.java index e2adfed94bbc9..294adfdaec573 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreTests.java @@ -37,7 +37,7 @@ protected BlobStore newBlobStore() { final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT); final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class); try { - when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>())); + when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>(), random())); } catch (final Exception e) { throw new RuntimeException(e); } diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepositoryDeprecationTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepositoryDeprecationTests.java index c45947bc7feb5..575ed1068b51c 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepositoryDeprecationTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepositoryDeprecationTests.java @@ -46,7 +46,7 @@ public void testDeprecatedSettings() throws Exception { new GoogleCloudStorageService() { @Override public Storage client(String clientName) throws IOException { - return new MockStorage("test", new ConcurrentHashMap<>()); + return new 
MockStorage("test", new ConcurrentHashMap<>(), random()); } }); diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java index 97c7e2ab76bd2..3ae3f0ba13dfc 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java @@ -38,12 +38,14 @@ import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.StorageRpcOptionUtils; import com.google.cloud.storage.StorageTestUtils; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.core.internal.io.IOUtils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.net.HttpURLConnection; import java.net.URL; import java.nio.ByteBuffer; import java.nio.channels.Channels; @@ -52,6 +54,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -63,10 +67,12 @@ */ class MockStorage implements Storage { + private final Random random; private final String bucketName; private final ConcurrentMap blobs; - MockStorage(final String bucket, final ConcurrentMap blobs) { + MockStorage(final String bucket, final ConcurrentMap blobs, final Random random) { + this.random = random; this.bucketName = Objects.requireNonNull(bucket); this.blobs = Objects.requireNonNull(blobs); } @@ -228,12 +234,16 @@ public boolean isOpen() { return null; } + private final Set simulated410s = ConcurrentCollections.newConcurrentSet(); + @Override public WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... 
options) { if (bucketName.equals(blobInfo.getBucket())) { final ByteArrayOutputStream output = new ByteArrayOutputStream(); return new WriteChannel() { + private volatile boolean failed; + final WritableByteChannel writableByteChannel = Channels.newChannel(output); @Override @@ -248,6 +258,11 @@ public RestorableState capture() { @Override public int write(ByteBuffer src) throws IOException { + // Only fail a blob once on a 410 error since the error is so unlikely in practice + if (simulated410s.add(blobInfo) && random.nextBoolean()) { + failed = true; + throw new StorageException(HttpURLConnection.HTTP_GONE, "Simulated lost resumeable upload session"); + } return writableByteChannel.write(src); } @@ -259,13 +274,15 @@ public boolean isOpen() { @Override public void close() { IOUtils.closeWhileHandlingException(writableByteChannel); - if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) { - byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray()); - if (existingBytes != null) { - throw new StorageException(412, "Blob already exists"); + if (failed == false) { + if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) { + byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray()); + if (existingBytes != null) { + throw new StorageException(412, "Blob already exists"); + } + } else { + blobs.put(blobInfo.getName(), output.toByteArray()); } - } else { - blobs.put(blobInfo.getName(), output.toByteArray()); } } };