-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RBAC: role assignment creation #553
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,23 @@ | ||
#--------------------------------------------------------------------------------------------- | ||
#--------------------------------------------------------------------------------------------- | ||
# Copyright (c) Microsoft Corporation. All rights reserved. | ||
# Licensed under the MIT License. See License.txt in the project root for license information. | ||
#--------------------------------------------------------------------------------------------- | ||
|
||
from azure.mgmt.authorization import AuthorizationManagementClient | ||
|
||
import azure.cli.commands.parameters #pylint: disable=unused-import | ||
|
||
from azure.cli.commands.client_factory import get_mgmt_service_client | ||
|
||
# FACTORIES | ||
from azure.cli.commands import register_cli_argument | ||
|
||
def _auth_client_factory(**_): | ||
return get_mgmt_service_client(AuthorizationManagementClient) | ||
register_cli_argument('ad app', 'application_object_id', options_list=('--object-id',)) | ||
register_cli_argument('ad app', 'app_id', help='application id') | ||
register_cli_argument('ad', 'display_name', help='object\'s display name or its prefix') | ||
register_cli_argument('ad', 'identifier_uri', | ||
help='graph application identifier, must be in uri format') | ||
register_cli_argument('ad', 'spn', help='service principal name') | ||
register_cli_argument('ad', 'upn', help='user principal name, e.g. [email protected]') | ||
register_cli_argument('ad', 'query_filter', options_list=('--filter',), help='OData filter') | ||
register_cli_argument('role assignment', 'role_assignment_name', | ||
options_list=('--role-assignment-name', '-n')) | ||
register_cli_argument('role assignment', 'role', help='role name or id') | ||
register_cli_argument('ad user', 'mail_nickname', | ||
help='mail alias. Defaults to user principal name') | ||
register_cli_argument('ad user', 'force_change_password_next_login', action='store_true') |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,30 +1,144 @@ | ||
#--------------------------------------------------------------------------------------------- | ||
#--------------------------------------------------------------------------------------------- | ||
# Copyright (c) Microsoft Corporation. All rights reserved. | ||
# Licensed under the MIT License. See License.txt in the project root for license information. | ||
#--------------------------------------------------------------------------------------------- | ||
|
||
import uuid | ||
import re | ||
|
||
from azure.cli._util import CLIError | ||
from azure.cli.commands.client_factory import get_mgmt_service_client, configure_common_settings | ||
from azure.mgmt.authorization import AuthorizationManagementClient | ||
from azure.graphrbac import GraphRbacManagementClient | ||
from azure.mgmt.authorization.models import RoleAssignmentProperties | ||
from ._params import _auth_client_factory | ||
from azure.graphrbac.models import UserCreateParameters, PasswordProfile | ||
|
||
def _auth_client_factory(**_): | ||
return get_mgmt_service_client(AuthorizationManagementClient) | ||
|
||
def _graph_client_factory(**_): | ||
from azure.cli._profile import Profile | ||
profile = Profile() | ||
cred, _, tenant_id = profile.get_login_credentials(True) | ||
client = GraphRbacManagementClient(cred, tenant_id) | ||
configure_common_settings(client) | ||
return client | ||
|
||
#TODO: expand the support to be in parity with node cli | ||
def create_role_assignment(role, object_id, scope=None): | ||
def create_role_assignment(role, assignee, resource_group_name=None, resource_id=None): | ||
''' | ||
:param assignee: represent a user, group, or service principal. | ||
supported format: object id, user sign-in name, or service principal name | ||
:param resource_id: resource id | ||
''' | ||
assignments_client = _auth_client_factory().role_assignments | ||
definitions_client = _auth_client_factory().role_definitions | ||
role_id = role | ||
scope = scope or '/subscriptions/' + definitions_client.config.subscription_id | ||
if not re.match(r'[0-9a-f]{32}\Z', role, re.I): #retrieve role id | ||
|
||
if resource_id: | ||
if resource_group_name: | ||
err = 'Resource group "{}" is redundant because resource id is supplied' | ||
raise CLIError(err.format(resource_group_name)) | ||
scope = resource_id | ||
else: | ||
scope = '/subscriptions/' + definitions_client.config.subscription_id | ||
if resource_group_name: | ||
scope = scope + '/resourceGroups/' + resource_group_name | ||
|
||
role_id = None | ||
try: | ||
uuid.UUID(role) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The object returned is not used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, just for validations. I found using regular expression matching involves more code, say some id might contain '-' in the middle |
||
role_id = role | ||
except ValueError: | ||
pass | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The preferred place to put argument validation is either in a validator (for validation that needs all parsed argument values) or in a type parameter (for individual argument values). This way, you get a consistent error message and usage statement instead of a single line of error text. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the only place we need to sniff the role-id at this moment, once i have more than 1, i will maintain the consistency by sharing the code, and i will consider validators. |
||
|
||
if not role_id: #retrieve role id | ||
role_defs = list(definitions_client.list(scope, "roleName eq '{}'".format(role))) | ||
if not role_defs: | ||
raise CLIError('Role {} doesn\'t exist.'.format(role)) | ||
raise CLIError("Role '{}' doesn't exist.".format(role)) | ||
elif len(role_defs) > 1: | ||
raise CLIError('More than one roles match the given name {}'.format(role)) | ||
ids = [r.id for r in role_defs] | ||
err = ("More than one role matches the given name '{}'. " | ||
"Set 'role' to one of the unique ids from {}'") | ||
raise CLIError(err.format(role, ids)) | ||
role_id = role_defs[0].id | ||
|
||
object_id = _get_object_id(assignee) | ||
properties = RoleAssignmentProperties(role_id, object_id) | ||
assignment_name = uuid.uuid4() | ||
return assignments_client.create(scope, assignment_name, properties) | ||
|
||
def list_apps(client, app_id=None, display_name=None, identifier_uri=None, query_filter=None): | ||
sub_filters = [] | ||
if query_filter: | ||
sub_filters.append(query_filter) | ||
if app_id: | ||
sub_filters.append("appId eq '{}'".format(app_id)) | ||
if display_name: | ||
sub_filters.append("startswith(displayName,'{}')".format(display_name)) | ||
if identifier_uri: | ||
sub_filters.append("identifierUris/any(s:s eq '{}')".format(identifier_uri)) | ||
|
||
return client.list(filter=(' and '.join(sub_filters))) | ||
|
||
def list_sps(client, spn=None, display_name=None, query_filter=None): | ||
sub_filters = [] | ||
if query_filter: | ||
sub_filters.append(query_filter) | ||
if spn: | ||
sub_filters.append("servicePrincipalNames/any(c:c eq '{}')".format(spn)) | ||
if display_name: | ||
sub_filters.append("startswith(displayName,'{}')".format(display_name)) | ||
|
||
return client.list(filter=(' and '.join(sub_filters))) | ||
|
||
def list_users(client, upn=None, display_name=None, query_filter=None): | ||
sub_filters = [] | ||
if query_filter: | ||
sub_filters.append(query_filter) | ||
if upn: | ||
sub_filters.append("userPrincipalName eq '{}'".format(upn)) | ||
if display_name: | ||
sub_filters.append("startswith(displayName,'{}')".format(display_name)) | ||
|
||
return client.list(filter=(' and ').join(sub_filters)) | ||
|
||
def create_user(client, user_principal_name, display_name, password, mail_nickname=None, #pylint: disable=too-many-arguments | ||
immutable_id=None, force_change_password_next_login=False): | ||
''' | ||
:param mail_nickname: mail alias. default to user principal name | ||
''' | ||
mail_nickname = mail_nickname or user_principal_name.split('@')[0] | ||
param = UserCreateParameters(user_principal_name=user_principal_name, account_enabled=True, | ||
display_name=display_name, mail_nickname=mail_nickname, | ||
immutable_id=immutable_id, | ||
password_profile=PasswordProfile( | ||
password, force_change_password_next_login)) | ||
return client.create(param) | ||
|
||
create_user.__doc__ = UserCreateParameters.__doc__ | ||
|
||
def list_groups(client, display_name=None, query_filter=None): | ||
sub_filters = [] | ||
if query_filter: | ||
sub_filters.append(query_filter) | ||
if display_name: | ||
sub_filters.append("startswith(displayName,'{}')".format(display_name)) | ||
|
||
return client.list(filter=(' and ').join(sub_filters)) | ||
|
||
def _get_object_id(assignee): | ||
client = _graph_client_factory() | ||
result = None | ||
if assignee.find('@') >= 0: #looks like a user principal name | ||
result = list(client.users.list(filter="userPrincipalName eq '{}'".format(assignee))) | ||
if not result: | ||
result = list(client.service_principals.list( | ||
filter="servicePrincipalNames/any(c:c eq '{}')".format(assignee))) | ||
if not result: #assume an object id, let us verify it | ||
from azure.graphrbac.models import GetObjectsParameters | ||
result = list(client.objects.get_objects_by_object_ids( | ||
GetObjectsParameters(include_directory_object_references=True, object_ids=[assignee]))) | ||
|
||
#2+ matches should never happen, so we only check 'no match' here | ||
if not result: | ||
raise CLIError("No matches in graph database for '{}'".format(assignee)) | ||
|
||
return result[0].object_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume this is fine, but we might want to check that it meets text standards