Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14014] Add parameter for service account impersonation in GCP credentials #17394

Merged
merged 1 commit into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 46 additions & 8 deletions runners/google-cloud-dataflow-java/examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,28 @@ def dockerJavaImageName = project(':runners:google-cloud-dataflow-java').ext.doc
// If -PuseExecutableStage is set, the use_executable_stage_bundle_execution wil be enabled.
def fnapiExperiments = project.hasProperty('useExecutableStage') ? 'beam_fn_api_use_deprecated_read,use_executable_stage_bundle_execution' : "beam_fn_api,beam_fn_api_use_deprecated_read"

def commonConfig = { dataflowWorkerJar, workerHarnessContainerImage = '', additionalOptions = [] ->
// For testing impersonation, we use three ingredients:
// - a principal to impersonate
// - a dataflow service account that only that principal is allowed to launch jobs as
// - a temp root that only the above two accounts have access to
//
// Jenkins and Dataflow workers both run as GCE default service account. So we remove that account from all the above.
def impersonateServiceAccount = project.findProperty('gcpImpersonateServiceAccount') ?: '[email protected]'
def dataflowWorkerImpersonationServiceAccount = project.findProperty('dataflowWorkerImpersonationServiceAccount') ?:
"impersonation-dataflow-worker@apache-beam-testing.iam.gserviceaccount.com"
def impersonationTempRoot = project.findProperty('gcpImpersonationTempRoot') ?: 'gs://impersonation-test-bucket/tmproot'


def commonConfig = { Map args ->
if (!args.dataflowWorkerJar) {
throw new GradleException("Dataflow integration test configuration requires dataflowWorkerJar parameter")
}

def actualDataflowWorkerJar = args.dataflowWorkerJar
def actualWorkerHarnessContainerImage = args.workerHarnessContainerImage ?: ''
def actualGcsTempRoot = args.gcsTempRoot ?: gcsTempRoot
def additionalOptions = args.additionalOptions ?: []

// return the preevaluated configuration closure
return {
testClassesDirs = files(project(":examples:java").sourceSets.test.output.classesDirs)
Expand All @@ -54,10 +75,10 @@ def commonConfig = { dataflowWorkerJar, workerHarnessContainerImage = '', additi
def preCommitBeamTestPipelineOptions = [
"--project=${gcpProject}",
"--region=${gcpRegion}",
"--tempRoot=${gcsTempRoot}",
"--tempRoot=${actualGcsTempRoot}",
"--runner=TestDataflowRunner",
"--dataflowWorkerJar=${dataflowWorkerJar}",
"--workerHarnessContainerImage=${workerHarnessContainerImage}"
"--dataflowWorkerJar=${actualDataflowWorkerJar}",
"--workerHarnessContainerImage=${actualWorkerHarnessContainerImage}"
] + additionalOptions
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(preCommitBeamTestPipelineOptions)
}
Expand All @@ -66,14 +87,30 @@ def commonConfig = { dataflowWorkerJar, workerHarnessContainerImage = '', additi
task preCommitLegacyWorker(type: Test) {
dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
with commonConfig(dataflowWorkerJar)
with commonConfig(dataflowWorkerJar: dataflowWorkerJar)
}

task preCommitLegacyWorkerImpersonate(type: Test) {
dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
with commonConfig(
dataflowWorkerJar: dataflowWorkerJar,
gcsTempRoot: impersonationTempRoot,
additionalOptions: [
"--impersonateServiceAccount=${impersonateServiceAccount}",
"--serviceAccount=${dataflowWorkerImpersonationServiceAccount}"
])
}

task verifyFnApiWorker(type: Test) {
dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar"
dependsOn ":runners:google-cloud-dataflow-java:buildAndPushDockerContainer"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
with commonConfig(dataflowWorkerJar, dockerJavaImageName, ["--experiments=${fnapiExperiments}"])
with commonConfig(
dataflowWorkerJar: dataflowWorkerJar,
workerHarnessContainerImage: dockerJavaImageName,
additionalOptions: ["--experiments=${fnapiExperiments}"]
)
useJUnit {
excludeCategories 'org.apache.beam.sdk.testing.StreamingIT'
}
Expand All @@ -83,7 +120,7 @@ task postCommitLegacyWorkerJava11(type: Test) {
dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
systemProperty "java.specification.version", "11"
with commonConfig(dataflowWorkerJar)
with commonConfig(dataflowWorkerJar: dataflowWorkerJar)
}

task java11PostCommit() {
Expand All @@ -94,7 +131,7 @@ task postCommitLegacyWorkerJava17(type: Test) {
dependsOn ":runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar"
def dataflowWorkerJar = project.findProperty('dataflowWorkerJar') ?: project(":runners:google-cloud-dataflow-java:worker:legacy-worker").shadowJar.archivePath
systemProperty "java.specification.version", "17"
with commonConfig(dataflowWorkerJar)
with commonConfig(dataflowWorkerJar: dataflowWorkerJar)
}

task java17PostCommit() {
Expand All @@ -103,6 +140,7 @@ task java17PostCommit() {

task preCommit() {
dependsOn preCommitLegacyWorker
dependsOn preCommitLegacyWorkerImpersonate
}

task verifyPortabilityApi() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.google.auth.Credentials;
import java.io.IOException;
import java.security.GeneralSecurityException;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Construct an oauth credential to be used by the SDK and the SDK workers. */
public interface CredentialFactory {
@Nullable
Credentials getCredential() throws IOException, GeneralSecurityException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@
*/
package org.apache.beam.sdk.extensions.gcp.auth;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ImpersonatedCredentials;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Construct an oauth credential to be used by the SDK and the SDK workers. Returns a GCP
* credential.
*/
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class GcpCredentialFactory implements CredentialFactory {
/**
* The scope cloud-platform provides access to all Cloud Platform resources. cloud-platform isn't
Expand All @@ -50,17 +52,52 @@ public class GcpCredentialFactory implements CredentialFactory {
"https://www.googleapis.com/auth/bigquery.insertdata",
"https://www.googleapis.com/auth/pubsub");

private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory();
// If non-null, a list of service account emails to be used as an impersonation chain.
private @Nullable List<String> impersonateServiceAccountChain;

private GcpCredentialFactory(@Nullable List<String> impersonateServiceAccountChain) {
if (impersonateServiceAccountChain != null) {
checkArgument(impersonateServiceAccountChain.size() > 0);
}

this.impersonateServiceAccountChain = impersonateServiceAccountChain;
}

public static GcpCredentialFactory fromOptions(PipelineOptions options) {
return INSTANCE;
@Nullable
String impersonateServiceAccountArg =
options.as(GcpOptions.class).getImpersonateServiceAccount();

@Nullable
List<String> impersonateServiceAccountChain =
impersonateServiceAccountArg == null
? null
: Arrays.asList(impersonateServiceAccountArg.split(","));

return new GcpCredentialFactory(impersonateServiceAccountChain);
}

/** Returns a default GCP {@link Credentials} or null when it fails. */
@Override
public Credentials getCredential() {
public @Nullable Credentials getCredential() {
try {
return GoogleCredentials.getApplicationDefault().createScoped(SCOPES);
GoogleCredentials applicationDefaultCredentials =
GoogleCredentials.getApplicationDefault().createScoped(SCOPES);

if (impersonateServiceAccountChain == null) {
return applicationDefaultCredentials;
} else {
String targetPrincipal =
impersonateServiceAccountChain.get(impersonateServiceAccountChain.size() - 1);
List<String> delegationChain =
impersonateServiceAccountChain.subList(0, impersonateServiceAccountChain.size() - 1);

GoogleCredentials impersonationCredentials =
ImpersonatedCredentials.create(
applicationDefaultCredentials, targetPrincipal, delegationChain, SCOPES, 0);

return impersonationCredentials;
}
} catch (IOException e) {
// Ignore the exception
// Pipelines that only access to public data should be able to run without credentials.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,25 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {

void setGcpCredential(Credentials value);

/**
* All API requests will be made as the given service account or target service account in an
* impersonation delegation chain instead of the currently selected account. You can specify
* either a single service account as the impersonator, or a comma-separated list of service
* accounts to create an impersonation delegation chain.
*/
@Description(
"All API requests will be made as the given service account or"
+ " target service account in an impersonation delegation chain"
+ " instead of the currently selected account. You can specify"
+ " either a single service account as the impersonator, or a"
+ " comma-separated list of service accounts to create an"
+ " impersonation delegation chain.")
@JsonIgnore
@Nullable
String getImpersonateServiceAccount();

void setImpersonateServiceAccount(String impersonateServiceAccount);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems odd to take a string that we parse instead of List/String[] since PipelineOptionsFactory can do this already for us. See

String[] args = new String[] {"--string=stringValue1,stringValue2,stringValue3"};

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I did not realize this. That could be a useful follow-up. We do want the format of the parameter to be the same across SDKs and across Beam and gcloud, etc. But it seems that comma-separated strings will be the same across all of them.


/** Experiment to turn on the Streaming Engine experiment. */
String STREAMING_ENGINE_EXPERIMENT = "enable_streaming_engine";

Expand Down