Skip to content

Commit

Permalink
Merge pull request apache#17394: [BEAM-14014] Add parameter for servi…
Browse files Browse the repository at this point in the history
…ce account impersonation in GCP credentials
  • Loading branch information
kennknowles authored and y1chi committed May 10, 2022
1 parent 6e2cca1 commit c61c24d
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 15 deletions.
54 changes: 46 additions & 8 deletions runners/google-cloud-dataflow-java/examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,28 @@ def dockerJavaImageName = project(':runners:google-cloud-dataflow-java').ext.doc
// If -PuseExecutableStage is set, the use_executable_stage_bundle_execution experiment will be enabled.
def fnapiExperiments = project.hasProperty('useExecutableStage') ? 'beam_fn_api_use_deprecated_read,use_executable_stage_bundle_execution' : "beam_fn_api,beam_fn_api_use_deprecated_read"

def commonConfig = { dataflowWorkerJar, workerHarnessContainerImage = '', additionalOptions = [] ->
// For testing impersonation, we use three ingredients:
// - a principal to impersonate
// - a dataflow service account that only that principal is allowed to launch jobs as
// - a temp root that only the above two accounts have access to
//
// Jenkins and Dataflow workers both run as the GCE default service account, so we remove that account from all of the above.
def impersonateServiceAccount = project.findProperty('gcpImpersonateServiceAccount') ?: '[email protected]'
def dataflowWorkerImpersonationServiceAccount = project.findProperty('dataflowWorkerImpersonationServiceAccount') ?:
"impersonation-dataflow-worker@apache-beam-testing.iam.gserviceaccount.com"
def impersonationTempRoot = project.findProperty('gcpImpersonationTempRoot') ?: 'gs://impersonation-test-bucket/tmproot'


def commonConfig = { Map args ->
if (!args.dataflowWorkerJar) {
throw new GradleException("Dataflow integration test configuration requires dataflowWorkerJar parameter")
}

def actualDataflowWorkerJar = args.dataflowWorkerJar
def actualWorkerHarnessContainerImage = args.workerHarnessContainerImage ?: ''
def actualGcsTempRoot = args.gcsTempRoot ?: gcsTempRoot
def additionalOptions = args.additionalOptions ?: []

// return the preevaluated configuration closure
return {
testClassesDirs = files(project(":examples:java").sourceSets.test.output.classesDirs)
Expand All @@ -54,10 +75,10 @@ def commonConfig = { dataflowWorkerJar, workerHarnessContainerImage = '', additi
def preCommitBeamTestPipelineOptions = [
"--project=${gcpProject}",
"--region=${gcpRegion}",
"--tempRoot=${gcsTempRoot}",
"--tempRoot=${actualGcsTempRoot}",
"--runner=TestDataflowRunner",
"--dataflowWorkerJar=${dataflowWorkerJar}",
"--workerHarnessContainerImage=${workerHarnessContainerImage}"
"--dataflowWorkerJar=${actualDataflowWorkerJar}",
"--workerHarnessContainerImage=${actualWorkerHarnessContainerImage}"
] + additionalOptions
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(preCommitBeamTestPipelineOptions)
}
Expand All @@ -66,14 +87,30 @@ def commonConfig = { dataflowWorkerJar, workerHarnessContainerImage = '', additi
task preCommitLegacyWorker(type: Test) {
dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
with commonConfig(dataflowWorkerJar)
with commonConfig(dataflowWorkerJar: dataflowWorkerJar)
}

// Variant of preCommitLegacyWorker that exercises service account impersonation:
// it depends on the same legacy-worker shadow jar, but passes impersonationTempRoot
// as the temp root and adds the --impersonateServiceAccount and --serviceAccount
// pipeline options.
task preCommitLegacyWorkerImpersonate(type: Test) {
dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
// A -PdataflowWorkerJar property takes precedence over the legacy-worker shadow jar path.
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
with commonConfig(
dataflowWorkerJar: dataflowWorkerJar,
gcsTempRoot: impersonationTempRoot,
additionalOptions: [
"--impersonateServiceAccount=${impersonateServiceAccount}",
"--serviceAccount=${dataflowWorkerImpersonationServiceAccount}"
])
}

task verifyFnApiWorker(type: Test) {
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
dependsOn ":runners:google-cloud-dataflow-java:buildAndPushDockerContainer"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
with commonConfig(dataflowWorkerJar, dockerJavaImageName, ["--experiments=${fnapiExperiments}"])
with commonConfig(
dataflowWorkerJar: dataflowWorkerJar,
workerHarnessContainerImage: dockerJavaImageName,
additionalOptions: ["--experiments=${fnapiExperiments}"]
)
useJUnit {
excludeCategories 'org.apache.beam.sdk.testing.StreamingIT'
}
Expand All @@ -83,7 +120,7 @@ task postCommitLegacyWorkerJava11(type: Test) {
dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
systemProperty "java.specification.version", "11"
with commonConfig(dataflowWorkerJar)
with commonConfig(dataflowWorkerJar: dataflowWorkerJar)
}

task java11PostCommit() {
Expand All @@ -94,7 +131,7 @@ task postCommitLegacyWorkerJava17(type: Test) {
dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
systemProperty "java.specification.version", "17"
with commonConfig(dataflowWorkerJar)
with commonConfig(dataflowWorkerJar: dataflowWorkerJar)
}

task java17PostCommit() {
Expand All @@ -103,6 +140,7 @@ task java17PostCommit() {

task preCommit() {
dependsOn preCommitLegacyWorker
dependsOn preCommitLegacyWorkerImpersonate
}

task verifyPortabilityApi() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.google.auth.Credentials;
import java.io.IOException;
import java.security.GeneralSecurityException;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Construct an oauth credential to be used by the SDK and the SDK workers. */
public interface CredentialFactory {
/**
* Returns the credential produced by this factory. The result is declared {@code @Nullable}, so
* callers must be prepared for a null return.
*
* @throws IOException declared by this interface; the conditions are implementation-specific
* @throws GeneralSecurityException declared by this interface; the conditions are
*     implementation-specific
*/
@Nullable
Credentials getCredential() throws IOException, GeneralSecurityException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@
*/
package org.apache.beam.sdk.extensions.gcp.auth;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ImpersonatedCredentials;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Construct an oauth credential to be used by the SDK and the SDK workers. Returns a GCP
* credential.
*/
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class GcpCredentialFactory implements CredentialFactory {
/**
* The scope cloud-platform provides access to all Cloud Platform resources. cloud-platform isn't
Expand All @@ -50,17 +52,52 @@ public class GcpCredentialFactory implements CredentialFactory {
"https://www.googleapis.com/auth/bigquery.insertdata",
"https://www.googleapis.com/auth/pubsub");

private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory();
// If non-null, a list of service account emails to be used as an impersonation chain.
private @Nullable List<String> impersonateServiceAccountChain;

/**
* Creates a factory that optionally impersonates a chain of service accounts.
*
* @param impersonateServiceAccountChain service account emails to impersonate, or null to use the
*     application default credentials directly; must not be empty when non-null
* @throws IllegalArgumentException if the chain is non-null but empty
*/
private GcpCredentialFactory(@Nullable List<String> impersonateServiceAccountChain) {
if (impersonateServiceAccountChain != null) {
// getCredential() takes the last element as the target principal, so an empty chain is invalid.
checkArgument(
!impersonateServiceAccountChain.isEmpty(),
"impersonateServiceAccountChain must contain at least one service account when set");
}

this.impersonateServiceAccountChain = impersonateServiceAccountChain;
}

public static GcpCredentialFactory fromOptions(PipelineOptions options) {
return INSTANCE;
@Nullable
String impersonateServiceAccountArg =
options.as(GcpOptions.class).getImpersonateServiceAccount();

@Nullable
List<String> impersonateServiceAccountChain =
impersonateServiceAccountArg == null
? null
: Arrays.asList(impersonateServiceAccountArg.split(","));

return new GcpCredentialFactory(impersonateServiceAccountChain);
}

/**
* Returns the application default GCP {@link Credentials}, wrapped in impersonated credentials
* when an impersonation chain is configured, or null when it fails.
*/
@Override
public Credentials getCredential() {
public @Nullable Credentials getCredential() {
try {
return GoogleCredentials.getApplicationDefault().createScoped(SCOPES);
GoogleCredentials applicationDefaultCredentials =
GoogleCredentials.getApplicationDefault().createScoped(SCOPES);

if (impersonateServiceAccountChain == null) {
return applicationDefaultCredentials;
} else {
String targetPrincipal =
impersonateServiceAccountChain.get(impersonateServiceAccountChain.size() - 1);
List<String> delegationChain =
impersonateServiceAccountChain.subList(0, impersonateServiceAccountChain.size() - 1);

GoogleCredentials impersonationCredentials =
ImpersonatedCredentials.create(
applicationDefaultCredentials, targetPrincipal, delegationChain, SCOPES, 0);

return impersonationCredentials;
}
} catch (IOException e) {
// Ignore the exception
// Pipelines that only access public data should be able to run without credentials.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,25 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {

void setGcpCredential(Credentials value);

/**
* All API requests will be made as the given service account or target service account in an
* impersonation delegation chain instead of the currently selected account. You can specify
* either a single service account as the impersonator, or a comma-separated list of service
* accounts to create an impersonation delegation chain.
*
* <p>For a comma-separated list, {@code GcpCredentialFactory} uses the last entry as the target
* principal and the entries before it, in order, as the delegation chain.
*/
@Description(
"All API requests will be made as the given service account or"
+ " target service account in an impersonation delegation chain"
+ " instead of the currently selected account. You can specify"
+ " either a single service account as the impersonator, or a"
+ " comma-separated list of service accounts to create an"
+ " impersonation delegation chain.")
@JsonIgnore
@Nullable
String getImpersonateServiceAccount();

/** Setter paired with {@link #getImpersonateServiceAccount()}. */
void setImpersonateServiceAccount(String impersonateServiceAccount);

/** Experiment to turn on the Streaming Engine experiment. */
String STREAMING_ENGINE_EXPERIMENT = "enable_streaming_engine";

Expand Down

0 comments on commit c61c24d

Please sign in to comment.