Skip to content

Commit

Permalink
Add optimizations for concurrency
Browse files Browse the repository at this point in the history
Switch ProcessPoolExecutor to ThreadPoolExecutor since these tasks are IO bound.
Additionally, experiment with session and container runtime caching.
  • Loading branch information
JacobCallahan committed Dec 8, 2022
1 parent 442d894 commit 0321000
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 66 deletions.
23 changes: 4 additions & 19 deletions broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from broker.providers.test_provider import TestProvider
from broker.hosts import Host
from broker import exceptions, helpers
from concurrent.futures import ProcessPoolExecutor, as_completed
from concurrent.futures import ThreadPoolExecutor, as_completed


PROVIDERS = {
Expand Down Expand Up @@ -38,26 +38,11 @@ class mp_decorator:
The decorated method is expected to return an iterable.
"""

# Note that this is a descriptor as the other option -- using nested function
# like this:
#
# def mp_decorator(func)
# @wraps(func)
# def wrapper(func)
# return
#
# return wrapper
#
# is not working with pickling that is necessary for the ProcessPoolExecutor of
# concurrent.futures. I got errors like:
# _pickle.PicklingError: Can't pickle ... it's not the same object as ...

MAX_WORKERS = None
""" If set to integer, the count of workers will be limited to that amount.
If set to None, the max workers count of the EXECUTOR will match the count of items."""

EXECUTOR = ProcessPoolExecutor
EXECUTOR = ThreadPoolExecutor

def __init__(self, func=None):
self.func = func
Expand Down Expand Up @@ -239,7 +224,7 @@ def checkin(self, sequential=False, host=None):
logger.debug("Checkin called with no hosts, taking no action")
return

with ProcessPoolExecutor(max_workers=1 if sequential else None) as workers:
with ThreadPoolExecutor(max_workers=1 if sequential else None) as workers:
completed_checkins = as_completed(
# reversing over a copy of the list to avoid skipping
workers.submit(self._checkin, _host)
Expand Down Expand Up @@ -291,7 +276,7 @@ def extend(self, sequential=False, host=None):
logger.debug("Extend called with no hosts, taking no action")
return

with ProcessPoolExecutor(
with ThreadPoolExecutor(
max_workers=1 if sequential else len(hosts)
) as workers:
completed_extends = as_completed(
Expand Down
12 changes: 12 additions & 0 deletions broker/helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Miscellaneous helpers live here"""
import collections
from contextlib import contextmanager
import getpass
import inspect
Expand Down Expand Up @@ -339,6 +340,17 @@ def __getitem__(self, key):
def __call__(self, *args, **kwargs):
return self

def __hash__(self):
    """Return a hash built from the instance attributes that can be hashed.

    Each ``(name, value)`` pair in ``self.__dict__`` is kept only if it is
    really hashable; the surviving pairs are hashed together as a tuple, in
    ``__dict__`` order.

    Hashability is probed by calling ``hash()`` rather than with
    ``isinstance(value, collections.abc.Hashable)``: the ABC check only looks
    for a ``__hash__`` method, so it accepts e.g. a tuple that contains a
    list, and hashing that value then raises ``TypeError``.
    """
    hashable_items = []
    for item in self.__dict__.items():
        try:
            hash(item)
        except TypeError:
            # The value (or something nested inside it) is unhashable; leave
            # this attribute out of the hash instead of failing.
            continue
        hashable_items.append(item)
    return hash(tuple(hashable_items))


def update_log_level(ctx, param, value):
silent = False
Expand Down
103 changes: 59 additions & 44 deletions broker/providers/ansible_tower.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import yaml
from urllib import parse as url_parser
from functools import cached_property
from functools import cache, cached_property
from dynaconf import Validator
from broker import exceptions
from broker.helpers import find_origin, results_filter
Expand All @@ -22,6 +22,51 @@
from broker import helpers


@cache
def get_awxkit_and_uname(
    config=None, root=None, url=None, token=None, uname=None, pword=None
):
    """Return an awxkit v2 API object and the resolved username.

    The result is memoized with ``functools.cache``: every argument must be
    hashable, and a repeated call with the same arguments returns the cached
    pair without authenticating again.

    Args:
        config: awxkit config object to populate; ``awxkit.config`` when falsy.
        root: awxkit API root; a new ``awxkit.api.Api()`` is created when None.
        url: base URL stored on ``config.base_url``.
        token: bearer token; used in preference to username/password when truthy.
        uname: username for password auth, or an override of the token user lookup.
        pword: password for password auth.

    Returns:
        A ``(v2_api, username)`` tuple.

    Raises:
        exceptions.AuthenticationError: the token login was rejected as unauthorized.
        exceptions.ProviderError: no username could be looked up for the token.
    """
    # Prefer token if it's set, otherwise use username/password
    # auth paths for the API taken from:
    # https://github.com/ansible/awx/blob/ddb6c5d0cce60779be279b702a15a2fddfcd0724/awxkit/awxkit/cli/client.py#L85-L94
    # unit test mock structure means the root API instance can't be loaded on the same line
    config = config or awxkit.config
    config.base_url = url
    if root is None:
        root = awxkit.api.Api()  # support mock stub for unit tests
    if token:
        helpers.emit(auth_type="token")
        logger.info("Using token authentication")
        config.token = token
        try:
            root.connection.login(
                username=None, password=None, token=token, auth_type="Bearer"
            )
        except awxkit.exceptions.Unauthorized as err:
            # Chain the original error so the traceback shows the real cause.
            raise exceptions.AuthenticationError(err.args[0]) from err
        versions = root.get().available_versions
        try:
            # lookup the user that authenticated with the token
            # If a username was specified in config, use that instead
            my_username = uname or versions.v2.get().me.get().results[0].username
        except (IndexError, AttributeError) as err:
            # lookup failed for whatever reason
            raise exceptions.ProviderError(
                provider="AnsibleTower",
                message="Failed to lookup a username for the given token, please check credentials",
            ) from err
    else:  # dynaconf validators should have checked that either token or password was provided
        helpers.emit(auth_type="password")
        logger.info("Using username and password authentication")
        config.credentials = {"default": {"username": uname, "password": pword}}
        config.use_sessions = True
        root.load_session().get()
        versions = root.available_versions
        my_username = uname
    return versions.v2.get(), my_username


class AnsibleTower(Provider):

_validators = [
Expand Down Expand Up @@ -83,52 +128,18 @@ def __init__(self, **kwargs):
kwargs.get("tower_inventory") or settings.ANSIBLETOWER.inventory
)
# Init the class itself
config = kwargs.get("config", awxkit.config)
config.base_url = self.url
config = kwargs.get("config")
root = kwargs.get("root")
if root is None:
root = awxkit.api.Api() # support mock stub for unit tests
# Prefer token if its set, otherwise use username/password
# auth paths for the API taken from:
# https://github.com/ansible/awx/blob/ddb6c5d0cce60779be279b702a15a2fddfcd0724/awxkit/awxkit/cli/client.py#L85-L94
# unit test mock structure means the root API instance can't be loaded on the same line
if self.token:
helpers.emit(auth_type="token")
logger.info("Using token authentication")
config.token = self.token
try:
root.connection.login(
username=None, password=None, token=self.token, auth_type="Bearer"
)
except awxkit.exceptions.Unauthorized as err:
raise exceptions.AuthenticationError(err.args[0])
versions = root.get().available_versions
try:
# lookup the user that authenticated with the token
# If a username was specified in config, use that instead
my_username = (
self.uname or versions.v2.get().me.get().results[0].username
)
except (IndexError, AttributeError):
# lookup failed for whatever reason
raise exceptions.ProviderError(
provider="AnsibleTower",
message="Failed to lookup a username for the given token, please check credentials",
)
else: # dynaconf validators should have checked that either token or password was provided
helpers.emit(auth_type="password")
logger.info("Using username and password authentication")
config.credentials = {
"default": {"username": self.uname, "password": self.pword}
}
config.use_sessions = True
root.load_session().get()
versions = root.available_versions
my_username = self.uname
self.v2 = versions.v2.get()
self.v2, self.username = get_awxkit_and_uname(
config=config,
root=root,
url=self.url,
token=self.token,
uname=self.uname,
pword=self.pword,
)
# Check to see if we're running AAP (ver 4.0+)
self._is_aap = False if self.v2.ping.get().version[0] == "3" else True
self.username = my_username

@staticmethod
def _pull_params(kwargs):
Expand Down Expand Up @@ -542,6 +553,7 @@ def nick_help(self, **kwargs):
]
if res_filter := kwargs.get("results_filter"):
workflows = results_filter(workflows, res_filter)
workflows = workflows if isinstance(workflows, list) else [workflows]
workflows = "\n".join(workflows[:results_limit])
logger.info(f"Available workflows:\n{workflows}")
elif inventory := kwargs.get("inventory"):
Expand All @@ -555,6 +567,7 @@ def nick_help(self, **kwargs):
]
if res_filter := kwargs.get("results_filter"):
inv = results_filter(inv, res_filter)
inv = inv if isinstance(inv, list) else [inv]
inv = "\n".join(inv[:results_limit])
logger.info(f"Available Inventories:\n{inv}")
elif job_template := kwargs.get("job_template"):
Expand All @@ -570,6 +583,7 @@ def nick_help(self, **kwargs):
]
if res_filter := kwargs.get("results_filter"):
job_templates = results_filter(job_templates, res_filter)
job_templates = job_templates if isinstance(job_templates, list) else [job_templates]
job_templates = "\n".join(job_templates[:results_limit])
logger.info(f"Available job templates:\n{job_templates}")
elif kwargs.get("templates"):
Expand All @@ -584,6 +598,7 @@ def nick_help(self, **kwargs):
templates.sort(reverse=True)
if res_filter := kwargs.get("results_filter"):
templates = results_filter(templates, res_filter)
templates = templates if isinstance(templates, list) else [templates]
templates = "\n".join(templates[:results_limit])
logger.info(f"Available templates:\n{templates}")
else:
Expand Down
20 changes: 18 additions & 2 deletions broker/providers/container.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from functools import cache
import getpass
import inspect
from uuid import uuid4
Expand Down Expand Up @@ -32,6 +33,19 @@ def _host_release():
caller_host._checked_in = True


