Skip to content

Commit

Permalink
Autofill Authorization Code in login URL (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
shaundewberry authored and benkehoe committed Mar 6, 2023
1 parent a4a1463 commit 83aff0f
Show file tree
Hide file tree
Showing 10 changed files with 709 additions and 420 deletions.
3 changes: 2 additions & 1 deletion CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
* [Geordam](https://github.com/Geordam)
* [Imran](https://github.com/fai555)
* [O'Shaughnessy Evans](https://github.com/oshaughnessy)
* [Anthony Engelstad](https://github.com/anton0)
* [Anthony Engelstad](https://github.com/anton0)
* [Shaun Dewberry](https://github.com/shaundewberry)
4 changes: 2 additions & 2 deletions cli/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ pyyaml = "^5.3.1"
jsonschema = "^3.2.0"
aws-error-utils = "^2.4"
python-dateutil = "^2.8.1"
aws-sso-lib = "^1.13.0"
# aws-sso-lib = { path = "../lib" }
# aws-sso-lib = "^1.13.0"
aws-sso-lib = { path = "../lib", develop = true }
requests = "^2.26.0"

[tool.poetry.dev-dependencies]
Expand Down
86 changes: 43 additions & 43 deletions cli/src/aws_sso_util/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,64 +15,64 @@

from . import __version__

from .assignments import assignments
from .cfn import generate_template
from .check import check
from .configure_profile import configure_profile
from .credential_process import credential_process
from .deploy_macro import deploy_macro
# from .assignments import assignments
# from .cfn import generate_template
# from .check import check
# from .configure_profile import configure_profile
# from .credential_process import credential_process
# from .deploy_macro import deploy_macro
from .login import login
from .logout import logout
from .lookup import lookup
from .populate_profiles import populate_profiles
from .roles import roles
from .run_as import run_as
from .console import launch, get_config_token, launch_from_config
# from .logout import logout
# from .lookup import lookup
# from .populate_profiles import populate_profiles
# from .roles import roles
# from .run_as import run_as
# from .console import launch, get_config_token, launch_from_config

@click.group(name="aws-sso-util")
@click.version_option(version=__version__, message='%(version)s')
def cli():
pass

@cli.group()
def configure():
"""Commands to set up ~/.aws/config."""
pass
# @cli.group()
# def configure():
# """Commands to set up ~/.aws/config."""
# pass

configure.add_command(configure_profile, "profile")
configure.add_command(populate_profiles, "populate")
# configure.add_command(configure_profile, "profile")
# configure.add_command(populate_profiles, "populate")

cli.add_command(login)
cli.add_command(logout)
cli.add_command(roles)
cli.add_command(check)
cli.add_command(run_as)

@cli.group()
def console():
"""Commands for launching the AWS console in a browser."""
pass
# cli.add_command(logout)
# cli.add_command(roles)
# cli.add_command(check)
# cli.add_command(run_as)

console.add_command(launch)
console.add_command(get_config_token)
console.add_command(launch_from_config)
# @cli.group()
# def console():
# """Commands for launching the AWS console in a browser."""
# pass

@cli.group()
def admin():
"""Commands for IAM Identity Center administration."""
pass
# console.add_command(launch)
# console.add_command(get_config_token)
# console.add_command(launch_from_config)

# @cli.group()
# def admin():
# """Commands for IAM Identity Center administration."""
# pass

admin.add_command(lookup)
admin.add_command(assignments)
# admin.add_command(lookup)
# admin.add_command(assignments)

# admin.add_command(deploy_macro)
admin.add_command(generate_template, "cfn")
# # admin.add_command(deploy_macro)
# admin.add_command(generate_template, "cfn")

cli.add_command(credential_process)
# cli.add_command(credential_process)

_list_commands = cli.list_commands
def list_commands(ctx):
return [c for c in _list_commands(ctx) if c != "credential-process"]
# _list_commands = cli.list_commands
# def list_commands(ctx):
# return [c for c in _list_commands(ctx) if c != "credential-process"]

cli.list_commands = list_commands
# cli.list_commands = list_commands

195 changes: 148 additions & 47 deletions cli/src/aws_sso_util/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import os
import sys
import json
import logging

import botocore
Expand All @@ -23,7 +24,20 @@

import aws_error_utils

from aws_sso_lib.config import find_instances, find_all_instances, SSOInstance
from aws_sso_lib.config import (
Specifier,
Session,
Source,
get_specifier,
find_all_sessions,
get_session_from_config_profile,
get_session_from_config_session,
InlineSessionError,
ConfigProfileError,
ConfigSessionError,
NotAnSSOProfileError
)

from aws_sso_lib.sso import get_token_fetcher
from aws_sso_lib.exceptions import PendingAuthorizationExpiredError

Expand All @@ -40,28 +54,30 @@
LOCAL_TIME_FORMAT = "%Y-%m-%d %H:%M %Z"

@click.command()
@click.argument("sso_start_url", required=False)
@click.argument("specifier", required=False)
@click.argument("sso_region", required=False)
@click.option("--profile", metavar="PROFILE_NAME", help="Use a profile to specify Identity Center instance")
@click.option("--all", "login_all", is_flag=True, default=None, help="Log in to all Identity Center instances if multiple are found")
@click.option("--profile", metavar="PROFILE_NAME", help="Use a config profile to specify Identity Center session")
@click.option("--sso-session", metavar="SESSION_NAME", help="Use a config session to specify Identity Center session")
@click.option("--all", "login_all", is_flag=True, default=None, help="Scan for all Identity Center sessions and log in to them all")
@click.option("--force-refresh", "force", is_flag=True, help="Force re-authentication")
@click.option("--headless", is_flag=True, default=None, help="Never open a browser window")
@click.option("--verbose", "-v", count=True)
@click.option("--sso-start-url", "alternate_sso_start_url", hidden=True)
@click.option("--sso-region", "alternate_sso_region", hidden=True)
@click.option("--force", "alternate_force", is_flag=True, hidden=True)
def login(
sso_start_url,
specifier,
sso_region,
profile,
sso_session,
login_all,
force,
headless,
verbose,
alternate_sso_start_url,
alternate_sso_region,
alternate_force):
"""Log in to an Identity Center instance.
"""Log in to an Identity Center session.
Note this only needs to be done once for a given Identity Center instance (i.e., start URL),
as all profiles sharing the same start URL will share the same login.
Expand All @@ -73,66 +89,151 @@ def login(
Otherwise, you can provide a full start URL, or a regex for the start URL (usually a substring will work),
and if this uniquely identifies a start URL in your config, that will suffice.
You can also provide a profile name with --profile to use the Identity Center instance from a specific profile.
You can also provide a profile name with --profile to use the Identity Center session from a specific config profile,
or --sso-session for a specific config session.
"""
sso_start_url = sso_start_url or alternate_sso_start_url
specifier_param = specifier
sso_start_url = specifier_param or alternate_sso_start_url
sso_region = sso_region or alternate_sso_region

# mutually exclusive options
if sso_start_url and profile:
raise click.BadParameter("Cannot use specifier and --profile")

if sso_start_url and sso_session:
raise click.BadParameter("Cannot use specifier and --sso-session")

if profile and sso_session:
raise click.BadParameter("Cannot use --profile and --sso-session")

force = force or alternate_force

if login_all is None:
login_all = os.environ.get(LOGIN_ALL_VAR, "").lower() in ["true", "1"]

configure_logging(LOGGER, verbose)

if login_all:
instances, _ = find_all_instances(
start_url_vars=LOGIN_DEFAULT_START_URL_VARS,
region_vars=LOGIN_DEFAULT_SSO_REGION_VARS,
)
botocore_session = botocore.session.Session(session_vars={
"profile": (None, None, None, None),
"region": (None, None, None, None),
})

# first check if we've got specific config to get the session from
# either a config profile or a config session, resulting in a list
# of just a single session
if profile:
try:
sessions = [get_session_from_config_profile(
profile_name=profile,
source=Source(type="CLI parameter", name="--profile"),
botocore_session_full_config=botocore_session.full_config
)]
except NotAnSSOProfileError as e:
LOGGER.fatal(str(e))
sys.exit(5)
except ConfigProfileError as e:
LOGGER.fatal(str(e))
sys.exit(5)
elif sso_session:
try:
sessions = [get_session_from_config_session(
session_name=sso_session,
source=Source(type="CLI parameter", name="--sso-session"),
botocore_session_full_config=botocore_session.full_config
)]
except ConfigSessionError as e:
LOGGER.fatal(str(e))
sys.exit(5)
else:
instances, specifier, all_instances = find_instances(
profile_name=profile,
profile_source="--profile",
start_url=sso_start_url,
start_url_source="CLI input",
region=sso_region,
region_source="CLI input",
start_url_vars=LOGIN_DEFAULT_START_URL_VARS,
region_vars=LOGIN_DEFAULT_SSO_REGION_VARS,
)

if not instances:
if all_instances:
LOGGER.fatal((
f"No Identity Center config matched {specifier.to_str(region=True)} " +
f"from {SSOInstance.to_strs(all_instances)}"))
# otherwise we need to search

if login_all:
specifier = None
else:
# the specifier might come from params
if sso_start_url and sso_start_url.startswith("http") and sso_region:
if alternate_sso_start_url or alternate_sso_region:
source_name = "--sso-start-url and --sso-region"
else:
source_name = "positional parameters"
specifier = Specifier(
value=json.dumps({
"sso_start_url": sso_start_url,
"sso_region": sso_region
}),
source=Source(
type="CLI parameter",
name=source_name
))
elif specifier_param:
if alternate_sso_start_url:
source_name = "--sso-start-url"
else:
source_name = "positional parameter"
try:
specifier = Specifier(
value=specifier_param,
source=Source(
type="CLI parameter",
name=source_name
))
except InlineSessionError as e:
LOGGER.fatal(str(e))
sys.exit(5)
else:
LOGGER.fatal("No Identity Center config found")
sys.exit(1)
try:
specifier = get_specifier()
except InlineSessionError as e:
LOGGER.fatal(str(e))
sys.exit(5)

if len(instances) > 1:
LOGGER.fatal(f"Found {len(instances)} Identity Center configs, please specify one or use --all: {SSOInstance.to_strs(instances)}")
sys.exit(1)
if specifier.session:
sessions = [specifier.session]
else:
all_sessions = find_all_sessions()

LOGGER.debug(f"Instances: {SSOInstance.to_strs(instances)}")
if not all_sessions.unique_sessions:
message = "No valid Identity Center sessions found"
if all_sessions.malformed_session_errors:
message = (
message
+ ", "
+ f"but {len(all_sessions.malformed_session_errors)} invalid sessions were found"
+ ": "
+ "; ".join(str(e) for e in all_sessions.malformed_session_errors)
)
LOGGER.fatal(message)
sys.exit(1)

if not specifier:
sessions = list(all_sessions.unique_sessions.values())
else:
sessions = list(all_sessions.filter(specifier).values())
if not sessions:
LOGGER.fatal(f"No Identity Center sessions matched specifier {specifier} from {all_sessions}")
sys.exit(1)

session = botocore.session.Session(session_vars={
'profile': (None, None, None, None),
'region': (None, None, None, None),
})
if not login_all and len(sessions) > 1:
LOGGER.fatal(f"Found {len(sessions)} Identity Center sessions, please specify one or use --all: {sessions}")
sys.exit(1)

LOGGER.debug(f"Sessions: {sessions}")

regions = set(i.region for i in instances)
regions = set(s.region for s in sessions)
token_fetchers = {}
for region in regions:
token_fetchers[region] = get_token_fetcher(session, region, interactive=True, disable_browser=headless)
token_fetchers[region] = get_token_fetcher(botocore_session, region, interactive=True, disable_browser=headless)

if len(instances) > 1:
LOGGER.info(f"Logging in {len(instances)} Identity Center instances")
for instance in instances:
LOGGER.info(f"Logging in {instance.start_url}")
token_fetcher = token_fetchers[instance.region]
if len(sessions) > 1:
LOGGER.info(f"Logging in {len(sessions)} Identity Center sessions")
for session in sessions:
if session.is_inline_session():
LOGGER.info(f"Logging in {session.start_url}")
else:
LOGGER.info(f"Logging in {session.session_name} ({session.start_url})")
token_fetcher = token_fetchers[session.region]
try:
token = token_fetcher.fetch_token(instance.start_url, force_refresh=force)
token = token_fetcher.fetch_token(session.start_url, force_refresh=force)
LOGGER.debug(f"Token: {token}")
expiration = token['expiresAt']
if isinstance(expiration, str):
Expand Down
Loading

0 comments on commit 83aff0f

Please sign in to comment.