-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Retry GCS Resumable Upload on Error 410 #45963
Changes from all commits
fbdb05a
fa71e8a
4b735a0
adbc8b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,13 +40,15 @@ | |
import com.google.cloud.storage.StorageOptions; | ||
import com.google.cloud.storage.StorageRpcOptionUtils; | ||
import com.google.cloud.storage.StorageTestUtils; | ||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; | ||
import org.elasticsearch.core.internal.io.IOUtils; | ||
import org.mockito.stubbing.Answer; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.net.HttpURLConnection; | ||
import java.net.URL; | ||
import java.nio.ByteBuffer; | ||
import java.nio.channels.Channels; | ||
|
@@ -55,6 +57,8 @@ | |
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Random; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentMap; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.Collectors; | ||
|
@@ -71,10 +75,12 @@ | |
*/ | ||
class MockStorage implements Storage { | ||
|
||
private final Random random; | ||
private final String bucketName; | ||
private final ConcurrentMap<String, byte[]> blobs; | ||
|
||
MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs) { | ||
MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs, final Random random) { | ||
this.random = random; | ||
this.bucketName = Objects.requireNonNull(bucket); | ||
this.blobs = Objects.requireNonNull(blobs); | ||
} | ||
|
@@ -236,12 +242,16 @@ public boolean isOpen() { | |
return null; | ||
} | ||
|
||
private final Set<BlobInfo> simulated410s = ConcurrentCollections.newConcurrentSet(); | ||
|
||
@Override | ||
public WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) { | ||
if (bucketName.equals(blobInfo.getBucket())) { | ||
final ByteArrayOutputStream output = new ByteArrayOutputStream(); | ||
return new WriteChannel() { | ||
|
||
private volatile boolean failed; | ||
|
||
final WritableByteChannel writableByteChannel = Channels.newChannel(output); | ||
|
||
@Override | ||
|
@@ -256,6 +266,11 @@ public RestorableState<WriteChannel> capture() { | |
|
||
@Override | ||
public int write(ByteBuffer src) throws IOException { | ||
// Only fail a blob once on a 410 error since the error is so unlikely in practice | ||
if (simulated410s.add(blobInfo) && random.nextBoolean()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we test this with @tlrx's recent HTTP mock of GCS? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's the goal, but for now repository integration tests do not support GCS resumable uploads (as we don't generate large enough segment files). I can add GCS-like retries test at the blob store level like I did for S3 (see S3BlobContainerRetriesTests) and test this retry logic with it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Left this for @tlrx now if that's ok :) No need for two people to mess with the API mock code concurrently I guess. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It took some time but I finally opened #46968, which adds some random |
||
failed = true; | ||
throw new StorageException(HttpURLConnection.HTTP_GONE, "Simulated lost resumeable upload session"); | ||
} | ||
return writableByteChannel.write(src); | ||
} | ||
|
||
|
@@ -267,13 +282,15 @@ public boolean isOpen() { | |
@Override | ||
public void close() { | ||
IOUtils.closeWhileHandlingException(writableByteChannel); | ||
if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) { | ||
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray()); | ||
if (existingBytes != null) { | ||
throw new StorageException(412, "Blob already exists"); | ||
if (failed == false) { | ||
if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) { | ||
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray()); | ||
if (existingBytes != null) { | ||
throw new StorageException(412, "Blob already exists"); | ||
} | ||
} else { | ||
blobs.put(blobInfo.getName(), output.toByteArray()); | ||
} | ||
} else { | ||
blobs.put(blobInfo.getName(), output.toByteArray()); | ||
} | ||
} | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is
inputStream.mark()
called?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nowhere, which was fine for all the streams we seem to be using. Added an explicit call now though :)