Skip to content

Commit

Permalink
Style adjustments after rebase
Browse files Browse the repository at this point in the history
Reference task.sigterm_watcher instead of passing it in
  same pattern as the runner_callback object

Make the async task more robust in case it runs on another node

Mock on_commit so that unit tests pass
  • Loading branch information
AlanCoding committed Mar 16, 2022
1 parent 8f6aaff commit c4040dc
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion awx/main/tasks/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ def run(self, pk, **kwargs):
**params,
)
else:
receptor_job = AWXReceptorJob(self, params, sigterm_watcher=self.sigterm_watcher)
receptor_job = AWXReceptorJob(self, params)
res = receptor_job.run()
self.unit_id = receptor_job.unit_id

Expand Down
6 changes: 2 additions & 4 deletions awx/main/tasks/receptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def worker_cleanup(node_name, vargs, timeout=300.0):


class AWXReceptorJob:
def __init__(self, task, runner_params=None, sigterm_watcher=None):
def __init__(self, task, runner_params=None):
self.task = task
self.runner_params = runner_params
self.unit_id = None
Expand All @@ -265,8 +265,6 @@ def __init__(self, task, runner_params=None, sigterm_watcher=None):
if not settings.IS_K8S and self.work_type == 'local' and 'only_transmit_kwargs' not in self.runner_params:
self.runner_params['only_transmit_kwargs'] = True

self.sigterm_watcher = sigterm_watcher

def run(self):
# We establish a connection to the Receptor socket
receptor_ctl = get_receptor_ctl()
Expand Down Expand Up @@ -464,7 +462,7 @@ def cancel_watcher(self, processor_future):
if processor_future.done():
return processor_future.result()

if self.sigterm_watcher.cancel_callback():
if self.task.sigterm_watcher.cancel_callback():
return AnsibleRunnerResult('canceled', 1)

time.sleep(0.5)
Expand Down
4 changes: 2 additions & 2 deletions awx/main/tasks/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,11 @@ def cancel_unified_job(unified_job_id):
return
while unified_job.status in ACTIVE_STATES:
if unified_job.celery_task_id:
cancel_control_process.delay(unified_job.celery_task_id)
cancel_control_process.apply_async([unified_job.celery_task_id], queue=unified_job.get_queue_name())
logger.warning(f'sigterm issued to {unified_job.log_format} after it obtained a task id')
return
try:
unified_job.refresh_from_db(fields=['status'])
unified_job.refresh_from_db(fields=['status', 'controller_node', 'execution_node'])
except unified_job.DoesNotExist:
logger.info(f'Job id {unified_job_id} has been deleted, cancel aborted')
return
Expand Down
6 changes: 4 additions & 2 deletions awx/main/tests/unit/models/test_unified_job_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def unified_job(mocker):

def test_cancel(unified_job):

unified_job.cancel()
with mock.patch('awx.main.models.unified_jobs.connection.on_commit'):
unified_job.cancel()

assert unified_job.cancel_flag is True
assert unified_job.status == 'canceled'
Expand All @@ -49,7 +50,8 @@ def test_cancel(unified_job):
def test_cancel_job_explanation(unified_job):
job_explanation = 'giggity giggity'

unified_job.cancel(job_explanation=job_explanation)
with mock.patch('awx.main.models.unified_jobs.connection.on_commit'):
unified_job.cancel(job_explanation=job_explanation)

assert unified_job.job_explanation == job_explanation
unified_job.save.assert_called_with(update_fields=['cancel_flag', 'start_args', 'status', 'job_explanation'])
Expand Down

0 comments on commit c4040dc

Please sign in to comment.