
[BEAM-14014] Support impersonation credentials in dataflow runner #17244

Merged
58 commits merged into apache:master from add_impersonation_chain on May 13, 2022
Changes from all commits (58 commits)
c28530b
first commit
ryanthompson591 Mar 30, 2022
c7f7d50
Merge branch 'master' into add_impersonation_chain
ryanthompson591 Mar 31, 2022
bd76cf8
added parameters
ryanthompson591 Apr 1, 2022
36ff1d2
Merge branch 'master' into add_impersonation_chain
ryanthompson591 Apr 1, 2022
07a12a9
add impersonation credentials to the runner
ryanthompson591 Apr 1, 2022
9fc90c2
some small updates
ryanthompson591 Apr 1, 2022
08d7987
fixed options to set in file systems
ryanthompson591 Apr 1, 2022
b19b7f1
Merge branch 'master' into add_impersonation_chain
ryanthompson591 Apr 4, 2022
b8f5ab0
added impersonation to credentials
ryanthompson591 Apr 4, 2022
99d6257
added impersonation accounts to bigquery
ryanthompson591 Apr 4, 2022
6ecb314
reverted sdk_worker_main
ryanthompson591 Apr 5, 2022
570feb5
Merge branch 'master' into add_impersonation_chain
ryanthompson591 Apr 5, 2022
ca5cad7
don't use dataflow_runner when setting credentials
ryanthompson591 Apr 6, 2022
a3a138c
lint
ryanthompson591 Apr 6, 2022
5b26b77
change line length of pipeline options
ryanthompson591 Apr 6, 2022
92ba9b6
use similar delegate_to name throughout
ryanthompson591 Apr 6, 2022
7f19d1b
Merge branch 'master' into add_impersonation_chain
ryanthompson591 Apr 11, 2022
4b3d39b
fixed name errors
ryanthompson591 Apr 12, 2022
d5e283b
added a credentials fix
ryanthompson591 Apr 12, 2022
e03adc3
moved impersonation into a single parameter
ryanthompson591 Apr 12, 2022
dcbedb9
Merge branch 'master' into add_impersonation_chain
ryanthompson591 Apr 20, 2022
4a1d6b0
fixed bug in delegate_to, added logging
ryanthompson591 Apr 21, 2022
73340c0
Merge branch 'master' into add_impersonation_chain
ryanthompson591 May 6, 2022
a18b3d3
added auth
ryanthompson591 May 6, 2022
f92857a
added test
ryanthompson591 May 9, 2022
e474ff8
changed impersonation location
ryanthompson591 May 9, 2022
de49c5e
Merge branch 'apache:master' into add_impersonation_chain
ryanthompson591 May 9, 2022
1f4c3e6
updated tests
ryanthompson591 May 10, 2022
6c869b0
Don't pass impersonate_service_account to workers
ryanthompson591 May 10, 2022
15bfda9
yapf
ryanthompson591 May 10, 2022
433585a
simplified code in readthrough
ryanthompson591 May 10, 2022
37c25a1
added wordcount integration test for impersonation
ryanthompson591 May 10, 2022
4ba5f9b
changed output dir to use test dir
ryanthompson591 May 11, 2022
8865c1b
removed unused assert_bucket_exists call
ryanthompson591 May 11, 2022
17babbd
linted line length
ryanthompson591 May 11, 2022
caa950a
lint line length
ryanthompson591 May 11, 2022
b5acf5b
modified to be a parameter
ryanthompson591 May 11, 2022
f628a4e
large refactor to make pipeline_options a required parameter get_serv…
ryanthompson591 May 11, 2022
d986185
finished changes
ryanthompson591 May 11, 2022
5a6cf5a
updated pylint
ryanthompson591 May 11, 2022
b50466a
Merge branch 'master' into add_impersonation_chain
ryanthompson591 May 11, 2022
a7ad6e5
fixed import order
ryanthompson591 May 11, 2022
1c9eaf5
small lint change added line
ryanthompson591 May 11, 2022
d9b51ae
yapf
ryanthompson591 May 11, 2022
2253391
remove set_impersonation_accounts
ryanthompson591 May 11, 2022
a01635b
changed {} to none to avoid dangerous default value
ryanthompson591 May 11, 2022
bd2dd11
fix mock to have parameter
ryanthompson591 May 11, 2022
677cdde
fixes filesystem test
ryanthompson591 May 11, 2022
e6cd15a
temp added more logging details
ryanthompson591 May 12, 2022
727ccc6
fixed bug where None was being turned into a string
ryanthompson591 May 12, 2022
156df85
typo fix
ryanthompson591 May 12, 2022
f6a06c5
reverted run_integration_test.sh will make that change in a different cl
ryanthompson591 May 12, 2022
7488468
valentyns comments
ryanthompson591 May 12, 2022
11857a2
removed dict option for impersonated credentials
ryanthompson591 May 12, 2022
1ec2b1d
added dictionary back
ryanthompson591 May 12, 2022
f346e8c
reset credentials for credential test
ryanthompson591 May 13, 2022
fbf58bc
added comment
ryanthompson591 May 13, 2022
72cf610
imported auth
ryanthompson591 May 13, 2022
39 changes: 39 additions & 0 deletions sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -28,6 +28,7 @@
from hamcrest.core.core.allof import all_of

from apache_beam.examples import wordcount
from apache_beam.internal.gcp import auth
from apache_beam.testing.load_tests.load_test_metrics_utils import InfluxDBMetricsPublisherOptions
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsReader
from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
@@ -47,6 +48,44 @@ class WordCountIT(unittest.TestCase):
def test_wordcount_it(self):
self._run_wordcount_it(wordcount.run)

@pytest.mark.it_postcommit
@pytest.mark.sickbay_direct
@pytest.mark.sickbay_spark
@pytest.mark.sickbay_flink
def test_wordcount_impersonation_it(self):
"""Tests impersonation on dataflow.

For testing impersonation, we use three ingredients:
- a principal to impersonate
- a dataflow service account that only that principal is
allowed to launch jobs as
- a temp root that only the above two accounts have access to

Jenkins and Dataflow workers both run as GCE default service account.
So we remove that account from all the above.
"""
# Credentials need to be reset; otherwise credentials from a previous
# test will be used and this test will fail.
auth._Credentials._credentials_init = False

ACCOUNT_TO_IMPERSONATE = (
'allows-impersonation@apache-'
'beam-testing.iam.gserviceaccount.com')
RUNNER_ACCOUNT = (
'impersonation-dataflow-worker@'
'apache-beam-testing.iam.gserviceaccount.com')
TEMP_DIR = 'gs://impersonation-test-bucket/temp-it'
STAGING_LOCATION = 'gs://impersonation-test-bucket/staging-it'
extra_options = {
'impersonate_service_account': ACCOUNT_TO_IMPERSONATE,
'service_account_email': RUNNER_ACCOUNT,
'temp_location': TEMP_DIR,
'staging_location': STAGING_LOCATION
}
self._run_wordcount_it(wordcount.run, **extra_options)
# Reset credentials for future tests.
auth._Credentials._credentials_init = False
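
For readers unfamiliar with the new option, here is a minimal sketch of the option shape the test above exercises. The account names and bucket are placeholders, not real accounts; a comma-separated `impersonate_service_account` value forms a delegation chain whose last entry is the principal the job runs as.

```python
# Hypothetical options a pipeline author might pass; mirrors the
# extra_options dict used by the test above.
extra_options = {
    'impersonate_service_account': (
        'delegate@my-project.iam.gserviceaccount.com,'
        'target@my-project.iam.gserviceaccount.com'),
    'temp_location': 'gs://my-bucket/temp',
    'staging_location': 'gs://my-bucket/staging',
}

# The last account in the comma-separated chain is the effective principal.
chain = extra_options['impersonate_service_account'].split(',')
print(chain[-1])  # target@my-project.iam.gserviceaccount.com
```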

@pytest.mark.it_postcommit
@pytest.mark.it_validatescontainer
def test_wordcount_fnapi_it(self):
66 changes: 48 additions & 18 deletions sdks/python/apache_beam/internal/gcp/auth.py
@@ -23,8 +23,12 @@
import socket
import threading

from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions

# google.auth is only available when Beam is installed with the gcp extra.
try:
from google.auth import impersonated_credentials
import google.auth
import google_auth_httplib2
_GOOGLE_AUTH_AVAILABLE = True
@@ -40,6 +44,16 @@

_LOGGER = logging.getLogger(__name__)

CLIENT_SCOPES = [
'https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/devstorage.full_control',
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/datastore',
'https://www.googleapis.com/auth/spanner.admin',
'https://www.googleapis.com/auth/spanner.data'
]


def set_running_in_gce(worker_executing_project):
"""For internal use only; no backwards-compatibility guarantees.
@@ -59,16 +73,19 @@ def set_running_in_gce(worker_executing_project):
executing_project = worker_executing_project


def get_service_credentials():
def get_service_credentials(pipeline_options):
Review comments:
Contributor: Any concerns with setting pipeline_options=None?
Contributor: You can also mention in the docstring how pipeline options are used.
Member: yea if None is a valid type, then mention in the docstring what it means
"""For internal use only; no backwards-compatibility guarantees.

Get credentials to access Google services.
Args:
pipeline_options: Pipeline options, used in creating credentials
like impersonated credentials.

Review comment (Contributor): I'd say something here like: "Pipeline options. Required on initial credential initialization when the impersonate_service_account option is provided; otherwise passing None is acceptable." You can do it in a follow-up change.

Returns:
A ``google.auth.credentials.Credentials`` object or None if credentials
not found. Returned object is thread-safe.
"""
return _Credentials.get_service_credentials()
return _Credentials.get_service_credentials(pipeline_options)


if _GOOGLE_AUTH_AVAILABLE:
@@ -108,10 +125,7 @@ class _Credentials(object):
_credentials = None

@classmethod
def get_service_credentials(cls):
if cls._credentials_init:
return cls._credentials

def get_service_credentials(cls, pipeline_options):
with cls._credentials_lock:
if cls._credentials_init:
return cls._credentials
@@ -124,31 +138,24 @@ def get_service_credentials(cls):
_LOGGER.info(
"socket default timeout is %s seconds.", socket.getdefaulttimeout())

cls._credentials = cls._get_service_credentials()
cls._credentials = cls._get_service_credentials(pipeline_options)
cls._credentials_init = True

return cls._credentials

@staticmethod
def _get_service_credentials():
def _get_service_credentials(pipeline_options):
if not _GOOGLE_AUTH_AVAILABLE:
_LOGGER.warning(
'Unable to find default credentials because the google-auth library '
'is not available. Install the gcp extra (apache_beam[gcp]) to use '
'Google default credentials. Connecting anonymously.')
return None

client_scopes = [
'https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/devstorage.full_control',
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/datastore',
'https://www.googleapis.com/auth/spanner.admin',
'https://www.googleapis.com/auth/spanner.data'
]
try:
credentials, _ = google.auth.default(scopes=client_scopes) # pylint: disable=c-extension-no-member
credentials, _ = google.auth.default(scopes=CLIENT_SCOPES) # pylint: disable=c-extension-no-member
credentials = _Credentials._add_impersonation_credentials(
credentials, pipeline_options)
credentials = _ApitoolsCredentialsAdapter(credentials)
logging.debug(
'Connecting using Google Application Default '
@@ -160,3 +167,26 @@ def _get_service_credentials():
'Connecting anonymously.',
e)
return None

@staticmethod
def _add_impersonation_credentials(credentials, pipeline_options):
if isinstance(pipeline_options, PipelineOptions):
gcs_options = pipeline_options.view_as(GoogleCloudOptions)
impersonate_service_account = gcs_options.impersonate_service_account
elif isinstance(pipeline_options, dict):
impersonate_service_account = pipeline_options.get(
'impersonate_service_account')
else:
return credentials
if impersonate_service_account:
_LOGGER.info('Impersonating: %s', impersonate_service_account)
impersonate_accounts = impersonate_service_account.split(',')
target_principal = impersonate_accounts[-1]
delegate_to = impersonate_accounts[0:-1]
credentials = impersonated_credentials.Credentials(
source_credentials=credentials,
target_principal=target_principal,
delegates=delegate_to,
target_scopes=CLIENT_SCOPES,
)
return credentials
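
The chain-parsing step above can be sketched in isolation. This helper name is ours, not Beam's; it mirrors the split in `_add_impersonation_credentials`: the last comma-separated account is the target principal, and any earlier accounts are the delegates, in order.

```python
def split_impersonation_chain(impersonate_service_account):
    # Mirrors _add_impersonation_credentials above: split the
    # comma-separated chain into (target_principal, delegates).
    impersonate_accounts = impersonate_service_account.split(',')
    target_principal = impersonate_accounts[-1]
    delegate_to = impersonate_accounts[0:-1]
    return target_principal, delegate_to

target, delegates = split_impersonation_chain(
    'a@p.iam.gserviceaccount.com,'
    'b@p.iam.gserviceaccount.com,'
    'c@p.iam.gserviceaccount.com')
# target is the c@ account; delegates are the a@ and b@ accounts, in order.
```

A single account (no commas) yields an empty delegate list, so plain impersonation and chained delegation share one code path.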
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -314,7 +314,7 @@ class BigQueryWrapper(object):

The wrapper is used to organize all the BigQuery integration points and
offer a common place where retry logic for failures can be controlled.
In addition it offers various functions used both in sources and sinks
In addition, it offers various functions used both in sources and sinks
(e.g., find and create tables, query a table, etc.).
"""

@@ -328,7 +328,7 @@ class BigQueryWrapper(object):
def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
self.client = client or bigquery.BigqueryV2(
http=get_new_http(),
credentials=auth.get_service_credentials(),
credentials=auth.get_service_credentials(None),
response_encoding='utf8',
additional_http_headers={
"user-agent": "apache-beam-%s" % apache_beam.__version__
31 changes: 19 additions & 12 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -45,6 +45,10 @@ class GCSFileSystem(FileSystem):
CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations
GCS_PREFIX = 'gs://'

def __init__(self, pipeline_options):
super().__init__(pipeline_options)
self._pipeline_options = pipeline_options

@classmethod
def scheme(cls):
"""URI scheme for the FileSystem
@@ -127,12 +131,15 @@ def _list(self, dir_or_prefix):
``BeamIOError``: if listing fails, but not if no files were found.
"""
try:
for path, (size, updated) in gcsio.GcsIO().list_prefix(
for path, (size, updated) in self._gcsIO().list_prefix(
dir_or_prefix, with_metadata=True).items():
yield FileMetadata(path, size, updated)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("List operation failed", {dir_or_prefix: e})

def _gcsIO(self):
return gcsio.GcsIO(pipeline_options=self._pipeline_options)
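
The design choice here is to store pipeline options once at filesystem construction and build a fresh GCS client per operation, so impersonation settings reach every call. A toy sketch of that pattern, with hypothetical stand-in classes (not Beam's real `GcsIO`/`GCSFileSystem`):

```python
class FakeGcsIO:
    """Stand-in for gcsio.GcsIO: records the options it was built with."""
    def __init__(self, pipeline_options=None):
        self.pipeline_options = pipeline_options


class FakeGcsFileSystem:
    """Stand-in for GCSFileSystem: stores options at construction and
    threads them into every client it creates on demand."""
    def __init__(self, pipeline_options):
        self._pipeline_options = pipeline_options

    def _gcsIO(self):
        # A new client per call, always carrying the stored options.
        return FakeGcsIO(pipeline_options=self._pipeline_options)


fs = FakeGcsFileSystem({'impersonate_service_account': 'sa@example.com'})
client = fs._gcsIO()
print(client.pipeline_options['impersonate_service_account'])  # sa@example.com
```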

def _path_open(
self,
path,
@@ -143,7 +150,7 @@ def _path_open(
"""
compression_type = FileSystem._get_compression_type(path, compression_type)
mime_type = CompressionTypes.mime_type(compression_type, mime_type)
raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type)
raw_file = self._gcsIO().open(path, mode, mime_type=mime_type)
if compression_type == CompressionTypes.UNCOMPRESSED:
return raw_file
return CompressedFile(raw_file, compression_type=compression_type)
@@ -206,9 +213,9 @@ def _copy_path(source, destination):
raise ValueError('Destination %r must be GCS path.' % destination)
# Use copy_tree if the path ends with / as it is a directory
if source.endswith('/'):
gcsio.GcsIO().copytree(source, destination)
self._gcsIO().copytree(source, destination)
else:
gcsio.GcsIO().copy(source, destination)
self._gcsIO().copy(source, destination)

exceptions = {}
for source, destination in zip(source_file_names, destination_file_names):
@@ -249,15 +256,15 @@ def rename(self, source_file_names, destination_file_names):
# Execute GCS renames if any and return exceptions.
exceptions = {}
for batch in gcs_batches:
copy_statuses = gcsio.GcsIO().copy_batch(batch)
copy_statuses = self._gcsIO().copy_batch(batch)
copy_succeeded = []
for src, dest, exception in copy_statuses:
if exception:
exceptions[(src, dest)] = exception
else:
copy_succeeded.append((src, dest))
delete_batch = [src for src, dest in copy_succeeded]
delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
delete_statuses = self._gcsIO().delete_batch(delete_batch)
for i, (src, exception) in enumerate(delete_statuses):
dest = copy_succeeded[i][1]
if exception:
@@ -274,7 +281,7 @@ def exists(self, path):

Returns: boolean flag indicating if path exists
"""
return gcsio.GcsIO().exists(path)
return self._gcsIO().exists(path)

def size(self, path):
"""Get size of path on the FileSystem.
@@ -287,7 +294,7 @@ def size(self, path):
Raises:
``BeamIOError``: if path doesn't exist.
"""
return gcsio.GcsIO().size(path)
return self._gcsIO().size(path)

def last_updated(self, path):
"""Get UNIX Epoch time in seconds on the FileSystem.
@@ -300,7 +307,7 @@ def last_updated(self, path):
Raises:
``BeamIOError``: if path doesn't exist.
"""
return gcsio.GcsIO().last_updated(path)
return self._gcsIO().last_updated(path)

def checksum(self, path):
"""Fetch checksum metadata of a file on the
@@ -315,7 +322,7 @@ def checksum(self, path):
``BeamIOError``: if path isn't a file or doesn't exist.
"""
try:
return gcsio.GcsIO().checksum(path)
return self._gcsIO().checksum(path)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Checksum operation failed", {path: e})

@@ -332,7 +339,7 @@ def metadata(self, path):
``BeamIOError``: if path isn't a file or doesn't exist.
"""
try:
file_metadata = gcsio.GcsIO()._status(path)
file_metadata = self._gcsIO()._status(path)
return FileMetadata(
path, file_metadata['size'], file_metadata['last_updated'])
except Exception as e: # pylint: disable=broad-except
@@ -353,7 +360,7 @@ def _delete_path(path):
else:
path_to_use = path
match_result = self.match([path_to_use])[0]
statuses = gcsio.GcsIO().delete_batch(
statuses = self._gcsIO().delete_batch(
[m.path for m in match_result.metadata_list])
# pylint: disable=used-before-assignment
failures = [e for (_, e) in statuses if e is not None]