Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: adding more properties to external_account_authorized_user #1169

Merged
merged 10 commits into from
Oct 29, 2022
69 changes: 61 additions & 8 deletions google/auth/external_account_authorized_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def __init__(
token_url=None,
token_info_url=None,
revoke_url=None,
scopes=None,
lsirac marked this conversation as resolved.
Show resolved Hide resolved
quota_project_id=None,
):
"""Instantiates a external account authorized user credentials object.
Expand All @@ -90,8 +91,8 @@ def __init__(
None if the token can not be refreshed.
client_secret (str): The OAuth 2.0 client secret. Must be specified for refresh, can be
left as None if the token can not be refreshed.
token_url (str): The optional STS token exchange endpoint. Must be specified fro refresh,
can be leftas None if the token can not be refreshed.
token_url (str): The optional STS token exchange endpoint for refresh. Must be specified for
refresh, can be left as None if the token can not be refreshed.
token_info_url (str): The optional STS endpoint URL for token introspection.
revoke_url (str): The optional STS endpoint URL for revoking tokens.
quota_project_id (str): The optional project ID used for quota and billing.
Expand All @@ -102,9 +103,6 @@ def __init__(
google.auth.external_account_authorized_user.Credentials: The
constructed credentials.
"""
if not any((refresh_token, token)):
raise ValueError("Either `refresh_token` or `token` should be set.")

super(Credentials, self).__init__()

self.token = token
Expand All @@ -117,6 +115,14 @@ def __init__(
self._client_secret = client_secret
self._revoke_url = revoke_url
self._quota_project_id = quota_project_id
self._scopes = scopes
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved

if not self.valid and not self.can_refresh:
raise ValueError(
"Token should be created with fields to make it valid (`token` and "
"`expiry`), or fields to allow it to refresh (`refresh_token`, "
"`token_url`, `client_id`, `client_secret`)."
)

self._client_auth = None
if self._client_id:
Expand Down Expand Up @@ -154,20 +160,68 @@ def constructor_args(self):
"token": self.token,
"expiry": self.expiry,
"revoke_url": self._revoke_url,
"scopes": self._scopes,
"quota_project_id": self._quota_project_id,
}

@property
def scopes(self):
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
"""Optional[str]: The OAuth 2.0 permission scopes."""
return self._scopes

@property
def requires_scopes(self):
""" False: OAuth 2.0 credentials have their scopes set when
the initial token is requested and can not be changed."""
return False

@property
def client_id(self):
"""Optional[str]: The OAuth 2.0 client ID."""
return self._client_id

@property
def client_secret(self):
"""Optional[str]: The OAuth 2.0 client secret."""
return self._client_secret

@property
def audience(self):
"""Optional[str]: The STS audience which contains the resource name for the
workforce pool and the provider identifier in that pool."""
return self._audience

@property
def refresh_token(self):
"""Optional[str]: The OAuth 2.0 refresh token."""
return self._refresh_token

@property
def token_url(self):
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
"""Optional[str]: The STS token exchange endpoint for refresh."""
return self._token_url

@property
def token_info_url(self):
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
"""Optional[str]: The STS endpoint for token info."""
return self._token_info_url

@property
def revoke_url(self):
"""Optional[str]: The STS endpoint for token revocation."""
return self._revoke_url

@property
def is_user(self):
""" True: This credential always represents a user."""
return True

@property
def can_refresh(self):
return all(
(self._refresh_token, self._token_url, self._client_id, self._client_secret)
)

def get_project_id(self):
"""Retrieves the project ID corresponding to the workload identity or workforce pool.
For workforce pool credentials, it returns the project ID corresponding to
Expand Down Expand Up @@ -203,9 +257,7 @@ def refresh(self, request):
google.auth.exceptions.RefreshError: If the credentials could
not be refreshed.
"""
if not all(
(self._refresh_token, self._token_url, self._client_id, self._client_secret)
):
if not self.can_refresh:
raise exceptions.RefreshError(
"The credentials do not contain the necessary fields need to "
"refresh the access token. You must specify refresh_token, "
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -270,6 +322,7 @@ def from_info(cls, info, **kwargs):
expiry=expiry,
revoke_url=info.get("revoke_url"),
quota_project_id=info.get("quota_project_id"),
scopes=info.get("scopes"),
**kwargs
)

Expand Down
116 changes: 81 additions & 35 deletions tests/test_external_account_authorized_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
CLIENT_SECRET = "password"
# Base64 encoding of "username:password".
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
SCOPES = ["email", "profile"]
NOW = datetime.datetime(1990, 8, 27, 6, 54, 30)


class TestCredentials(object):
Expand Down Expand Up @@ -87,27 +89,74 @@ def test_default_state(self):
assert not creds.token
assert not creds.valid
assert not creds.requires_scopes
assert not creds.scopes
assert not creds.revoke_url
assert creds.token_info_url
assert creds.client_id
assert creds.client_secret
assert creds.is_user
assert creds.refresh_token == REFRESH_TOKEN
assert creds.audience == AUDIENCE
assert creds.token_url == TOKEN_URL
ScruffyProdigy marked this conversation as resolved.
Show resolved Hide resolved

