Skip to content
This repository has been archived by the owner on Oct 15, 2019. It is now read-only.

refactor(auth): update auth classes #34

Merged
merged 3 commits into from
Jul 1, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ repos:
hooks:
- id: python-no-eval
- id: python-no-log-warn
- id: python-use-type-annotations
TheKevJames marked this conversation as resolved.
Show resolved Hide resolved
- id: rst-backticks
- repo: https://github.com/Lucas-C/pre-commit-hooks-markup
rev: v1.0.0
Expand Down
5 changes: 4 additions & 1 deletion gcloud/rest/auth/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from pkg_resources import get_distribution
__version__ = get_distribution('gcloud-rest').version

from gcloud.rest.auth.iam import IamClient
from gcloud.rest.auth.token import Token
from gcloud.rest.auth.utils import decode
from gcloud.rest.auth.utils import encode


__all__ = ['__version__', 'Token']
__all__ = ['__version__', 'IamClient', 'Token', 'decode', 'encode']
149 changes: 149 additions & 0 deletions gcloud/rest/auth/iam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import json
import threading
from typing import Dict
from typing import List
from typing import Optional
from typing import Union # pylint: disable=unused-import

import requests

from .token import Token
from .token import Type
from .utils import encode


API_ROOT_IAM = 'https://iam.googleapis.com/v1'
API_ROOT_IAM_CREDENTIALS = 'https://iamcredentials.googleapis.com/v1'
SCOPES = ['https://www.googleapis.com/auth/iam']


class IamClient:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inherit from object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, a lint error caught this too. Thought I had fixed it, will be fixed in the next update to this PR.

def __init__(self,
service_file=None, # type: Optional[str]
session=None, # type: requests.Session
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add Optional to all of these

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so my understanding was that there won't be a need to add Optional to these based on gcloud-aio's Type annotations - https://github.com/talkiq/gcloud-aio/blob/master/auth/gcloud/aio/auth/token.py#L65.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like these are incorrect in gcloud-aio as well; these would only be accepted if we have the --no-strict-optional flag set in mypy, which we should avoid since it tends to help mask missing null check issues. We should similarly make this change in -aio, since it'd be great to be more accurate about these things.

google_api_lock=None, # type: threading.RLock
token=None # type: Token
):
# type: (...) -> None
self.session = session
self.google_api_lock = google_api_lock or threading.RLock()
self.token = token or Token(service_file=service_file,
session=session, scopes=SCOPES)

if self.token.token_type != Type.SERVICE_ACCOUNT:
raise TypeError('IAM Credentials Client is only valid for use'
' with Service Accounts')

def headers(self):
# type: () -> Dict[str, str]
token = self.token.get()
return {
'Authorization': 'Bearer {}'.format(token),
}

@property
def service_account_email(self):
# type: () -> Optional[str]
return self.token.service_data.get('client_email')

# https://cloud.google.com/iam/reference/rest/v1/projects.serviceAccounts.keys/get
# pylint: disable=too-many-arguments
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just throw this into our .pre-commit-config.yaml

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do

def get_public_key(self,
key_id=None, # type: Optional[str]
key=None, # type: Optional[str]
service_account_email=None, # type: Optional[str]
project=None, # type: Optional[str]
session=None, # type: requests.Session
timeout=10 # type: int
):
# type: (...) -> Dict[str, str]
service_account_email = (service_account_email
or self.service_account_email)
project = project or self.token.get_project()

if not key_id and not key:
raise ValueError('get_public_key must have either key_id or key')

if not key:
key = 'projects/{}/serviceAccounts/{}/keys/{}' \
.format(project, service_account_email, key_id)

url = '{}/{}?publicKeyType=TYPE_X509_PEM_FILE'.format(
API_ROOT_IAM, key)
headers = self.headers()

if not self.session:
self.session = requests.Session()

session = session or self.session
with self.google_api_lock:
resp = session.get(url, headers=headers, timeout=timeout)
resp.raise_for_status()
return resp.json()

# https://cloud.google.com/iam/reference/rest/v1/projects.serviceAccounts.keys/list
def list_public_keys(self,
service_account_email=None, # type: Optional[str]
project=None, # type: Optional[str]
session=None, # type: requests.Session
timeout=10 # type: int
):
# type: (...) -> List[Dict[str, str]]
service_account_email = (service_account_email
or self.service_account_email)
project = project or self.token.get_project()

url = ('{}/projects/{}/serviceAccounts/'
'{}/keys').format(API_ROOT_IAM, project, service_account_email)

headers = self.headers()

if not self.session:
self.session = requests.Session()

session = session or self.session
with self.google_api_lock:
resp = session.get(url, headers=headers, timeout=timeout)
resp.raise_for_status()
return resp.json().get('keys', [])

# https://cloud.google.com/iam/credentials/reference/rest/v1/projects.serviceAccounts/signBlob
# pylint: disable=too-many-arguments
def sign_blob(self,
payload, # type: Optional[Union[str, bytes]]
service_account_email=None, # type: Optional[str]
delegates=None, # type: Optional[list]
session=None, # type: requests.Session
timeout=10 # type: int
):
# type: (...) -> Dict[str, str]
service_account_email = (
service_account_email or self.service_account_email)
if not service_account_email:
raise TypeError('sign_blob must have a valid '
'service_account_email')

resource_name = 'projects/-/serviceAccounts/{}'.format(
service_account_email)
url = '{}/{}:signBlob'.format(API_ROOT_IAM_CREDENTIALS, resource_name)

json_str = json.dumps({
'delegates': delegates or [resource_name],
'payload': encode(payload).decode('utf-8'),
})

headers = self.headers()
headers.update({
'Content-Length': str(len(json_str)),
'Content-Type': 'application/json',
})

if not self.session:
self.session = requests.Session()

session = session or self.session
with self.google_api_lock:
resp = session.post(url, data=json_str,
headers=headers, timeout=timeout)
resp.raise_for_status()
return resp.json()
Loading