Implement Bulk Deletes for GCS Repository #41368

Merged · 12 commits · Apr 30, 2019
Changes from 10 commits
@@ -26,7 +26,9 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {

@@ -78,7 +80,12 @@ public void deleteBlob(String blobName) throws IOException {
blobStore.deleteBlob(buildKey(blobName));
}

protected String buildKey(String blobName) {
@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));
}

private String buildKey(String blobName) {
assert blobName != null;
return path + blobName;
}
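For context, here is a minimal caller-side sketch of the new bulk-delete entry point added above. The wrapper class, method name, and blob names are illustrative assumptions, not part of this PR:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.elasticsearch.common.blobstore.BlobContainer;

class BulkDeleteExample {
    // Hypothetical caller: delete a handful of stale blobs in one batched call.
    static void cleanupStaleBlobs(BlobContainer container) throws IOException {
        final List<String> stale = Arrays.asList("snap-1.dat", "snap-2.dat");
        // Blobs that no longer exist are ignored instead of failing the whole operation.
        container.deleteBlobsIgnoringIfNotExists(stale);
    }
}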
@@ -19,6 +19,7 @@

package org.elasticsearch.repositories.gcs;

import com.google.cloud.BatchResult;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
@@ -27,10 +28,9 @@
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.StorageBatch;
import com.google.cloud.storage.StorageException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
@@ -53,14 +53,13 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;

class GoogleCloudStorageBlobStore implements BlobStore {

private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class);

// The recommended maximum size of a blob that should be uploaded in a single
// request. Larger files should be uploaded over multiple requests (this is
@@ -105,7 +104,7 @@ public void close() {
* @param bucketName name of the bucket
* @return true iff the bucket exists
*/
boolean doesBucketExist(String bucketName) {
private boolean doesBucketExist(String bucketName) {
try {
final Bucket bucket = SocketAccess.doPrivilegedIOException(() -> client().get(bucketName));
return bucket != null;
@@ -295,16 +294,16 @@ void deleteBlob(String blobName) throws IOException {
*
* @param prefix prefix of the blobs to delete
*/
void deleteBlobsByPrefix(String prefix) throws IOException {
deleteBlobs(listBlobsByPrefix("", prefix).keySet());
private void deleteBlobsByPrefix(String prefix) throws IOException {
deleteBlobsIgnoringIfNotExists(listBlobsByPrefix("", prefix).keySet());
}

/**
* Deletes multiple blobs from the specific bucket using a batch request
*
* @param blobNames names of the blobs to delete
*/
void deleteBlobs(Collection<String> blobNames) throws IOException {
void deleteBlobsIgnoringIfNotExists(Collection<String> blobNames) throws IOException {
if (blobNames.isEmpty()) {
return;
}
@@ -314,17 +313,31 @@ void deleteBlobs(Collection<String> blobNames) throws IOException {
return;
}
final List<BlobId> blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList());
final List<Boolean> deletedStatuses = SocketAccess.doPrivilegedIOException(() -> client().delete(blobIdsToDelete));
assert blobIdsToDelete.size() == deletedStatuses.size();
boolean failed = false;
for (int i = 0; i < blobIdsToDelete.size(); i++) {
if (deletedStatuses.get(i) == false) {
logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucketName);
failed = true;
final StorageException e = SocketAccess.doPrivilegedIOException(() -> {
final AtomicReference<StorageException> ioe = new AtomicReference<>();
final StorageBatch batch = client().batch();
for (BlobId blob : blobIdsToDelete) {
batch.delete(blob).notify(
new BatchResult.Callback<>() {
@Override
public void success(Boolean result) {
}

@Override
public void error(StorageException exception) {
if (exception.getCode() != HTTP_NOT_FOUND) {
if (ioe.compareAndSet(null, exception) == false) {
ioe.get().addSuppressed(exception);
}
}
}
});
}
}
if (failed) {
throw new IOException("Failed to delete all [" + blobIdsToDelete.size() + "] blobs");
batch.submit();
return ioe.get();
});
if (e != null) {
throw new IOException(e);
Contributor:
Also add a message just as for S3BlobContainer? i.e. "Exception when deleting blobs [" + blobNames + "]"

Member Author:
I consciously didn't do that here, since this is a little different from the AWS case: we always submit the full batch of deletes because the sub-batching is internal to the GCS client (unlike S3, where we split up the batch ourselves).
So I expect us to always get an exception for every failed blob, which makes listing them again a little redundant (plus we're catching these exceptions upstream anyway and logging the blobs that we tried to delete)?
Maybe rather remove the listing from S3 as well? (I didn't realize it at the time, but it seems completely redundant when we always log the list of blobs upstream anyway.)

Contributor:
It may be nicer to collect the files that failed deletion here than at the caller site. It allows filtering the list down to the actual file deletions that failed (i.e. instead of just blobNames, we can filter it down to those that actually experienced a failure). A similar thing can be done for S3.
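A rough sketch of that suggestion, added here for illustration (failedBlobs and its exact placement are assumptions, not code from this PR):

// Collect only the blob names whose delete actually failed.
final List<String> failedBlobs = Collections.synchronizedList(new ArrayList<>());

// Inside the existing error(StorageException exception) callback, next to the ioe handling:
if (exception.getCode() != HTTP_NOT_FOUND) {
    failedBlobs.add(blob.getName()); // blob is the loop variable of the surrounding for loop
}

// After batch.submit(), throw with the filtered list instead of the full blobNames:
if (failedBlobs.isEmpty() == false) {
    throw new IOException("Failed to delete blobs " + failedBlobs, ioe.get());
}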

Member Author:
Makes sense. Maybe keep it all in the exception though, and simply make sure that the blob names are all in the exception?
I don't like manually collecting the list of failed deletes without their exceptions, tbh.; it doesn't really give us any information. We want to know why a delete failed; the fact that it failed we can already see by listing the stale blobs later on :P

Then we could simply remove the blobs list from all the upstream logging and be done?
=> how about keeping things as they are here and adjusting the S3 implementation and upstream logging in a subsequent PR?

Contributor:
OK to adapt the call sites in a follow-up PR, but let's at least add the message with all the blob names in this PR.
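Concretely, the requested addition would look something like this (a sketch mirroring the S3BlobContainer wording; the message text is taken from the comment above, the cause-wrapping is an assumption):

if (e != null) {
    throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
}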

}
}

@@ -20,6 +20,7 @@
package org.elasticsearch.repositories.gcs;

import com.google.api.gax.paging.Page;
import com.google.cloud.BatchResult;
import com.google.cloud.Policy;
import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
@@ -34,11 +35,13 @@
import com.google.cloud.storage.ServiceAccount;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageBatch;
import com.google.cloud.storage.StorageBatchResult;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.StorageRpcOptionUtils;
import com.google.cloud.storage.StorageTestUtils;
import org.elasticsearch.core.internal.io.IOUtils;
import org.mockito.stubbing.Answer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -57,6 +60,11 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyVararg;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

/**
* {@link MockStorage} mocks a {@link Storage} client by storing all the blobs
* in a given concurrent map.
@@ -356,8 +364,25 @@ public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
}

@Override
@SuppressWarnings("unchecked")
public StorageBatch batch() {
return null;
final Answer<?> throwOnMissingMock = invocationOnMock -> {
throw new AssertionError("Did not expect call to method [" + invocationOnMock.getMethod().getName() + ']');
};
final StorageBatch batch = mock(StorageBatch.class, throwOnMissingMock);
StorageBatchResult<Boolean> result = mock(StorageBatchResult.class, throwOnMissingMock);
doAnswer(answer -> {
BatchResult.Callback<Boolean, Exception> callback = (BatchResult.Callback<Boolean, Exception>) answer.getArguments()[0];
callback.success(true);
return null;
}).when(result).notify(any(BatchResult.Callback.class));
doAnswer(invocation -> {
final BlobId blobId = (BlobId) invocation.getArguments()[0];
delete(blobId);
return result;
}).when(batch).delete(any(BlobId.class), anyVararg());
doAnswer(invocation -> null).when(batch).submit();
return batch;
}
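To show how the mocked batch behaves, here is a test-side sketch that drives it the way the blob store does. The MockStorage constructor and the callback wiring are assumptions based on this test helper, not part of the PR:

// Hypothetical usage of MockStorage.batch() in a test (identifiers are illustrative).
final ConcurrentMap<String, byte[]> blobs = new ConcurrentHashMap<>();
blobs.put("foo", new byte[] {1, 2, 3});
final Storage storage = new MockStorage("bucket", blobs); // assumed constructor of this test helper

final StorageBatch batch = storage.batch();
batch.delete(BlobId.of("bucket", "foo")).notify(new BatchResult.Callback<Boolean, StorageException>() {
    @Override
    public void success(Boolean result) {
        // the mock removes the blob from the map and reports success straight away
    }

    @Override
    public void error(StorageException exception) {
        // never reached with this mock
    }
});
batch.submit(); // a no-op here; the delete already happened when batch.delete(...) was called
assert blobs.containsKey("foo") == false;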

@Override