def test_basic_create(self):
creds = external_account_authorized_user.Credentials(
token=ACCESS_TOKEN, expiry=datetime.datetime.max
token=ACCESS_TOKEN,
expiry=datetime.datetime.max,
scopes=SCOPES,
revoke_url=REVOKE_URL,
)

assert creds.expiry == datetime.datetime.max
assert not creds.expired
assert creds.token == ACCESS_TOKEN
assert creds.valid
assert not creds.requires_scopes
assert creds.scopes == SCOPES
assert creds.is_user
assert creds.revoke_url == REVOKE_URL

def test_stunted_create(self):
def test_stunted_create_no_refresh_token(self):
with pytest.raises(ValueError) as excinfo:
self.make_credentials(token=None, refresh_token=None)

assert excinfo.match(r"Either `refresh_token` or `token` should be set")
assert excinfo.match(
r"Token should be created with fields to make it valid \(`token` and "
r"`expiry`\), or fields to allow it to refresh \(`refresh_token`, "
r"`token_url`, `client_id`, `client_secret`\)\."
)

def test_stunted_create_no_token_url(self):
with pytest.raises(ValueError) as excinfo:
self.make_credentials(token=None, token_url=None)

assert excinfo.match(
r"Token should be created with fields to make it valid \(`token` and "
r"`expiry`\), or fields to allow it to refresh \(`refresh_token`, "
r"`token_url`, `client_id`, `client_secret`\)\."
)

def test_stunted_create_no_client_id(self):
with pytest.raises(ValueError) as excinfo:
self.make_credentials(token=None, client_id=None)

assert excinfo.match(
r"Token should be created with fields to make it valid \(`token` and "
r"`expiry`\), or fields to allow it to refresh \(`refresh_token`, "
r"`token_url`, `client_id`, `client_secret`\)\."
)

@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_stunted_create_no_client_secret(self):
with pytest.raises(ValueError) as excinfo:
self.make_credentials(token=None, client_secret=None)

assert excinfo.match(
r"Token should be created with fields to make it valid \(`token` and "
r"`expiry`\), or fields to allow it to refresh \(`refresh_token`, "
r"`token_url`, `client_id`, `client_secret`\)\."
)

@mock.patch("google.auth._helpers.utcnow", return_value=NOW)
def test_refresh_auth_success(self, utcnow):
request = self.make_mock_request(
status=http_client.OK,
Expand Down Expand Up @@ -137,7 +186,7 @@ def test_refresh_auth_success(self, utcnow):
),
)

