diff --git a/CHANGES.md b/CHANGES.md
index dd66e9192b69..a6c61da45107 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -72,6 +72,7 @@
   Template variables can be passed with the (json-formatted) `--jinja_variables` flag.
 * DataFrame API now supports pandas 2.1.x and adds 12 more string functions for Series.([#31185](https://github.com/apache/beam/pull/31185)).
 * Added BigQuery handler for enrichment transform (Python) ([#31295](https://github.com/apache/beam/pull/31295))
+* Disable soft delete policy when creating the default bucket for a project (Java) ([#31324](https://github.com/apache/beam/pull/31324)).
 
 ## Breaking Changes
 
diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle b/sdks/java/extensions/google-cloud-platform-core/build.gradle
index d4dfd46f7451..4af856cc9ff3 100644
--- a/sdks/java/extensions/google-cloud-platform-core/build.gradle
+++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle
@@ -87,8 +87,32 @@ task integrationTestKms(type: Test) {
   }
 }
 
+// Runs the ITs that are not tagged UsesKms. No runner is specified here, so tests running under
+// this task should not be running pipelines.
+task integrationTestNoKms(type: Test) {
+  group = "Verification"
+  def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+  def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests-cmek'
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+          "--project=${gcpProject}",
+          "--tempRoot=${gcpTempRoot}",
+  ])
+
+  // Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
+  outputs.upToDateWhen { false }
+
+  include '**/*IT.class'
+  maxParallelForks 4
+  classpath = sourceSets.test.runtimeClasspath
+  testClassesDirs = sourceSets.test.output.classesDirs
+  useJUnit {
+    excludeCategories "org.apache.beam.sdk.testing.UsesKms"
+  }
+}
+
 task postCommit {
   group = "Verification"
   description = "Integration tests of GCP connectors using the DirectRunner."
   dependsOn integrationTestKms
+  dependsOn integrationTestNoKms
 }
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index 3c65f0fa748c..2686a53cbcd9 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -27,6 +27,7 @@
 import com.google.api.services.cloudresourcemanager.CloudResourceManager;
 import com.google.api.services.cloudresourcemanager.model.Project;
 import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Bucket.SoftDeletePolicy;
 import com.google.auth.Credentials;
 import com.google.auth.http.HttpCredentialsAdapter;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
@@ -392,40 +393,67 @@ class GcpTempLocationFactory implements DefaultValueFactory {
       return tempLocation;
     }
 
-    /**
-     * Creates a default bucket or verifies the existence and proper access control of an existing
-     * default bucket. Returns the location if successful.
-     */
     @VisibleForTesting
-    static String tryCreateDefaultBucket(PipelineOptions options, CloudResourceManager crmClient) {
+    static ImmutableList<String> getDefaultBucketNameStubs(
+        PipelineOptions options, CloudResourceManager crmClient, String bucketNamePrefix) {
       GcsOptions gcsOptions = options.as(GcsOptions.class);
 
-      checkArgument(
-          isNullOrEmpty(gcsOptions.getDataflowKmsKey()),
-          "Cannot create a default bucket when --dataflowKmsKey is set.");
-
       final String projectId = gcsOptions.getProject();
       checkArgument(!isNullOrEmpty(projectId), "--project is a required option.");
 
-      // Look up the project number, to create a default bucket with a stable
-      // name with no special characters.
       long projectNumber = 0L;
       try {
         projectNumber = getProjectNumber(projectId, crmClient);
       } catch (IOException e) {
         throw new RuntimeException("Unable to verify project with ID " + projectId, e);
       }
+
       String region = DEFAULT_REGION;
       if (!isNullOrEmpty(gcsOptions.getZone())) {
         region = getRegionFromZone(gcsOptions.getZone());
       }
-      final String bucketName = "dataflow-staging-" + region + "-" + projectNumber;
+
+      return ImmutableList.of(bucketNamePrefix, region, String.valueOf(projectNumber));
+    }
+
+    /**
+     * Creates a default bucket or verifies the existence and proper access control of an existing
+     * default bucket. Returns the location if successful.
+     */
+    @VisibleForTesting
+    static String tryCreateDefaultBucket(PipelineOptions options, CloudResourceManager crmClient) {
+      return tryCreateDefaultBucketWithPrefix(options, crmClient, "dataflow-staging");
+    }
+
+    @VisibleForTesting
+    static String tryCreateDefaultBucketWithPrefix(
+        PipelineOptions options, CloudResourceManager crmClient, String bucketNamePrefix) {
+      GcsOptions gcsOptions = options.as(GcsOptions.class);
+
+      checkArgument(
+          isNullOrEmpty(gcsOptions.getDataflowKmsKey()),
+          "Cannot create a default bucket when --dataflowKmsKey is set.");
+
+      final List<String> bucketNameStubs =
+          getDefaultBucketNameStubs(options, crmClient, bucketNamePrefix);
+      final String region = bucketNameStubs.get(1);
+      final long projectNumber = Long.parseLong(bucketNameStubs.get(2));
+      final String bucketName = String.join("-", bucketNameStubs);
       LOG.info("No tempLocation specified, attempting to use default bucket: {}", bucketName);
-      Bucket bucket = new Bucket().setName(bucketName).setLocation(region);
+
+      // Disable the soft delete policy on the new bucket by setting a zero retention duration.
+      // Reference: https://cloud.google.com/storage/docs/soft-delete
+      SoftDeletePolicy softDeletePolicy = new SoftDeletePolicy().setRetentionDurationSeconds(0L);
+
+      Bucket bucket =
+          new Bucket()
+              .setName(bucketName)
+              .setLocation(region)
+              .setSoftDeletePolicy(softDeletePolicy);
       // Always try to create the bucket before checking access, so that we do not
       // race with other pipelines that may be attempting to do the same thing.
       try {
-        gcsOptions.getGcsUtil().createBucket(projectId, bucket);
+        gcsOptions.getGcsUtil().createBucket(gcsOptions.getProject(), bucket);
       } catch (FileAlreadyExistsException e) {
         LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName);
       } catch (IOException e) {
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 0338323bb0aa..60e8443d2640 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -652,6 +652,17 @@ public void createBucket(String projectId, Bucket bucket) throws IOException {
     createBucket(projectId, bucket, createBackOff(), Sleeper.DEFAULT);
   }
 
+  /** Gets the {@link Bucket} for the given Cloud Storage path or propagates an exception. */
+  @Nullable
+  public Bucket getBucket(GcsPath path) throws IOException {
+    return getBucket(path, createBackOff(), Sleeper.DEFAULT);
+  }
+
+  /** Removes an empty {@link Bucket} in Cloud Storage or propagates an exception. */
+  public void removeBucket(Bucket bucket) throws IOException {
+    removeBucket(bucket, createBackOff(), Sleeper.DEFAULT);
+  }
+
   /**
    * Returns whether the GCS bucket exists. This will return false if the bucket is inaccessible due
    * to permissions.
@@ -753,6 +764,46 @@ public boolean shouldRetry(IOException e) {
     }
   }
 
+  /**
+   * Deletes the given bucket. Socket errors are retried using the supplied backoff; "not found"
+   * and "access denied" responses are never retried.
+   *
+   * @throws AccessDeniedException if the delete request is rejected as access denied
+   * @throws FileNotFoundException if the bucket is reported as not found
+   */
+  @VisibleForTesting
+  void removeBucket(Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException {
+    Storage.Buckets.Delete deleteRequest = storageClient.buckets().delete(bucket.getName());
+
+    try {
+      ResilientOperation.retry(
+          deleteRequest::execute,
+          backoff,
+          new RetryDeterminer<IOException>() {
+            @Override
+            public boolean shouldRetry(IOException e) {
+              if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
+                return false;
+              }
+              return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
+            }
+          },
+          IOException.class,
+          sleeper);
+    } catch (GoogleJsonResponseException e) {
+      if (errorExtractor.accessDenied(e)) {
+        throw new AccessDeniedException(bucket.getName(), null, e.getMessage());
+      }
+      if (errorExtractor.itemNotFound(e)) {
+        throw new FileNotFoundException(e.getMessage());
+      }
+      throw e;
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before surfacing the failure as an IOException.
+      Thread.currentThread().interrupt();
+      throw new IOException(
+          String.format("Error while attempting to remove bucket gs://%s", bucket.getName()), e);
+    }
+  }
+
   private static void executeBatches(List batches) throws IOException {
     ExecutorService executor =
         MoreExecutors.listeningDecorator(
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsIT.java
new file mode 100644
index 000000000000..138bb2f291d3
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.options;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.storage.model.Bucket;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Random;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link GcpOptions}. These tests are designed to run against production
+ * Google Cloud Storage.
+ *
+ * <p>This is a runnerless integration test, even though the Beam IT framework assumes one. Thus,
+ * this test should only be run against a single runner (such as DirectRunner).
+ */
+@RunWith(JUnit4.class)
+public class GcpOptionsIT {
+  /** Tests the creation of a default bucket in a project. */
+  @Test
+  public void testCreateDefaultBucket() throws IOException {
+    TestPipelineOptions options =
+        TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+
+    CloudResourceManager crmClient =
+        GcpOptions.GcpTempLocationFactory.newCloudResourceManagerClient(
+                options.as(CloudResourceManagerOptions.class))
+            .build();
+
+    GcsOptions gcsOptions = options.as(GcsOptions.class);
+    GcsUtil gcsUtil = gcsOptions.getGcsUtil();
+
+    Random rand = new Random();
+    // Add a random number to the prefix to avoid collision if multiple test instances
+    // are run at the same time. To avoid too many dangling buckets if bucket removal fails,
+    // we limit the max number of possible bucket names in this test to 1000.
+    String bucketNamePrefix = "gcp-options-it-" + rand.nextInt(1000);
+
+    String bucketName =
+        String.join(
+            "-",
+            GcpOptions.GcpTempLocationFactory.getDefaultBucketNameStubs(
+                options, crmClient, bucketNamePrefix));
+
+    // remove existing default bucket if any
+    try {
+      Bucket oldBucket = gcsUtil.getBucket(GcsPath.fromUri("gs://" + bucketName));
+      gcsUtil.removeBucket(oldBucket);
+    } catch (FileNotFoundException e) {
+      // the bucket to be created does not exist, which is good news
+    }
+
+    String tempLocation =
+        GcpOptions.GcpTempLocationFactory.tryCreateDefaultBucketWithPrefix(
+            options, crmClient, bucketNamePrefix);
+
+    GcsPath gcsPath = GcsPath.fromUri(tempLocation);
+    Bucket bucket = gcsUtil.getBucket(gcsPath);
+    assertNotNull(bucket);
+    // verify the soft delete policy is disabled (expected value first, per JUnit convention)
+    assertEquals(Long.valueOf(0L), bucket.getSoftDeletePolicy().getRetentionDurationSeconds());
+
+    gcsUtil.removeBucket(bucket);
+    assertThrows(FileNotFoundException.class, () -> gcsUtil.getBucket(gcsPath));
+  }
+}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
index bf30b4c030e2..a182f0ab82cf 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
@@ -21,10 +21,13 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.api.services.cloudresourcemanager.CloudResourceManager;
@@ -56,6 +59,7 @@
 import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
@@ -230,6 +234,14 @@ public void testCreateBucket() throws Exception {
 
     String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
     assertEquals("gs://dataflow-staging-us-north1-1/temp/", bucket);
+
+    ArgumentCaptor<Bucket> bucketArg = ArgumentCaptor.forClass(Bucket.class);
+    verify(mockGcsUtil, times(1)).createBucket(anyString(), bucketArg.capture());
+
+    // verify that the soft delete policy is disabled in the default bucket
+    assertEquals(
+        Long.valueOf(0L),
+        bucketArg.getValue().getSoftDeletePolicy().getRetentionDurationSeconds());
   }
 
   @Test