@cache
def get_runtime(
    runtime_cls=None, host=None, username=None, password=None, port=None, timeout=None
):
    """Create a runtime object by calling ``runtime_cls`` with the connection settings.

    Memoized with ``functools.cache``: a repeated call with the same arguments
    returns the object created the first time instead of calling
    ``runtime_cls`` again. All arguments must therefore be hashable.
    """
    connection_kwargs = {
        "host": host,
        "username": username,
        "password": password,
        "port": port,
        "timeout": timeout,
    }
    return runtime_cls(**connection_kwargs)


class Container(Provider):
_validators = [
Validator("CONTAINER.runtime", default="podman"),
Expand Down Expand Up @@ -73,7 +87,8 @@ def __init__(self, **kwargs):
"Container",
f"Broker has no bind for {settings.container.runtime} containers",
)
self.runtime = self._runtime_cls(
self.runtime = get_runtime(
runtime_cls=self._runtime_cls,
host=settings.container.host,
username=settings.container.host_username,
password=settings.container.host_password,
Expand All @@ -82,7 +97,6 @@ def __init__(self, **kwargs):
)
self._name_prefix = settings.container.get("name_prefix", getpass.getuser())


def _post_pickle(self, purified):
self._validate_settings()
self.runtime = self._runtime_cls(
Expand Down Expand Up @@ -220,12 +234,14 @@ def nick_help(self, **kwargs):
]
if res_filter := kwargs.get("results_filter"):
images = helpers.results_filter(images, res_filter)
images = images if isinstance(images, list) else [images]
images = "\n".join(images[:results_limit])
logger.info(f"Available host images:\n{images}")
elif kwargs.get("container_apps"):
images = [img.tags[0] for img in self.runtime.images if img.tags]
if res_filter := kwargs.get("results_filter"):
images = helpers.results_filter(images, res_filter)
images = images if isinstance(images, list) else [images]
images = "\n".join(images[:results_limit])
logger.info(f"Available app images:\n{images}")

Expand Down
5 changes: 4 additions & 1 deletion broker/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,15 @@ def sftp_write(self, source, destination=None):
for src in source:
if not Path(src).exists():
raise FileNotFoundError(src)
destination = destination or source[0].parent
destination = Path(destination) or source[0].parent
# Files need to be added to a tarfile
with helpers.temporary_tar(source) as tar:
logger.debug(
f"{self._cont_inst.hostname} adding file(s) {source} to {destination}"
)
# if the destination is a file, create the parent path
if destination.is_file():
self.execute(f"mkdir -p {destination.parent}")
self._cont_inst._cont_inst.put_archive(str(destination), tar.read_bytes())

def sftp_read(self, source, destination=None):
Expand Down

0 comments on commit 0321000

Please sign in to comment.