@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
@mock.patch("google.auth._helpers.utcnow", return_value=NOW)
def test_refresh_auth_success_new_refresh_token(self, utcnow):
request = self.make_mock_request(
status=http_client.OK,
Expand Down Expand Up @@ -228,7 +277,7 @@ def test_refresh_without_refresh_token(self):

def test_refresh_without_token_url(self):
request = self.make_mock_request()
creds = self.make_credentials(token_url=None)
creds = self.make_credentials(token_url=None, token=ACCESS_TOKEN)

with pytest.raises(exceptions.RefreshError) as excinfo:
creds.refresh(request)
Expand All @@ -239,16 +288,14 @@ def test_refresh_without_token_url(self):

assert not creds.expiry
assert not creds.expired
assert not creds.token
assert not creds.valid
assert not creds.requires_scopes
assert creds.is_user

request.assert_not_called()

def test_refresh_without_client_id(self):
request = self.make_mock_request()
creds = self.make_credentials(client_id=None)
creds = self.make_credentials(client_id=None, token=ACCESS_TOKEN)

with pytest.raises(exceptions.RefreshError) as excinfo:
creds.refresh(request)
Expand All @@ -259,16 +306,14 @@ def test_refresh_without_client_id(self):

assert not creds.expiry
assert not creds.expired
assert not creds.token
assert not creds.valid
assert not creds.requires_scopes
assert creds.is_user

request.assert_not_called()

def test_refresh_without_client_secret(self):
request = self.make_mock_request()
creds = self.make_credentials(client_secret=None)
creds = self.make_credentials(client_secret=None, token=ACCESS_TOKEN)

with pytest.raises(exceptions.RefreshError) as excinfo:
creds.refresh(request)
Expand All @@ -279,8 +324,6 @@ def test_refresh_without_client_secret(self):

assert not creds.expiry
assert not creds.expired
assert not creds.token
assert not creds.valid
assert not creds.requires_scopes
assert creds.is_user

Expand All @@ -304,7 +347,7 @@ def test_info(self):
def test_info_full(self):
creds = self.make_credentials(
token=ACCESS_TOKEN,
expiry=datetime.datetime.min,
expiry=NOW,
revoke_url=REVOKE_URL,
quota_project_id=QUOTA_PROJECT_ID,
)
Expand All @@ -317,7 +360,7 @@ def test_info_full(self):
assert info["client_id"] == CLIENT_ID
assert info["client_secret"] == CLIENT_SECRET
assert info["token"] == ACCESS_TOKEN
assert info["expiry"] == datetime.datetime.min.isoformat() + "Z"
assert info["expiry"] == NOW.isoformat() + "Z"
assert info["revoke_url"] == REVOKE_URL
assert info["quota_project_id"] == QUOTA_PROJECT_ID

Expand All @@ -340,7 +383,7 @@ def test_to_json(self):
def test_to_json_full(self):
creds = self.make_credentials(
token=ACCESS_TOKEN,
expiry=datetime.datetime.min,
expiry=NOW,
revoke_url=REVOKE_URL,
quota_project_id=QUOTA_PROJECT_ID,
)
Expand All @@ -354,14 +397,14 @@ def test_to_json_full(self):
assert info["client_id"] == CLIENT_ID
assert info["client_secret"] == CLIENT_SECRET
assert info["token"] == ACCESS_TOKEN
assert info["expiry"] == datetime.datetime.min.isoformat() + "Z"
assert info["expiry"] == NOW.isoformat() + "Z"
assert info["revoke_url"] == REVOKE_URL
assert info["quota_project_id"] == QUOTA_PROJECT_ID

def test_to_json_full_with_strip(self):
creds = self.make_credentials(
token=ACCESS_TOKEN,
expiry=datetime.datetime.min,
expiry=NOW,
revoke_url=REVOKE_URL,
quota_project_id=QUOTA_PROJECT_ID,
)
Expand All @@ -386,7 +429,7 @@ def test_get_project_id(self):
def test_with_quota_project(self):
creds = self.make_credentials(
token=ACCESS_TOKEN,
expiry=datetime.datetime.min,
expiry=NOW,
revoke_url=REVOKE_URL,
quota_project_id=QUOTA_PROJECT_ID,
)
Expand All @@ -405,7 +448,7 @@ def test_with_quota_project(self):
def test_with_token_uri(self):
creds = self.make_credentials(
token=ACCESS_TOKEN,
expiry=datetime.datetime.min,
expiry=NOW,
revoke_url=REVOKE_URL,
quota_project_id=QUOTA_PROJECT_ID,
)
Expand All @@ -428,36 +471,39 @@ def test_from_file_required_options_only(self, tmpdir):
creds = external_account_authorized_user.Credentials.from_file(str(config_file))

assert isinstance(creds, external_account_authorized_user.Credentials)
assert creds._audience == AUDIENCE
assert creds._refresh_token == REFRESH_TOKEN
assert creds._token_url == TOKEN_URL
assert creds._token_info_url == TOKEN_INFO_URL
assert creds._client_id == CLIENT_ID
assert creds._client_secret == CLIENT_SECRET
assert creds.audience == AUDIENCE
assert creds.refresh_token == REFRESH_TOKEN
assert creds.token_url == TOKEN_URL
assert creds.token_info_url == TOKEN_INFO_URL
assert creds.client_id == CLIENT_ID
assert creds.client_secret == CLIENT_SECRET
assert creds.token is None
assert creds.expiry is None
assert creds.scopes is None
assert creds._revoke_url is None
assert creds._quota_project_id is None

def test_from_file_full_options(self, tmpdir):
from_creds = self.make_credentials(
token=ACCESS_TOKEN,
expiry=datetime.datetime.min,
expiry=NOW,
revoke_url=REVOKE_URL,
quota_project_id=QUOTA_PROJECT_ID,
scopes=SCOPES,
)
config_file = tmpdir.join("config.json")
config_file.write(from_creds.to_json())
creds = external_account_authorized_user.Credentials.from_file(str(config_file))

assert isinstance(creds, external_account_authorized_user.Credentials)
assert creds._audience == AUDIENCE
assert creds._refresh_token == REFRESH_TOKEN
assert creds._token_url == TOKEN_URL
assert creds._token_info_url == TOKEN_INFO_URL
assert creds._client_id == CLIENT_ID
assert creds._client_secret == CLIENT_SECRET
assert creds.audience == AUDIENCE
assert creds.refresh_token == REFRESH_TOKEN
assert creds.token_url == TOKEN_URL
assert creds.token_info_url == TOKEN_INFO_URL
assert creds.client_id == CLIENT_ID
assert creds.client_secret == CLIENT_SECRET
assert creds.token == ACCESS_TOKEN
assert creds.expiry == datetime.datetime.min
assert creds.expiry == NOW
assert creds.scopes == SCOPES
assert creds._revoke_url == REVOKE_URL
assert creds._quota_project_id == QUOTA_PROJECT_ID