diff --git a/runners/google-cloud-dataflow-java/examples/build.gradle b/runners/google-cloud-dataflow-java/examples/build.gradle index 21f806308970..36379f63cff7 100644 --- a/runners/google-cloud-dataflow-java/examples/build.gradle +++ b/runners/google-cloud-dataflow-java/examples/build.gradle @@ -42,7 +42,28 @@ def dockerJavaImageName = project(':runners:google-cloud-dataflow-java').ext.doc // If -PuseExecutableStage is set, the use_executable_stage_bundle_execution wil be enabled. def fnapiExperiments = project.hasProperty('useExecutableStage') ? 'beam_fn_api_use_deprecated_read,use_executable_stage_bundle_execution' : "beam_fn_api,beam_fn_api_use_deprecated_read" -def commonConfig = { dataflowWorkerJar, workerHarnessContainerImage = '', additionalOptions = [] -> +// For testing impersonation, we use three ingredients: +// - a principal to impersonate +// - a dataflow service account that only that principal is allowed to launch jobs as +// - a temp root that only the above two accounts have access to +// +// Jenkins and Dataflow workers both run as GCE default service account. So we remove that account from all the above. +def impersonateServiceAccount = project.findProperty('gcpImpersonateServiceAccount') ?: 'allows-impersonation@apache-beam-testing.iam.gserviceaccount.com' +def dataflowWorkerImpersonationServiceAccount = project.findProperty('dataflowWorkerImpersonationServiceAccount') ?: + "impersonation-dataflow-worker@apache-beam-testing.iam.gserviceaccount.com" +def impersonationTempRoot = project.findProperty('gcpImpersonationTempRoot') ?: 'gs://impersonation-test-bucket/tmproot' + + +def commonConfig = { Map args -> + if (!args.dataflowWorkerJar) { + throw new GradleException("Dataflow integration test configuration requires dataflowWorkerJar parameter") + } + + def actualDataflowWorkerJar = args.dataflowWorkerJar + def actualWorkerHarnessContainerImage = args.workerHarnessContainerImage ?: '' + def actualGcsTempRoot = args.gcsTempRoot ?: gcsTempRoot + def additionalOptions = args.additionalOptions ?: [] + // return the preevaluated configuration closure return { testClassesDirs = files(project(":examples:java").sourceSets.test.output.classesDirs) @@ -54,10 +75,10 @@ def commonConfig = { dataflowWorkerJar, workerHarnessContainerImage = '', additi def preCommitBeamTestPipelineOptions = [ "--project=${gcpProject}", "--region=${gcpRegion}", - "--tempRoot=${gcsTempRoot}", + "--tempRoot=${actualGcsTempRoot}", "--runner=TestDataflowRunner", - "--dataflowWorkerJar=${dataflowWorkerJar}", - "--workerHarnessContainerImage=${workerHarnessContainerImage}" + "--dataflowWorkerJar=${actualDataflowWorkerJar}", + "--workerHarnessContainerImage=${actualWorkerHarnessContainerImage}" ] + additionalOptions systemProperty "beamTestPipelineOptions", JsonOutput.toJson(preCommitBeamTestPipelineOptions) } @@ -66,14 +87,30 @@ def commonConfig = { dataflowWorkerJar, workerHarnessContainerImage = '', additi task preCommitLegacyWorker(type: Test) { dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar" def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath - with commonConfig(dataflowWorkerJar) + with commonConfig(dataflowWorkerJar: dataflowWorkerJar) +} + +task preCommitLegacyWorkerImpersonate(type: Test) { + dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar" + def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath + with commonConfig( + dataflowWorkerJar: dataflowWorkerJar, + gcsTempRoot: impersonationTempRoot, + additionalOptions: [ + "--impersonateServiceAccount=${impersonateServiceAccount}", + "--serviceAccount=${dataflowWorkerImpersonationServiceAccount}" + ]) } task verifyFnApiWorker(type: Test) { dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar" dependsOn ":runners:google-cloud-dataflow-java:buildAndPushDockerContainer" def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath - with commonConfig(dataflowWorkerJar, dockerJavaImageName, ["--experiments=${fnapiExperiments}"]) + with commonConfig( + dataflowWorkerJar: dataflowWorkerJar, + workerHarnessContainerImage: dockerJavaImageName, + additionalOptions: ["--experiments=${fnapiExperiments}"] + ) useJUnit { excludeCategories 'org.apache.beam.sdk.testing.StreamingIT' } @@ -83,7 +120,7 @@ task postCommitLegacyWorkerJava11(type: Test) { dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar" def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath systemProperty "java.specification.version", "11" - with commonConfig(dataflowWorkerJar) + with commonConfig(dataflowWorkerJar: dataflowWorkerJar) } task java11PostCommit() { @@ -94,7 +131,7 @@ task postCommitLegacyWorkerJava17(type: Test) { dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar" def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath systemProperty "java.specification.version", "17" - with commonConfig(dataflowWorkerJar) + with commonConfig(dataflowWorkerJar: dataflowWorkerJar) } task java17PostCommit() { @@ -103,6 +140,7 @@ task java17PostCommit() { task preCommit() { dependsOn preCommitLegacyWorker + dependsOn preCommitLegacyWorkerImpersonate } task verifyPortabilityApi() { diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java index 3dc8afc5fe8a..6e1e71dbb4ed 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java @@ -20,8 +20,10 @@ import com.google.auth.Credentials; import java.io.IOException; import java.security.GeneralSecurityException; +import org.checkerframework.checker.nullness.qual.Nullable; /** Construct an oauth credential to be used by the SDK and the SDK workers. */ public interface CredentialFactory { + @Nullable Credentials getCredential() throws IOException, GeneralSecurityException; } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java index e7193da1c6b1..71685e3f1ab4 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java @@ -17,20 +17,22 @@ */ package org.apache.beam.sdk.extensions.gcp.auth; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ImpersonatedCredentials; import java.io.IOException; import java.util.Arrays; import java.util.List; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Construct an oauth credential to be used by the SDK and the SDK workers. Returns a GCP * credential. */ -@SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) public class GcpCredentialFactory implements CredentialFactory { /** * The scope cloud-platform provides access to all Cloud Platform resources. cloud-platform isn't @@ -50,17 +52,52 @@ public class GcpCredentialFactory implements CredentialFactory { "https://www.googleapis.com/auth/bigquery.insertdata", "https://www.googleapis.com/auth/pubsub"); - private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory(); + // If non-null, a list of service account emails to be used as an impersonation chain. + private @Nullable List impersonateServiceAccountChain; + + private GcpCredentialFactory(@Nullable List impersonateServiceAccountChain) { + if (impersonateServiceAccountChain != null) { + checkArgument(impersonateServiceAccountChain.size() > 0); + } + + this.impersonateServiceAccountChain = impersonateServiceAccountChain; + } public static GcpCredentialFactory fromOptions(PipelineOptions options) { - return INSTANCE; + @Nullable + String impersonateServiceAccountArg = + options.as(GcpOptions.class).getImpersonateServiceAccount(); + + @Nullable + List impersonateServiceAccountChain = + impersonateServiceAccountArg == null + ? null + : Arrays.asList(impersonateServiceAccountArg.split(",")); + + return new GcpCredentialFactory(impersonateServiceAccountChain); } /** Returns a default GCP {@link Credentials} or null when it fails. */ @Override - public Credentials getCredential() { + public @Nullable Credentials getCredential() { try { - return GoogleCredentials.getApplicationDefault().createScoped(SCOPES); + GoogleCredentials applicationDefaultCredentials = + GoogleCredentials.getApplicationDefault().createScoped(SCOPES); + + if (impersonateServiceAccountChain == null) { + return applicationDefaultCredentials; + } else { + String targetPrincipal = + impersonateServiceAccountChain.get(impersonateServiceAccountChain.size() - 1); + List delegationChain = + impersonateServiceAccountChain.subList(0, impersonateServiceAccountChain.size() - 1); + + GoogleCredentials impersonationCredentials = + ImpersonatedCredentials.create( + applicationDefaultCredentials, targetPrincipal, delegationChain, SCOPES, 0); + + return impersonationCredentials; + } } catch (IOException e) { // Ignore the exception // Pipelines that only access to public data should be able to run without credentials. diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index f593e2f5c9d8..0f1afec29bef 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -169,6 +169,25 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { void setGcpCredential(Credentials value); + /** + * All API requests will be made as the given service account or target service account in an + * impersonation delegation chain instead of the currently selected account. You can specify + * either a single service account as the impersonator, or a comma-separated list of service + * accounts to create an impersonation delegation chain. + */ + @Description( + "All API requests will be made as the given service account or" + + " target service account in an impersonation delegation chain" + + " instead of the currently selected account. You can specify" + + " either a single service account as the impersonator, or a" + + " comma-separated list of service accounts to create an" + + " impersonation delegation chain.") + @JsonIgnore + @Nullable + String getImpersonateServiceAccount(); + + void setImpersonateServiceAccount(String impersonateServiceAccount); + /** Experiment to turn on the Streaming Engine experiment. */ String STREAMING_ENGINE_EXPERIMENT = "enable_streaming_engine";