Retry GCS Resumable Upload on Error 410 (#45963) #51419

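The change wraps the GCS resumable-upload path in a small retry loop for HTTP 410 (Gone) responses and adds the test plumbing to exercise it: a MockStorage that can simulate a lost upload session, and a round-trip test for blobs large enough to take the resumable path.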
Merged: 1 commit, Jan 24, 2020
----------------------------------------------------------------
GoogleCloudStorageBlobStore.java

@@ -28,6 +28,10 @@
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.Storage.BlobListOption;
 import com.google.cloud.storage.StorageException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
@@ -53,16 +57,19 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static java.net.HttpURLConnection.HTTP_GONE;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
 
 class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore {
 
+    private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class);
+
     // The recommended maximum size of a blob that should be uploaded in a single
     // request. Larger files should be uploaded over multiple requests (this is
     // called "resumable upload")
     // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
-    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    public static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
 
     private final String bucketName;
     private final String clientName;
@@ -213,35 +220,53 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean
      * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
      */
     private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
-        try {
-            final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
-                new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } :
-                new Storage.BlobWriteOption[0];
-            final WriteChannel writeChannel = SocketAccess
-                .doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
-            Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
-                @Override
-                public boolean isOpen() {
-                    return writeChannel.isOpen();
-                }
-
-                @Override
-                public void close() throws IOException {
-                    SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
-                }
-
-                @SuppressForbidden(reason = "Channel is based of a socket not a file")
-                @Override
-                public int write(ByteBuffer src) throws IOException {
-                    return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
-                }
-            }));
-        } catch (final StorageException se) {
-            if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
-                throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
-            }
-            throw se;
-        }
+        // We retry 410 GONE errors to cover the unlikely but possible scenario where a resumable upload session becomes broken and
+        // needs to be restarted from scratch. Given how unlikely a 410 error should be according to SLAs we retry only twice.
+        assert inputStream.markSupported();
+        inputStream.mark(Integer.MAX_VALUE);
+        StorageException storageException = null;
+        final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
+            new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
+        for (int retry = 0; retry < 3; ++retry) {
+            try {
+                final WriteChannel writeChannel = SocketAccess
+                    .doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
+                Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
+                    @Override
+                    public boolean isOpen() {
+                        return writeChannel.isOpen();
+                    }
+
+                    @Override
+                    public void close() throws IOException {
+                        SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
+                    }
+
+                    @SuppressForbidden(reason = "Channel is based of a socket not a file")
+                    @Override
+                    public int write(ByteBuffer src) throws IOException {
+                        return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
+                    }
+                }));
+                return;
+            } catch (final StorageException se) {
+                final int errorCode = se.getCode();
+                if (errorCode == HTTP_GONE) {
+                    logger.warn(() -> new ParameterizedMessage("Retrying broken resumable upload session for blob {}", blobInfo), se);
+                    storageException = ExceptionsHelper.useOrSuppress(storageException, se);
+                    inputStream.reset();
+                    continue;
+                } else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
+                    throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
+                }
+                if (storageException != null) {
+                    se.addSuppressed(storageException);
+                }
+                throw se;
+            }
+        }
+        assert storageException != null;
+        throw storageException;
     }
 
     /**
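The heart of the change is the retry loop above: the caller's stream is marked up front, and on a 410 GONE the loop rewinds the stream and opens a fresh upload session, keeping the earliest failure and attaching later ones as suppressed exceptions (the role the ExceptionsHelper.useOrSuppress call plays here). LARGE_BLOB_THRESHOLD_BYTE_SIZE also becomes public so the new test below can size a payload just past the resumable-upload cutoff. Below is a minimal, self-contained sketch of that control flow; the Upload callback and StorageLikeException are illustrative stand-ins, not part of the PR or the google-cloud-storage API:

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;

final class ResumableRetrySketch {

    // Stand-in for com.google.cloud.storage.StorageException
    static final class StorageLikeException extends RuntimeException {
        final int code;

        StorageLikeException(int code, String message) {
            super(message);
            this.code = code;
        }
    }

    // Stand-in for one attempt at opening a session and streaming the bytes
    interface Upload {
        void run(InputStream in) throws IOException;
    }

    static void uploadWithRetry(InputStream in, Upload upload) throws IOException {
        assert in.markSupported();       // the PR asserts the stream can be replayed
        in.mark(Integer.MAX_VALUE);      // remember the start of the blob's bytes
        StorageLikeException first = null;
        for (int attempt = 0; attempt < 3; ++attempt) {  // three attempts in total, as in the PR
            try {
                upload.run(in);
                return;                  // success, nothing left to do
            } catch (StorageLikeException e) {
                if (e.code == HttpURLConnection.HTTP_GONE) {
                    // broken resumable session: keep the earliest failure, rewind, try again
                    if (first == null) {
                        first = e;
                    } else {
                        first.addSuppressed(e);
                    }
                    in.reset();
                    continue;
                }
                if (first != null) {
                    e.addSuppressed(first);  // don't lose the 410s that led here
                }
                throw e;                 // any other error is rethrown immediately
            }
        }
        assert first != null;
        throw first;                     // all three attempts hit 410
    }
}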
----------------------------------------------------------------

@@ -19,12 +19,21 @@
 
 package org.elasticsearch.repositories.gcs;
 
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
 import java.util.Locale;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -37,10 +46,35 @@ protected BlobStore newBlobStore() {
         final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
         final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
         try {
-            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>()));
+            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>(), random()));
        } catch (final Exception e) {
             throw new RuntimeException(e);
         }
         return new GoogleCloudStorageBlobStore(bucketName, clientName, storageService);
     }
+
+    public void testWriteReadLarge() throws IOException {
+        try (BlobStore store = newBlobStore()) {
+            final BlobContainer container = store.blobContainer(new BlobPath());
+            byte[] data = randomBytes(GoogleCloudStorageBlobStore.LARGE_BLOB_THRESHOLD_BYTE_SIZE + 1);
+            writeBlob(container, "foobar", new BytesArray(data), randomBoolean());
+            if (randomBoolean()) {
+                // override file, to check if we get latest contents
+                random().nextBytes(data);
+                writeBlob(container, "foobar", new BytesArray(data), false);
+            }
+            try (InputStream stream = container.readBlob("foobar")) {
+                BytesRefBuilder target = new BytesRefBuilder();
+                while (target.length() < data.length) {
+                    byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
+                    int offset = scaledRandomIntBetween(0, buffer.length - 1);
+                    int read = stream.read(buffer, offset, buffer.length - offset);
+                    target.append(new BytesRef(buffer, offset, read));
+                }
+                assertEquals(data.length, target.length());
+                assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
+            }
+        }
+    }
+
 }
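Because the payload is LARGE_BLOB_THRESHOLD_BYTE_SIZE + 1 bytes, this test should take the resumable-upload path in the blob store, and since MockStorage (below) randomly injects at most one 410 per blob, the same read-back assertions cover both the clean write and the retried write.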
----------------------------------------------------------------

@@ -87,7 +87,7 @@ protected GoogleCloudStorageService createStorageService() {
     public static class MockGoogleCloudStorageService extends GoogleCloudStorageService {
         @Override
         public Storage client(String clientName) {
-            return new MockStorage(BUCKET, blobs);
+            return new MockStorage(BUCKET, blobs, random());
         }
     }
 
----------------------------------------------------------------

@@ -37,7 +37,7 @@ protected BlobStore newBlobStore() {
         final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
         final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
         try {
-            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>()));
+            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>(), random()));
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
----------------------------------------------------------------

@@ -46,7 +46,7 @@ public void testDeprecatedSettings() throws Exception {
             new GoogleCloudStorageService() {
                 @Override
                 public Storage client(String clientName) throws IOException {
-                    return new MockStorage("test", new ConcurrentHashMap<>());
+                    return new MockStorage("test", new ConcurrentHashMap<>(), random());
                 }
             });
 
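The call-site changes in the hunks above are mechanical: every test that constructs a MockStorage now threads the test framework's random() instance into it, which is what lets the mock below decide pseudo-randomly, and reproducibly under a fixed test seed, whether to break a given upload.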
----------------------------------------------------------------
MockStorage.java

@@ -38,12 +38,14 @@
 import com.google.cloud.storage.StorageOptions;
 import com.google.cloud.storage.StorageRpcOptionUtils;
 import com.google.cloud.storage.StorageTestUtils;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.core.internal.io.IOUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
@@ -52,6 +54,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -63,10 +67,12 @@
  */
 class MockStorage implements Storage {
 
+    private final Random random;
     private final String bucketName;
     private final ConcurrentMap<String, byte[]> blobs;
 
-    MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs) {
+    MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs, final Random random) {
+        this.random = random;
         this.bucketName = Objects.requireNonNull(bucket);
         this.blobs = Objects.requireNonNull(blobs);
     }
@@ -228,12 +234,16 @@ public boolean isOpen() {
         return null;
     }
 
+    private final Set<BlobInfo> simulated410s = ConcurrentCollections.newConcurrentSet();
+
     @Override
     public WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
         if (bucketName.equals(blobInfo.getBucket())) {
             final ByteArrayOutputStream output = new ByteArrayOutputStream();
             return new WriteChannel() {
 
+                private volatile boolean failed;
+
                 final WritableByteChannel writableByteChannel = Channels.newChannel(output);
 
                 @Override
@@ -248,6 +258,11 @@ public RestorableState<WriteChannel> capture() {
 
                 @Override
                 public int write(ByteBuffer src) throws IOException {
+                    // Only fail a blob once on a 410 error since the error is so unlikely in practice
+                    if (simulated410s.add(blobInfo) && random.nextBoolean()) {
+                        failed = true;
+                        throw new StorageException(HttpURLConnection.HTTP_GONE, "Simulated lost resumeable upload session");
+                    }
                     return writableByteChannel.write(src);
                 }
 
@@ -259,13 +274,15 @@ public boolean isOpen() {
                 @Override
                 public void close() {
                     IOUtils.closeWhileHandlingException(writableByteChannel);
-                    if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
-                        byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
-                        if (existingBytes != null) {
-                            throw new StorageException(412, "Blob already exists");
-                        }
-                    } else {
-                        blobs.put(blobInfo.getName(), output.toByteArray());
+                    if (failed == false) {
+                        if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
+                            byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
+                            if (existingBytes != null) {
+                                throw new StorageException(412, "Blob already exists");
+                            }
+                        } else {
+                            blobs.put(blobInfo.getName(), output.toByteArray());
+                        }
                     }
                 }
             };
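Taken together, the mock fails a given blob's upload at most once (a coin flip on the first write() call) and, via the failed flag, refuses to commit partially written bytes on close(), so a blob only lands in the blobs map once an attempt runs to completion. A rough sketch of how a test in the same package might drive that behavior directly; this is illustrative only, with made-up bucket and blob names, and the loop mirrors the retry in writeBlobResumable:

import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.WriteChannel;

import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

public class MockStorage410Walkthrough {
    public static void main(String[] args) throws Exception {
        // The mock consults this Random to decide whether the first write of a blob "loses" its session
        MockStorage storage = new MockStorage("bucket", new ConcurrentHashMap<>(), new Random());
        BlobInfo blobInfo = BlobInfo.newBuilder("bucket", "blob").build();
        byte[] payload = new byte[] {1, 2, 3};

        for (int attempt = 0; attempt < 2; attempt++) {
            try (WriteChannel channel = storage.writer(blobInfo)) {
                channel.write(ByteBuffer.wrap(payload));
                break; // close() commits the bytes because failed == false
            } catch (StorageException e) {
                // At most one simulated 410 per blob, so the second attempt must succeed
                assert e.getCode() == 410;
            }
        }
    }
}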