From c59bbdecdbdd920c5d3d298d691129c6bbc94c5e Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 8 Nov 2021 16:57:12 -0500 Subject: [PATCH 1/2] Refactor canceling to work through messaging and signals, not database If canceled attempted before, still allow attempting another cancel in this case, attempt to send the sigterm signal again. Keep clicking, you might help! Replace other cancel_callbacks with sigterm watcher adapt special inventory mechanism for this too Get rid of the cancel_watcher method with exception in main thread Handle academic case of sigterm race condition Process cancelation as control signal Fully connect cancel method and run_dispatcher to control Never transition workflows directly to canceled, add logs --- awx/main/dispatch/control.py | 10 ++- awx/main/dispatch/worker/base.py | 13 +++- .../management/commands/run_dispatcher.py | 22 +++++- awx/main/models/unified_jobs.py | 69 ++++++++++++------- awx/main/models/workflow.py | 5 +- awx/main/tasks/callback.py | 27 +------- awx/main/tasks/jobs.py | 5 +- awx/main/tasks/receptor.py | 48 +++++++------ awx/main/tasks/signals.py | 8 +++ awx/main/tests/functional/test_dispatch.py | 2 +- .../unit/models/test_unified_job_unit.py | 13 +++- awx/main/utils/handlers.py | 2 +- docs/ansible_runner_integration.md | 2 +- 13 files changed, 136 insertions(+), 90 deletions(-) diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py index cb7fabda9429..e6647170dae9 100644 --- a/awx/main/dispatch/control.py +++ b/awx/main/dispatch/control.py @@ -37,18 +37,24 @@ def status(self, *args, **kwargs): def running(self, *args, **kwargs): return self.control_with_reply('running', *args, **kwargs) + def cancel(self, task_ids, *args, **kwargs): + return self.control_with_reply('cancel', *args, extra_data={'task_ids': task_ids}, **kwargs) + @classmethod def generate_reply_queue_name(cls): return f"reply_to_{str(uuid.uuid4()).replace('-','_')}" - def control_with_reply(self, command, timeout=5): + def control_with_reply(self, command, timeout=5, extra_data=None): logger.warning('checking {} {} for {}'.format(self.service, command, self.queuename)) reply_queue = Control.generate_reply_queue_name() self.result = None with pg_bus_conn(new_connection=True) as conn: conn.listen(reply_queue) - conn.notify(self.queuename, json.dumps({'control': command, 'reply_to': reply_queue})) + send_data = {'control': command, 'reply_to': reply_queue} + if extra_data: + send_data.update(extra_data) + conn.notify(self.queuename, json.dumps(send_data)) for reply in conn.events(select_timeout=timeout, yield_timeouts=True): if reply is None: diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index b30f7ec17a81..ff05d9e8461f 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -63,7 +63,7 @@ def listening_on(self): def control(self, body): logger.warning(f'Received control signal:\n{body}') control = body.get('control') - if control in ('status', 'running'): + if control in ('status', 'running', 'cancel'): reply_queue = body['reply_to'] if control == 'status': msg = '\n'.join([self.listening_on, self.pool.debug()]) @@ -72,6 +72,17 @@ def control(self, body): for worker in self.pool.workers: worker.calculate_managed_tasks() msg.extend(worker.managed_tasks.keys()) + elif control == 'cancel': + msg = [] + task_ids = set(body['task_ids']) + for worker in self.pool.workers: + task = worker.current_task + if task and task['uuid'] in task_ids: + logger.warn(f'Sending SIGTERM to task id={task["uuid"]}, task={task.get("task")}, args={task.get("args")}') + os.kill(worker.pid, signal.SIGTERM) + msg.append(task['uuid']) + if task_ids and not msg: + logger.info(f'Could not locate running tasks to cancel with ids={task_ids}') with pg_bus_conn() as conn: conn.notify(reply_queue, json.dumps(msg)) diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index 2fc35a75d280..74a8ac3f942a 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -1,6 +1,7 @@ # Copyright (c) 2015 Ansible, Inc. # All Rights Reserved. import logging +import yaml from django.conf import settings from django.core.cache import cache as django_cache @@ -30,7 +31,16 @@ def add_arguments(self, parser): '--reload', dest='reload', action='store_true', - help=('cause the dispatcher to recycle all of its worker processes;' 'running jobs will run to completion first'), + help=('cause the dispatcher to recycle all of its worker processes; running jobs will run to completion first'), + ) + parser.add_argument( + '--cancel', + dest='cancel', + help=( + 'Cancel a particular task id. Takes either a single id string, or a JSON list of multiple ids. ' + 'Can take in output from the --running argument as input to cancel all tasks. ' + 'Only running tasks can be canceled, queued tasks must be started before they can be canceled.' + ), ) def handle(self, *arg, **options): @@ -42,6 +52,16 @@ def handle(self, *arg, **options): return if options.get('reload'): return Control('dispatcher').control({'control': 'reload'}) + if options.get('cancel'): + cancel_str = options.get('cancel') + try: + cancel_data = yaml.safe_load(cancel_str) + except Exception: + cancel_data = [cancel_str] + if not isinstance(cancel_data, list): + cancel_data = [cancel_str] + print(Control('dispatcher').cancel(cancel_data)) + return # It's important to close these because we're _about_ to fork, and we # don't want the forked processes to inherit the open sockets diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index b49bcf02422b..5ef8fed0f710 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1395,23 +1395,6 @@ def signal_start(self, **kwargs): # Done! return True - @property - def actually_running(self): - # returns True if the job is running in the appropriate dispatcher process - running = False - if all([self.status == 'running', self.celery_task_id, self.execution_node]): - # If the job is marked as running, but the dispatcher - # doesn't know about it (or the dispatcher doesn't reply), - # then cancel the job - timeout = 5 - try: - running = self.celery_task_id in ControlDispatcher('dispatcher', self.controller_node or self.execution_node).running(timeout=timeout) - except socket.timeout: - logger.error('could not reach dispatcher on {} within {}s'.format(self.execution_node, timeout)) - except Exception: - logger.exception("error encountered when checking task status") - return running - @property def can_cancel(self): return bool(self.status in CAN_CANCEL) @@ -1421,27 +1404,61 @@ def _build_job_explanation(self): return 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (self.model_to_str(), self.name, self.id) return None + def fallback_cancel(self): + if not self.celery_task_id: + self.refresh_from_db(fields=['celery_task_id']) + self.cancel_dispatcher_process() + + def cancel_dispatcher_process(self): + """Returns True if dispatcher running this job acknowledged request and sent SIGTERM""" + if not self.celery_task_id: + return + canceled = [] + try: + # Use control and reply mechanism to cancel and obtain confirmation + timeout = 5 + canceled = ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id]) + except socket.timeout: + logger.error(f'could not reach dispatcher on {self.controller_node} within {timeout}s') + except Exception: + logger.exception("error encountered when checking task status") + return bool(self.celery_task_id in canceled) # True or False, whether confirmation was obtained + def cancel(self, job_explanation=None, is_chain=False): if self.can_cancel: if not is_chain: for x in self.get_jobs_fail_chain(): x.cancel(job_explanation=self._build_job_explanation(), is_chain=True) + cancel_fields = [] if not self.cancel_flag: self.cancel_flag = True self.start_args = '' # blank field to remove encrypted passwords - cancel_fields = ['cancel_flag', 'start_args'] - if self.status in ('pending', 'waiting', 'new'): - self.status = 'canceled' - cancel_fields.append('status') - if self.status == 'running' and not self.actually_running: - self.status = 'canceled' - cancel_fields.append('status') + cancel_fields.extend(['cancel_flag', 'start_args']) + connection.on_commit(lambda: self.websocket_emit_status("canceled")) + if job_explanation is not None: self.job_explanation = job_explanation cancel_fields.append('job_explanation') - self.save(update_fields=cancel_fields) - self.websocket_emit_status("canceled") + + controller_notified = False + if self.celery_task_id: + controller_notified = self.cancel_dispatcher_process() + + else: + # Avoid race condition where we have stale model from pending state but job has already started, + # its checking signal but not cancel_flag, so re-send signal after this database commit + connection.on_commit(self.fallback_cancel) + + # If a SIGTERM signal was sent to the control process, and acked by the dispatcher + # then we want to let its own cleanup change status, otherwise change status now + if not controller_notified: + if self.status != 'canceled': + self.status = 'canceled' + cancel_fields.append('status') + + self.save(update_fields=cancel_fields) + return self.cancel_flag @property diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index c9301f769ada..4f52ade6b436 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -723,11 +723,10 @@ def get_notification_friendly_name(self): def preferred_instance_groups(self): return [] - @property - def actually_running(self): + def cancel_dispatcher_process(self): # WorkflowJobs don't _actually_ run anything in the dispatcher, so # there's no point in asking the dispatcher if it knows about this task - return self.status == 'running' + return True class WorkflowApprovalTemplate(UnifiedJobTemplate, RelatedJobsMixin): diff --git a/awx/main/tasks/callback.py b/awx/main/tasks/callback.py index a4a02421a064..e131f368a5b3 100644 --- a/awx/main/tasks/callback.py +++ b/awx/main/tasks/callback.py @@ -6,17 +6,16 @@ import stat # Django -from django.utils.timezone import now from django.conf import settings from django_guid import get_guid from django.utils.functional import cached_property +from django.db import connections # AWX from awx.main.redact import UriCleaner from awx.main.constants import MINIMAL_EVENTS, ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE from awx.main.utils.update_model import update_model from awx.main.queue import CallbackQueueDispatcher -from awx.main.tasks.signals import signal_callback logger = logging.getLogger('awx.main.tasks.callback') @@ -175,28 +174,6 @@ def event_handler(self, event_data): return False - def cancel_callback(self): - """ - Ansible runner callback to tell the job when/if it is canceled - """ - unified_job_id = self.instance.pk - if signal_callback(): - return True - try: - self.instance = self.update_model(unified_job_id) - except Exception: - logger.exception(f'Encountered error during cancel check for {unified_job_id}, canceling now') - return True - if not self.instance: - logger.error('unified job {} was deleted while running, canceling'.format(unified_job_id)) - return True - if self.instance.cancel_flag or self.instance.status == 'canceled': - cancel_wait = (now() - self.instance.modified).seconds if self.instance.modified else 0 - if cancel_wait > 5: - logger.warning('Request to cancel {} took {} seconds to complete.'.format(self.instance.log_format, cancel_wait)) - return True - return False - def finished_callback(self, runner_obj): """ Ansible runner callback triggered on finished run @@ -227,6 +204,8 @@ def status_handler(self, status_data, runner_config): with disable_activity_stream(): self.instance = self.update_model(self.instance.pk, job_args=json.dumps(runner_config.command), job_cwd=runner_config.cwd, job_env=job_env) + # We opened a connection just for that save, close it here now + connections.close_all() elif status_data['status'] == 'failed': # For encrypted ssh_key_data, ansible-runner worker will open and write the # ssh_key_data to a named pipe. Then, once the podman container starts, ssh-agent will diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index 0dd4e486aa36..8d2d2470b61e 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -487,6 +487,7 @@ def run(self, pk, **kwargs): self.instance.log_lifecycle("preparing_playbook") if self.instance.cancel_flag or signal_callback(): self.instance = self.update_model(self.instance.pk, status='canceled') + if self.instance.status != 'running': # Stop the task chain and prevent starting the job if it has # already been canceled. @@ -589,7 +590,7 @@ def run(self, pk, **kwargs): event_handler=self.runner_callback.event_handler, finished_callback=self.runner_callback.finished_callback, status_handler=self.runner_callback.status_handler, - cancel_callback=self.runner_callback.cancel_callback, + cancel_callback=signal_callback, **params, ) else: @@ -1622,7 +1623,7 @@ def post_run_hook(self, inventory_update, status): handler = SpecialInventoryHandler( self.runner_callback.event_handler, - self.runner_callback.cancel_callback, + signal_callback, verbosity=inventory_update.verbosity, job_timeout=self.get_instance_timeout(self.instance), start_time=inventory_update.started, diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py index 54ff67f6cb15..0350a96836be 100644 --- a/awx/main/tasks/receptor.py +++ b/awx/main/tasks/receptor.py @@ -12,6 +12,7 @@ # Django from django.conf import settings +from django.db import connections # Runner import ansible_runner @@ -25,6 +26,7 @@ cleanup_new_process, ) from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER +from awx.main.tasks.signals import signal_state, signal_callback, SignalExit # Receptorctl from receptorctl.socket_interface import ReceptorControl @@ -335,24 +337,32 @@ def _run_internal(self, receptor_ctl): shutil.rmtree(artifact_dir) resultsock, resultfile = receptor_ctl.get_work_results(self.unit_id, return_socket=True, return_sockfile=True) - # Both "processor" and "cancel_watcher" are spawned in separate threads. - # We wait for the first one to return. If cancel_watcher returns first, - # we yank the socket out from underneath the processor, which will cause it - # to exit. A reference to the processor_future is passed into the cancel_watcher_future, - # Which exits if the job has finished normally. The context manager ensures we do not - # leave any threads laying around. - with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + + connections.close_all() + + # "processor" and the main thread will be separate threads. + # If a cancel happens, the main thread will encounter an exception, in which case + # we yank the socket out from underneath the processor, which will cause it to exit. + # The ThreadPoolExecutor context manager ensures we do not leave any threads laying around. + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: processor_future = executor.submit(self.processor, resultfile) - cancel_watcher_future = executor.submit(self.cancel_watcher, processor_future) - futures = [processor_future, cancel_watcher_future] - first_future = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) - res = list(first_future.done)[0].result() - if res.status == 'canceled': + try: + signal_state.raise_exception = True + # address race condition where SIGTERM was issued after this dispatcher task started + if signal_callback(): + raise SignalExit() + res = processor_future.result() + except SignalExit: receptor_ctl.simple_command(f"work cancel {self.unit_id}") resultsock.shutdown(socket.SHUT_RDWR) resultfile.close() - elif res.status == 'error': + result = namedtuple('result', ['status', 'rc']) + res = result('canceled', 1) + finally: + signal_state.raise_exception = False + + if res.status == 'error': # If ansible-runner ran, but an error occured at runtime, the traceback information # is saved via the status_handler passed in to the processor. if 'result_traceback' in self.task.runner_callback.extra_update_fields: @@ -446,18 +456,6 @@ def work_type(self): return 'local' return 'ansible-runner' - @cleanup_new_process - def cancel_watcher(self, processor_future): - while True: - if processor_future.done(): - return processor_future.result() - - if self.task.runner_callback.cancel_callback(): - result = namedtuple('result', ['status', 'rc']) - return result('canceled', 1) - - time.sleep(1) - @property def pod_definition(self): ee = self.task.instance.execution_environment diff --git a/awx/main/tasks/signals.py b/awx/main/tasks/signals.py index 6f0c69ca4cac..95610548b991 100644 --- a/awx/main/tasks/signals.py +++ b/awx/main/tasks/signals.py @@ -9,12 +9,17 @@ __all__ = ['with_signal_handling', 'signal_callback'] +class SignalExit(Exception): + pass + + class SignalState: def reset(self): self.sigterm_flag = False self.is_active = False self.original_sigterm = None self.original_sigint = None + self.raise_exception = False def __init__(self): self.reset() @@ -22,6 +27,9 @@ def __init__(self): def set_flag(self, *args): """Method to pass into the python signal.signal method to receive signals""" self.sigterm_flag = True + if self.raise_exception: + self.raise_exception = False # so it is not raised a second time in error handling + raise SignalExit() def connect_signals(self): self.original_sigterm = signal.getsignal(signal.SIGTERM) diff --git a/awx/main/tests/functional/test_dispatch.py b/awx/main/tests/functional/test_dispatch.py index f3c9afe58b78..86e90e50a0f4 100644 --- a/awx/main/tests/functional/test_dispatch.py +++ b/awx/main/tests/functional/test_dispatch.py @@ -244,7 +244,7 @@ def test_lost_worker_autoscale(self): assert not self.pool.should_grow alive_pid = self.pool.workers[1].pid self.pool.workers[0].process.terminate() - time.sleep(1) # wait a moment for sigterm + time.sleep(2) # wait a moment for sigterm # clean up and the dead worker self.pool.cleanup() diff --git a/awx/main/tests/unit/models/test_unified_job_unit.py b/awx/main/tests/unit/models/test_unified_job_unit.py index 592c457b0c5b..b0567500fc4e 100644 --- a/awx/main/tests/unit/models/test_unified_job_unit.py +++ b/awx/main/tests/unit/models/test_unified_job_unit.py @@ -22,6 +22,10 @@ def test_unified_job_workflow_attributes(): assert job.workflow_job_id == 1 +def mock_on_commit(f): + f() + + @pytest.fixture def unified_job(mocker): mocker.patch.object(UnifiedJob, 'can_cancel', return_value=True) @@ -30,12 +34,14 @@ def unified_job(mocker): j.cancel_flag = None j.save = mocker.MagicMock() j.websocket_emit_status = mocker.MagicMock() + j.fallback_cancel = mocker.MagicMock() return j def test_cancel(unified_job): - unified_job.cancel() + with mock.patch('awx.main.models.unified_jobs.connection.on_commit', wraps=mock_on_commit): + unified_job.cancel() assert unified_job.cancel_flag is True assert unified_job.status == 'canceled' @@ -50,10 +56,11 @@ def test_cancel(unified_job): def test_cancel_job_explanation(unified_job): job_explanation = 'giggity giggity' - unified_job.cancel(job_explanation=job_explanation) + with mock.patch('awx.main.models.unified_jobs.connection.on_commit'): + unified_job.cancel(job_explanation=job_explanation) assert unified_job.job_explanation == job_explanation - unified_job.save.assert_called_with(update_fields=['cancel_flag', 'start_args', 'status', 'job_explanation']) + unified_job.save.assert_called_with(update_fields=['cancel_flag', 'start_args', 'job_explanation', 'status']) def test_organization_copy_to_jobs(): diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py index c6a2b3b5964a..636de46cdfa2 100644 --- a/awx/main/utils/handlers.py +++ b/awx/main/utils/handlers.py @@ -76,7 +76,7 @@ def __init__(self, event_handler, cancel_callback, job_timeout, verbosity, start def emit(self, record): # check cancel and timeout status regardless of log level this_time = now() - if (this_time - self.last_check).total_seconds() > 0.5: # cancel callback is expensive + if (this_time - self.last_check).total_seconds() > 0.1: self.last_check = this_time if self.cancel_callback(): raise PostRunError('Inventory update has been canceled', status='canceled') diff --git a/docs/ansible_runner_integration.md b/docs/ansible_runner_integration.md index 5475c6877df2..e7ef0df8877d 100644 --- a/docs/ansible_runner_integration.md +++ b/docs/ansible_runner_integration.md @@ -8,7 +8,7 @@ In AWX, a task of a certain job type is kicked off (_i.e._, RunJob, RunProjectUp The callbacks and handlers are: * `event_handler`: Called each time a new event is created in `ansible-runner`. AWX will dispatch the event to `redis` to be processed on the other end by the callback receiver. -* `cancel_callback`: Called periodically by `ansible-runner`; this is so that AWX can inform `ansible-runner` if the job should be canceled or not. +* `cancel_callback`: Called periodically by `ansible-runner`; this is so that AWX can inform `ansible-runner` if the job should be canceled or not. Only applies for system jobs now, and other jobs are canceled via receptor. * `finished_callback`: Called once by `ansible-runner` to denote that the process that was asked to run is finished. AWX will construct the special control event, `EOF`, with the associated total number of events that it observed. * `status_handler`: Called by `ansible-runner` as the process transitions state internally. AWX uses the `starting` status to know that `ansible-runner` has made all of its decisions around the process that it will launch. AWX gathers and associates these decisions with the Job for historical observation. From f51297199189325c95a71fe37fb216bb804805b0 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 5 Sep 2022 22:29:19 -0400 Subject: [PATCH 2/2] Add project sync to job cancel chain --- awx/main/models/mixins.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/awx/main/models/mixins.py b/awx/main/models/mixins.py index 34e05fa818ce..0e38d7288c54 100644 --- a/awx/main/models/mixins.py +++ b/awx/main/models/mixins.py @@ -412,6 +412,11 @@ class TaskManagerJobMixin(TaskManagerUnifiedJobMixin): class Meta: abstract = True + def get_jobs_fail_chain(self): + if self.project_update_id: + return [self.project_update] + return [] + class TaskManagerUpdateOnLaunchMixin(TaskManagerUnifiedJobMixin): class Meta: