diff --git a/broker/broker.py b/broker/broker.py index 4a6706db..b8dd28ec 100644 --- a/broker/broker.py +++ b/broker/broker.py @@ -4,7 +4,7 @@ from broker.providers.test_provider import TestProvider from broker.hosts import Host from broker import exceptions, helpers -from concurrent.futures import ProcessPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor, as_completed PROVIDERS = { @@ -38,26 +38,11 @@ class mp_decorator: The decorated method is expected to return an itearable. """ - - # Note that this is a descriptor as the other option -- using nested function - # like this: - # - # def mp_decorator(func) - # @wraps(func) - # def wrapper(func) - # return - # - # return wrapper - # - # is not working with pickling that is necessary for the ProcessPoolExecutor of - # concurrent.futures. I got errors like: - # _pickle.PicklingError: Can't pickle ... it's not the same object as ... - MAX_WORKERS = None """ If set to integer, the count of workers will be limited to that amount. If set to None, the max workers count of the EXECUTOR will match the count of items.""" - EXECUTOR = ProcessPoolExecutor + EXECUTOR = ThreadPoolExecutor def __init__(self, func=None): self.func = func @@ -239,7 +224,7 @@ def checkin(self, sequential=False, host=None): logger.debug("Checkin called with no hosts, taking no action") return - with ProcessPoolExecutor(max_workers=1 if sequential else None) as workers: + with ThreadPoolExecutor(max_workers=1 if sequential else None) as workers: completed_checkins = as_completed( # reversing over a copy of the list to avoid skipping workers.submit(self._checkin, _host) @@ -291,7 +276,7 @@ def extend(self, sequential=False, host=None): logger.debug("Extend called with no hosts, taking no action") return - with ProcessPoolExecutor( + with ThreadPoolExecutor( max_workers=1 if sequential else len(hosts) ) as workers: completed_extends = as_completed( diff --git a/broker/helpers.py b/broker/helpers.py index 94ba79d5..7c0492e6 100644 --- a/broker/helpers.py +++ b/broker/helpers.py @@ -1,4 +1,5 @@ """Miscellaneous helpers live here""" +import collections from contextlib import contextmanager import getpass import inspect @@ -339,6 +340,17 @@ def __getitem__(self, key): def __call__(self, *args, **kwargs): return self + def __hash__(self): + return hash( + tuple( + ( + kp + for kp in self.__dict__.items() + if isinstance(kp[1], collections.abc.Hashable) + ) + ) + ) + def update_log_level(ctx, param, value): silent = False diff --git a/broker/providers/ansible_tower.py b/broker/providers/ansible_tower.py index 844a92a0..990889fc 100644 --- a/broker/providers/ansible_tower.py +++ b/broker/providers/ansible_tower.py @@ -3,7 +3,7 @@ import json import yaml from urllib import parse as url_parser -from functools import cached_property +from functools import cache, cached_property from dynaconf import Validator from broker import exceptions from broker.helpers import find_origin, results_filter @@ -22,6 +22,51 @@ from broker import helpers +@cache +def get_awxkit_and_uname( + config=None, root=None, url=None, token=None, uname=None, pword=None +): + """Return an awxkit api object and resolved username""" + # Prefer token if its set, otherwise use username/password + # auth paths for the API taken from: + # https://github.com/ansible/awx/blob/ddb6c5d0cce60779be279b702a15a2fddfcd0724/awxkit/awxkit/cli/client.py#L85-L94 + # unit test mock structure means the root API instance can't be loaded on the same line + config = config or awxkit.config + config.base_url = url + if root is None: + root = awxkit.api.Api() # support mock stub for unit tests + if token: + helpers.emit(auth_type="token") + logger.info("Using token authentication") + config.token = token + try: + root.connection.login( + username=None, password=None, token=token, auth_type="Bearer" + ) + except awxkit.exceptions.Unauthorized as err: + raise exceptions.AuthenticationError(err.args[0]) + versions = root.get().available_versions + try: + # lookup the user that authenticated with the token + # If a username was specified in config, use that instead + my_username = uname or versions.v2.get().me.get().results[0].username + except (IndexError, AttributeError): + # lookup failed for whatever reason + raise exceptions.ProviderError( + provider="AnsibleTower", + message="Failed to lookup a username for the given token, please check credentials", + ) + else: # dynaconf validators should have checked that either token or password was provided + helpers.emit(auth_type="password") + logger.info("Using username and password authentication") + config.credentials = {"default": {"username": uname, "password": pword}} + config.use_sessions = True + root.load_session().get() + versions = root.available_versions + my_username = uname + return versions.v2.get(), my_username + + class AnsibleTower(Provider): _validators = [ @@ -83,52 +128,18 @@ def __init__(self, **kwargs): kwargs.get("tower_inventory") or settings.ANSIBLETOWER.inventory ) # Init the class itself - config = kwargs.get("config", awxkit.config) - config.base_url = self.url + config = kwargs.get("config") root = kwargs.get("root") - if root is None: - root = awxkit.api.Api() # support mock stub for unit tests - # Prefer token if its set, otherwise use username/password - # auth paths for the API taken from: - # https://github.com/ansible/awx/blob/ddb6c5d0cce60779be279b702a15a2fddfcd0724/awxkit/awxkit/cli/client.py#L85-L94 - # unit test mock structure means the root API instance can't be loaded on the same line - if self.token: - helpers.emit(auth_type="token") - logger.info("Using token authentication") - config.token = self.token - try: - root.connection.login( - username=None, password=None, token=self.token, auth_type="Bearer" - ) - except awxkit.exceptions.Unauthorized as err: - raise exceptions.AuthenticationError(err.args[0]) - versions = root.get().available_versions - try: - # lookup the user that authenticated with the token - # If a username was specified in config, use that instead - my_username = ( - self.uname or versions.v2.get().me.get().results[0].username - ) - except (IndexError, AttributeError): - # lookup failed for whatever reason - raise exceptions.ProviderError( - provider="AnsibleTower", - message="Failed to lookup a username for the given token, please check credentials", - ) - else: # dynaconf validators should have checked that either token or password was provided - helpers.emit(auth_type="password") - logger.info("Using username and password authentication") - config.credentials = { - "default": {"username": self.uname, "password": self.pword} - } - config.use_sessions = True - root.load_session().get() - versions = root.available_versions - my_username = self.uname - self.v2 = versions.v2.get() + self.v2, self.username = get_awxkit_and_uname( + config=config, + root=root, + url=self.url, + token=self.token, + uname=self.uname, + pword=self.pword, + ) # Check to see if we're running AAP (ver 4.0+) self._is_aap = False if self.v2.ping.get().version[0] == "3" else True - self.username = my_username @staticmethod def _pull_params(kwargs): @@ -542,6 +553,7 @@ def nick_help(self, **kwargs): ] if res_filter := kwargs.get("results_filter"): workflows = results_filter(workflows, res_filter) + workflows = workflows if isinstance(workflows, list) else [workflows] workflows = "\n".join(workflows[:results_limit]) logger.info(f"Available workflows:\n{workflows}") elif inventory := kwargs.get("inventory"): @@ -555,6 +567,7 @@ def nick_help(self, **kwargs): ] if res_filter := kwargs.get("results_filter"): inv = results_filter(inv, res_filter) + inv = inv if isinstance(inv, list) else [inv] inv = "\n".join(inv[:results_limit]) logger.info(f"Available Inventories:\n{inv}") elif job_template := kwargs.get("job_template"): @@ -570,6 +583,7 @@ def nick_help(self, **kwargs): ] if res_filter := kwargs.get("results_filter"): job_templates = results_filter(job_templates, res_filter) + job_templates = job_templates if isinstance(job_templates, list) else [job_templates] job_templates = "\n".join(job_templates[:results_limit]) logger.info(f"Available job templates:\n{job_templates}") elif kwargs.get("templates"): @@ -584,6 +598,7 @@ def nick_help(self, **kwargs): templates.sort(reverse=True) if res_filter := kwargs.get("results_filter"): templates = results_filter(templates, res_filter) + templates = templates if isinstance(templates, list) else [templates] templates = "\n".join(templates[:results_limit]) logger.info(f"Available templates:\n{templates}") else: diff --git a/broker/providers/container.py b/broker/providers/container.py index 8f98784f..56364f06 100644 --- a/broker/providers/container.py +++ b/broker/providers/container.py @@ -1,3 +1,4 @@ +from functools import cache import getpass import inspect from uuid import uuid4 @@ -32,6 +33,19 @@ def _host_release(): caller_host._checked_in = True +@cache +def get_runtime( + runtime_cls=None, host=None, username=None, password=None, port=None, timeout=None +): + return runtime_cls( + host=host, + username=username, + password=password, + port=port, + timeout=timeout, + ) + + class Container(Provider): _validators = [ Validator("CONTAINER.runtime", default="podman"), @@ -73,7 +87,8 @@ def __init__(self, **kwargs): "Container", f"Broker has no bind for {settings.container.runtime} containers", ) - self.runtime = self._runtime_cls( + self.runtime = get_runtime( + runtime_cls=self._runtime_cls, host=settings.container.host, username=settings.container.host_username, password=settings.container.host_password, @@ -82,7 +97,6 @@ def __init__(self, **kwargs): ) self._name_prefix = settings.container.get("name_prefix", getpass.getuser()) - def _post_pickle(self, purified): self._validate_settings() self.runtime = self._runtime_cls( @@ -220,12 +234,14 @@ def nick_help(self, **kwargs): ] if res_filter := kwargs.get("results_filter"): images = helpers.results_filter(images, res_filter) + images = images if isinstance(images, list) else [images] images = "\n".join(images[:results_limit]) logger.info(f"Available host images:\n{images}") elif kwargs.get("container_apps"): images = [img.tags[0] for img in self.runtime.images if img.tags] if res_filter := kwargs.get("results_filter"): images = helpers.results_filter(images, res_filter) + images = images if isinstance(images, list) else [images] images = "\n".join(images[:results_limit]) logger.info(f"Available app images:\n{images}") diff --git a/broker/session.py b/broker/session.py index 7edb697d..1e55d104 100644 --- a/broker/session.py +++ b/broker/session.py @@ -232,12 +232,15 @@ def sftp_write(self, source, destination=None): for src in source: if not Path(src).exists(): raise FileNotFoundError(src) - destination = destination or source[0].parent + destination = Path(destination) or source[0].parent # Files need to be added to a tarfile with helpers.temporary_tar(source) as tar: logger.debug( f"{self._cont_inst.hostname} adding file(s) {source} to {destination}" ) + # if the destination is a file, create the parent path + if destination.is_file(): + self.execute(f"mkdir -p {destination.parent}") self._cont_inst._cont_inst.put_archive(str(destination), tar.read_bytes()) def sftp_read(self, source, destination=None):