[BEAM-14014] Support impersonation credentials in dataflow runner #17244
Merged
y1chi
merged 58 commits into
apache:master
from
ryanthompson591:add_impersonation_chain
May 13, 2022
c28530b
first commit
ryanthompson591 c7f7d50
Merge branch 'master' into add_impersonation_chain
ryanthompson591 bd76cf8
added parameters
ryanthompson591 36ff1d2
Merge branch 'master' into add_impersonation_chain
ryanthompson591 07a12a9
add impersonation credentials to the runner
ryanthompson591 9fc90c2
some small updates
ryanthompson591 08d7987
fixed options to set in file systems
ryanthompson591 b19b7f1
Merge branch 'master' into add_impersonation_chain
ryanthompson591 b8f5ab0
added impersonation to credentials
ryanthompson591 99d6257
added impersonation accounts to bigquery
ryanthompson591 6ecb314
reverted sdk_worker_main
ryanthompson591 570feb5
Merge branch 'master' into add_impersonation_chain
ryanthompson591 ca5cad7
dont use dataflow_runner is setting credntials
ryanthompson591 a3a138c
lint
ryanthompson591 5b26b77
change line lenght of pipeline options
ryanthompson591 92ba9b6
use similar delegate_to name throughout
ryanthompson591 7f19d1b
Merge branch 'master' into add_impersonation_chain
ryanthompson591 4b3d39b
fixed name errors
ryanthompson591 d5e283b
added a credentials fix
ryanthompson591 e03adc3
moved impersonation into a single parameter
ryanthompson591 dcbedb9
Merge branch 'master' into add_impersonation_chain
ryanthompson591 4a1d6b0
fixed bug in delegate_to, added logging
ryanthompson591 73340c0
Merge branch 'master' into add_impersonation_chain
ryanthompson591 a18b3d3
added auth
ryanthompson591 f92857a
added test
ryanthompson591 e474ff8
changed impersonation location
ryanthompson591 de49c5e
Merge branch 'apache:master' into add_impersonation_chain
ryanthompson591 1f4c3e6
updated tests
ryanthompson591 6c869b0
Don't pass impersonate_service_account to workers
ryanthompson591 15bfda9
yapf
ryanthompson591 433585a
simplified code in readthrough
ryanthompson591 37c25a1
added wordcount integration test for impersonation
ryanthompson591 4ba5f9b
changed output dir to use test dir
ryanthompson591 8865c1b
removed unused assert_bucket_exists call
ryanthompson591 17babbd
linted line length
ryanthompson591 caa950a
lint line length
ryanthompson591 b5acf5b
modified to be a parameter
ryanthompson591 f628a4e
large refactor to make pipeline_options a required parameter get_serv…
ryanthompson591 d986185
finished changes
ryanthompson591 5a6cf5a
updated pylint
ryanthompson591 b50466a
Merge branch 'master' into add_impersonation_chain
ryanthompson591 a7ad6e5
fixed import order
ryanthompson591 1c9eaf5
small lint change added line
ryanthompson591 d9b51ae
yapf
ryanthompson591 2253391
remove set_impersonation_accounts
ryanthompson591 a01635b
changed {} to none to avoid dangerous default value
ryanthompson591 bd2dd11
fix mock to have parameter
ryanthompson591 677cdde
fixes filesystem test
ryanthompson591 e6cd15a
temp added more logging details
ryanthompson591 727ccc6
fixed bug where None was being turning into a string
ryanthompson591 156df85
typo fi
ryanthompson591 f6a06c5
reverted run_integration_test.sh will make that change in a different cl
ryanthompson591 7488468
valentyns comments
ryanthompson591 11857a2
removed dict option for impersonated credentials
ryanthompson591 1ec2b1d
added dictionary back
ryanthompson591 f346e8c
reset credentials for credential test
ryanthompson591 fbf58bc
added comment
ryanthompson591 72cf610
imported auth
ryanthompson591
@@ -23,8 +23,12 @@
 import socket
 import threading

+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+
 # google.auth is only available when Beam is installed with the gcp extra.
 try:
+  from google.auth import impersonated_credentials
   import google.auth
   import google_auth_httplib2
   _GOOGLE_AUTH_AVAILABLE = True
@@ -40,6 +44,16 @@

 _LOGGER = logging.getLogger(__name__)

+CLIENT_SCOPES = [
+    'https://www.googleapis.com/auth/bigquery',
+    'https://www.googleapis.com/auth/cloud-platform',
+    'https://www.googleapis.com/auth/devstorage.full_control',
+    'https://www.googleapis.com/auth/userinfo.email',
+    'https://www.googleapis.com/auth/datastore',
+    'https://www.googleapis.com/auth/spanner.admin',
+    'https://www.googleapis.com/auth/spanner.data'
+]
+

 def set_running_in_gce(worker_executing_project):
   """For internal use only; no backwards-compatibility guarantees.
@@ -59,16 +73,19 @@ def set_running_in_gce(worker_executing_project):
   executing_project = worker_executing_project


-def get_service_credentials():
+def get_service_credentials(pipeline_options):
   """For internal use only; no backwards-compatibility guarantees.

   Get credentials to access Google services.
+  Args:
+    pipeline_options: Pipeline options, used in creating credentials
+      like impersonated credentials.

   Returns:
     A ``google.auth.credentials.Credentials`` object or None if credentials
     not found. Returned object is thread-safe.
   """
-  return _Credentials.get_service_credentials()
+  return _Credentials.get_service_credentials(pipeline_options)


 if _GOOGLE_AUTH_AVAILABLE:

Review comment on the pipeline_options docstring line:
I'd say something here like: Pipeline options. Required on initial credential initialization when the impersonate_service_account option is provided, otherwise passing [...] You can do it in a follow-up change.
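The review comment on the pipeline_options docstring says the options are required on initial credential initialization. A minimal sketch of why that is, assuming a simplified stand-in for the lock-guarded, initialize-once cache that the diff shows in _Credentials. The class name CachedCredentials and the _build placeholder are hypothetical and are not part of the PR:

```python
import threading


class CachedCredentials(object):
  """Hypothetical stand-in for the init-once pattern in _Credentials."""
  _lock = threading.Lock()
  _init = False
  _value = None

  @classmethod
  def get(cls, pipeline_options):
    with cls._lock:
      if cls._init:
        # Already initialized: pipeline_options is ignored from here on.
        return cls._value
      cls._value = cls._build(pipeline_options)
      cls._init = True
    return cls._value

  @staticmethod
  def _build(pipeline_options):
    # Placeholder for the real credential construction. It only records
    # which account, if any, was requested on the first call.
    options = pipeline_options or {}
    return options.get('impersonate_service_account')


first = CachedCredentials.get({'impersonate_service_account': 'a@example.com'})
second = CachedCredentials.get({'impersonate_service_account': 'b@example.com'})
third = CachedCredentials.get(None)
```

Under these assumptions, only the first call's options influence the cached value; later calls, including one that passes None, get the same object back.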
@@ -108,10 +125,7 @@ class _Credentials(object):
   _credentials = None

   @classmethod
-  def get_service_credentials(cls):
-    if cls._credentials_init:
-      return cls._credentials
-
+  def get_service_credentials(cls, pipeline_options):
     with cls._credentials_lock:
       if cls._credentials_init:
         return cls._credentials
@@ -124,31 +138,24 @@ def get_service_credentials(cls):
       _LOGGER.info(
           "socket default timeout is %s seconds.", socket.getdefaulttimeout())

-      cls._credentials = cls._get_service_credentials()
+      cls._credentials = cls._get_service_credentials(pipeline_options)
       cls._credentials_init = True

     return cls._credentials

   @staticmethod
-  def _get_service_credentials():
+  def _get_service_credentials(pipeline_options):
     if not _GOOGLE_AUTH_AVAILABLE:
       _LOGGER.warning(
           'Unable to find default credentials because the google-auth library '
           'is not available. Install the gcp extra (apache_beam[gcp]) to use '
           'Google default credentials. Connecting anonymously.')
       return None

-    client_scopes = [
-        'https://www.googleapis.com/auth/bigquery',
-        'https://www.googleapis.com/auth/cloud-platform',
-        'https://www.googleapis.com/auth/devstorage.full_control',
-        'https://www.googleapis.com/auth/userinfo.email',
-        'https://www.googleapis.com/auth/datastore',
-        'https://www.googleapis.com/auth/spanner.admin',
-        'https://www.googleapis.com/auth/spanner.data'
-    ]
     try:
-      credentials, _ = google.auth.default(scopes=client_scopes)  # pylint: disable=c-extension-no-member
+      credentials, _ = google.auth.default(scopes=CLIENT_SCOPES)  # pylint: disable=c-extension-no-member
+      credentials = _Credentials._add_impersonation_credentials(
+          credentials, pipeline_options)
       credentials = _ApitoolsCredentialsAdapter(credentials)
       logging.debug(
           'Connecting using Google Application Default '
@@ -160,3 +167,26 @@ def _get_service_credentials():
           'Connecting anonymously.',
           e)
       return None
+
+  @staticmethod
+  def _add_impersonation_credentials(credentials, pipeline_options):
+    if isinstance(pipeline_options, PipelineOptions):
+      gcs_options = pipeline_options.view_as(GoogleCloudOptions)
+      impersonate_service_account = gcs_options.impersonate_service_account
+    elif isinstance(pipeline_options, dict):
+      impersonate_service_account = pipeline_options.get(
+          'impersonate_service_account')
+    else:
+      return credentials
+    if impersonate_service_account:
+      _LOGGER.info('Impersonating: %s', impersonate_service_account)
+      impersonate_accounts = impersonate_service_account.split(',')
+      target_principal = impersonate_accounts[-1]
+      delegate_to = impersonate_accounts[0:-1]
+      credentials = impersonated_credentials.Credentials(
+          source_credentials=credentials,
+          target_principal=target_principal,
+          delegates=delegate_to,
+          target_scopes=CLIENT_SCOPES,
+      )
+    return credentials
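The new _add_impersonation_credentials method treats impersonate_service_account as a comma-separated chain: the last entry is the target principal and everything before it becomes the delegates list. A small sketch of just that parsing step, with no Google libraries involved. The helper name split_impersonation_chain and the example account addresses are hypothetical and do not appear in the PR:

```python
from typing import List, Tuple


def split_impersonation_chain(value: str) -> Tuple[List[str], str]:
  """Hypothetical helper mirroring the split in _add_impersonation_credentials.

  Returns (delegates, target_principal): the last comma-separated entry is
  the account to impersonate; everything before it is the delegation chain.
  """
  accounts = value.split(',')
  return accounts[0:-1], accounts[-1]


# A single account: no delegates, the account itself is the target.
single = split_impersonation_chain('target@example.iam.gserviceaccount.com')

# Three accounts: the first two are delegates, the last is the target.
chained = split_impersonation_chain(
    'first@example.iam.gserviceaccount.com,'
    'second@example.iam.gserviceaccount.com,'
    'target@example.iam.gserviceaccount.com')
```

Because the split is on ',' with no whitespace stripping, a value written with spaces after the commas would carry those spaces into the account names under this reading of the diff.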
Review comments:

Any concerns with setting pipeline_options=None?

You can also mention in the docstring how pipeline options are used.

Yeah, if None is a valid type, then mention in the docstring what it means.
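On the question of what pipeline_options=None would mean: in the merged diff, a value that is neither a PipelineOptions instance nor a dict falls through to the else branch of _add_impersonation_credentials, so the default credentials come back unchanged. A simplified sketch of that lookup, assuming a plain attribute read in place of view_as(GoogleCloudOptions). The names resolve_impersonation_option and FakeOptions are hypothetical stand-ins, not Beam APIs:

```python
class FakeOptions(object):
  """Hypothetical stand-in for a PipelineOptions view."""
  def __init__(self, account=None):
    self.impersonate_service_account = account


def resolve_impersonation_option(pipeline_options):
  """Hypothetical helper: returns the configured account string, or None."""
  if isinstance(pipeline_options, dict):
    return pipeline_options.get('impersonate_service_account')
  # Stand-in for the PipelineOptions branch: read the attribute if present.
  # For None (or any other type) this yields None, meaning no impersonation.
  return getattr(pipeline_options, 'impersonate_service_account', None)


from_none = resolve_impersonation_option(None)
from_empty = resolve_impersonation_option({})
from_dict = resolve_impersonation_option(
    {'impersonate_service_account': 'sa@example.com'})
from_obj = resolve_impersonation_option(FakeOptions('sa@example.com'))
```

In this sketch None and an empty dict both resolve to "no impersonation requested", which is the behaviour a docstring note on None would need to spell out.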