
Commit

Fix notification timing issue by sending in the latter of 2 events (#12110)

* Track host_status_counts and use that to process notifications

* Remove now unused setting

* Back out changes to callback class not needed after all

* Skirt the need for duck typing by leaning on the cached field

* Delete tests for deleted task

* Revert "Back out changes to callback class not needed after all"

This reverts commit 3b8ae35.

* Directly hardcode stats_event_type for callback class

* Fire notifications if stats event was never sent

* Remove test content for deleted methods

* Add placeholder for when no hosts matched

* Make field default be None, denote events processed with empty dict

* Make UI process null value for host_status_counts

* Fix tracking of EOF dispatch for system jobs

* Reorganize EVENT_MAP into class properties

* Consolidate conditional I missed from EVENT_MAP refactor

* Give up on the null condition, also applies for empty hosts

* Remove unused cls positional argument

* Move wrapup method out of class, add tests
AlanCoding authored Apr 29, 2022
1 parent 41b0607 commit 29d6084
Showing 15 changed files with 140 additions and 182 deletions.
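
The diffs that follow implement the idea in the commit message: the playbook stats are cached on the unified job row as host_status_counts, and success/failure notifications fire on whichever happens second, the callback receiver finishing the stats/EOF event or the job reaching a terminal status. The sketch below only illustrates that ordering rule under assumed names (FakeJob, on_stats_processed, and on_status_finalized are hypothetical, not AWX APIs).

# Illustrative sketch of "send notifications in the latter of 2 events".
# FakeJob and the two handlers are hypothetical, not AWX code; the point is
# that notifications fire exactly once, only after BOTH the job has reached a
# terminal status AND its stats event has been processed.

ACTIVE_STATES = {'pending', 'waiting', 'running'}


class FakeJob:
    def __init__(self):
        self.status = 'running'
        self.host_status_counts = None  # None means stats not processed yet
        self.notified = False

    def send_notifications(self):
        if not self.notified:
            self.notified = True
            print('notifications sent with', self.host_status_counts)


def on_stats_processed(job, counts):
    """Callback-receiver side: the stats/EOF event has been handled."""
    job.host_status_counts = counts if counts is not None else {}
    # Notify only if the job already finished; otherwise the status handler
    # below is the latter of the two events and will notify instead.
    if job.status not in ACTIVE_STATES:
        job.send_notifications()


def on_status_finalized(job, status):
    """Task-system side: the job has reached a terminal status."""
    job.status = status
    # Notify only if stats were already cached; otherwise wait for them.
    if job.host_status_counts is not None:
        job.send_notifications()


if __name__ == '__main__':
    job = FakeJob()
    on_status_finalized(job, 'successful')  # stats not in yet, no notification
    on_stats_processed(job, {'ok': 3})      # latter event, notification fires

In the real change, the status check uses ACTIVE_STATES and the row is locked with select_for_update while host_status_counts is written, as shown in awx/main/dispatch/worker/callback.py below.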
29 changes: 0 additions & 29 deletions awx/api/serializers.py
@@ -1607,7 +1607,6 @@ def get_related(self, obj):

class ProjectUpdateDetailSerializer(ProjectUpdateSerializer):

host_status_counts = serializers.SerializerMethodField(help_text=_('A count of hosts uniquely assigned to each status.'))
playbook_counts = serializers.SerializerMethodField(help_text=_('A count of all plays and tasks for the job run.'))

class Meta:
@@ -1622,14 +1621,6 @@ def get_playbook_counts(self, obj):

return data

def get_host_status_counts(self, obj):
try:
counts = obj.project_update_events.only('event_data').get(event='playbook_on_stats').get_host_status_counts()
except ProjectUpdateEvent.DoesNotExist:
counts = {}

return counts


class ProjectUpdateListSerializer(ProjectUpdateSerializer, UnifiedJobListSerializer):
class Meta:
@@ -3107,7 +3098,6 @@ def get_summary_fields(self, obj):

class JobDetailSerializer(JobSerializer):

host_status_counts = serializers.SerializerMethodField(help_text=_('A count of hosts uniquely assigned to each status.'))
playbook_counts = serializers.SerializerMethodField(help_text=_('A count of all plays and tasks for the job run.'))
custom_virtualenv = serializers.ReadOnlyField()

@@ -3123,14 +3113,6 @@ def get_playbook_counts(self, obj):

return data

def get_host_status_counts(self, obj):
try:
counts = obj.get_event_queryset().only('event_data').get(event='playbook_on_stats').get_host_status_counts()
except JobEvent.DoesNotExist:
counts = {}

return counts


class JobCancelSerializer(BaseSerializer):

@@ -3319,21 +3301,10 @@ def validate_extra_vars(self, value):


class AdHocCommandDetailSerializer(AdHocCommandSerializer):

host_status_counts = serializers.SerializerMethodField(help_text=_('A count of hosts uniquely assigned to each status.'))

class Meta:
model = AdHocCommand
fields = ('*', 'host_status_counts')

def get_host_status_counts(self, obj):
try:
counts = obj.ad_hoc_command_events.only('event_data').get(event='playbook_on_stats').get_host_status_counts()
except AdHocCommandEvent.DoesNotExist:
counts = {}

return counts


class AdHocCommandCancelSerializer(AdHocCommandSerializer):

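All three detail serializers drop their get_host_status_counts methods because the value no longer needs to be computed per request from the playbook_on_stats event; it is read from the cached UnifiedJob.host_status_counts column added later in this commit. A schematic, self-contained comparison of the two shapes (DemoJob, DemoEvent, and both helper functions are hypothetical, not AWX code):

# Schematic comparison (not AWX code) of the old and new ways to obtain
# host_status_counts for a job detail view.

class DemoEvent:
    def __init__(self, event, counts):
        self.event = event
        self._counts = counts

    def get_host_status_counts(self):
        return self._counts


class DemoJob:
    def __init__(self, events, cached_counts):
        self.events = events                      # old source of truth
        self.host_status_counts = cached_counts   # new cached column


def counts_per_request(job):
    """Old shape: find the playbook_on_stats event on every API request."""
    stats = next((e for e in job.events if e.event == 'playbook_on_stats'), None)
    return stats.get_host_status_counts() if stats else {}


def counts_from_cache(job):
    """New shape: read the value the callback receiver stored at wrapup time."""
    return job.host_status_counts


if __name__ == '__main__':
    job = DemoJob([DemoEvent('playbook_on_stats', {'ok': 2})], {'ok': 2})
    print(counts_per_request(job), counts_from_cache(job))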
74 changes: 45 additions & 29 deletions awx/main/dispatch/worker/callback.py
@@ -7,7 +7,7 @@

from django.conf import settings
from django.utils.timezone import now as tz_now
from django.db import DatabaseError, OperationalError, connection as django_connection
from django.db import DatabaseError, OperationalError, transaction, connection as django_connection
from django.db.utils import InterfaceError, InternalError
from django_guid import set_guid

@@ -16,8 +16,8 @@
import redis

from awx.main.consumers import emit_channel_notification
from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent, UnifiedJob, Job
from awx.main.tasks.system import handle_success_and_failure_notifications
from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent, UnifiedJob
from awx.main.constants import ACTIVE_STATES
from awx.main.models.events import emit_event_detail
from awx.main.utils.profiling import AWXProfiler
import awx.main.analytics.subsystem_metrics as s_metrics
@@ -26,6 +26,32 @@
logger = logging.getLogger('awx.main.commands.run_callback_receiver')


def job_stats_wrapup(job_identifier, event=None):
"""Fill in the unified job host_status_counts, fire off notifications if needed"""
try:
# empty dict (versus default of None) can still indicate that events have been processed
# for job types like system jobs, and jobs with no hosts matched
host_status_counts = {}
if event:
host_status_counts = event.get_host_status_counts()

# Update host_status_counts while holding the row lock
with transaction.atomic():
uj = UnifiedJob.objects.select_for_update().get(pk=job_identifier)
uj.host_status_counts = host_status_counts
uj.save(update_fields=['host_status_counts'])

uj.log_lifecycle("stats_wrapup_finished")

# If the status was a finished state before this update was made, send notifications
# If not, we will send notifications when the status changes
if uj.status not in ACTIVE_STATES:
uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')

except Exception:
logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier))


class CallbackBrokerWorker(BaseWorker):
"""
A worker implementation that deserializes callback event data and persists
@@ -146,6 +172,8 @@ def flush(self, force=False):
if not getattr(e, '_skip_websocket_message', False):
metrics_events_broadcast += 1
emit_event_detail(e)
if getattr(e, '_notification_trigger_event', False):
job_stats_wrapup(getattr(e, e.JOB_REFERENCE), event=e)
self.buff = {}
self.last_flush = time.time()
# only update metrics if we saved events
@@ -165,47 +193,32 @@ def perform_work(self, body):
if flush:
self.last_event = ''
if not flush:
event_map = {
'job_id': JobEvent,
'ad_hoc_command_id': AdHocCommandEvent,
'project_update_id': ProjectUpdateEvent,
'inventory_update_id': InventoryUpdateEvent,
'system_job_id': SystemJobEvent,
}

job_identifier = 'unknown job'
for key, cls in event_map.items():
if key in body:
job_identifier = body[key]
for cls in (JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent):
if cls.JOB_REFERENCE in body:
job_identifier = body[cls.JOB_REFERENCE]
break

self.last_event = f'\n\t- {cls.__name__} for #{job_identifier} ({body.get("event", "")} {body.get("uuid", "")})' # noqa

notification_trigger_event = bool(body.get('event') == cls.WRAPUP_EVENT)

if body.get('event') == 'EOF':
try:
if 'guid' in body:
set_guid(body['guid'])
final_counter = body.get('final_counter', 0)
logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier))
logger.info('Starting EOF event processing for Job {}'.format(job_identifier))
# EOF events are sent when stdout for the running task is
# closed. don't actually persist them to the database; we
# just use them to report `summary` websocket events as an
# approximation for when a job is "done"
emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=job_identifier, final_counter=final_counter))
# Additionally, when we've processed all events, we should
# have all the data we need to send out success/failure
# notification templates
uj = UnifiedJob.objects.get(pk=job_identifier)

if isinstance(uj, Job):
# *actual playbooks* send their success/failure
# notifications in response to the playbook_on_stats
# event handling code in main.models.events
pass
elif hasattr(uj, 'send_notification_templates'):
handle_success_and_failure_notifications.apply_async([uj.id])

if notification_trigger_event:
job_stats_wrapup(job_identifier)
except Exception:
logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier))
logger.exception('Worker failed to perform EOF tasks: Job {}'.format(job_identifier))
finally:
self.subsystem_metrics.inc('callback_receiver_events_in_memory', -1)
set_guid('')
@@ -215,9 +228,12 @@ def perform_work(self, body):

event = cls.create_from_data(**body)

if skip_websocket_message:
if skip_websocket_message: # if this event sends websocket messages, fire them off on flush
event._skip_websocket_message = True

if notification_trigger_event: # if this is an Ansible stats event, ensure notifications on flush
event._notification_trigger_event = True

self.buff.setdefault(cls, []).append(event)

retries = 0
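Per the "Reorganize EVENT_MAP into class properties" bullet, perform_work above no longer consults a standalone dict; each event class carries its own JOB_REFERENCE key and WRAPUP_EVENT name (see awx/main/models/events.py below). A stripped-down sketch of that class-attribute dispatch, using stand-in classes that only copy those two attributes (the classify helper is hypothetical):

# Minimal sketch of dispatching on per-class attributes instead of a separate
# EVENT_MAP dict. The classes here are stand-ins, not AWX models; only the
# attribute values are taken from the diff above.

class JobEvent:
    JOB_REFERENCE = 'job_id'
    WRAPUP_EVENT = 'playbook_on_stats'


class SystemJobEvent:
    JOB_REFERENCE = 'system_job_id'
    WRAPUP_EVENT = 'EOF'


def classify(body):
    """Return (event_class, job_identifier, is_wrapup) for a callback payload."""
    for cls in (JobEvent, SystemJobEvent):
        if cls.JOB_REFERENCE in body:
            # The class that owns the reference key also defines which event
            # marks the end of useful stats for that job type.
            return cls, body[cls.JOB_REFERENCE], body.get('event') == cls.WRAPUP_EVENT
    return None, 'unknown job', False


if __name__ == '__main__':
    print(classify({'job_id': 42, 'event': 'playbook_on_stats'}))
    print(classify({'system_job_id': 7, 'event': 'EOF'}))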
18 changes: 18 additions & 0 deletions awx/main/migrations/0161_unifiedjob_host_status_counts.py
@@ -0,0 +1,18 @@
# Generated by Django 3.2.12 on 2022-04-27 02:16

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('main', '0160_alter_schedule_rrule'),
]

operations = [
migrations.AddField(
model_name='unifiedjob',
name='host_status_counts',
field=models.JSONField(blank=True, default=None, editable=False, help_text='Playbook stats from the Ansible playbook_on_stats event.', null=True),
),
]
18 changes: 9 additions & 9 deletions awx/main/models/events.py
@@ -6,7 +6,7 @@

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db import models, DatabaseError, connection
from django.db import models, DatabaseError
from django.utils.dateparse import parse_datetime
from django.utils.text import Truncator
from django.utils.timezone import utc, now
@@ -126,6 +126,7 @@ class BasePlaybookEvent(CreatedModifiedModel):
'host_name',
'verbosity',
]
WRAPUP_EVENT = 'playbook_on_stats'

class Meta:
abstract = True
@@ -384,14 +385,6 @@ def _update_from_event_data(self):
job.get_event_queryset().filter(uuid__in=changed).update(changed=True)
job.get_event_queryset().filter(uuid__in=failed).update(failed=True)

# send success/failure notifications when we've finished handling the playbook_on_stats event
from awx.main.tasks.system import handle_success_and_failure_notifications # circular import

def _send_notifications():
handle_success_and_failure_notifications.apply_async([job.id])

connection.on_commit(_send_notifications)

for field in ('playbook', 'play', 'task', 'role'):
value = force_str(event_data.get(field, '')).strip()
if value != getattr(self, field):
@@ -470,6 +463,7 @@ class JobEvent(BasePlaybookEvent):
"""

VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['job_id', 'workflow_job_id', 'job_created']
JOB_REFERENCE = 'job_id'

objects = DeferJobCreatedManager()

@@ -600,6 +594,7 @@ class Meta:
class ProjectUpdateEvent(BasePlaybookEvent):

VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['project_update_id', 'workflow_job_id', 'job_created']
JOB_REFERENCE = 'project_update_id'

objects = DeferJobCreatedManager()

@@ -641,6 +636,7 @@ class BaseCommandEvent(CreatedModifiedModel):
"""

VALID_KEYS = ['event_data', 'created', 'counter', 'uuid', 'stdout', 'start_line', 'end_line', 'verbosity']
WRAPUP_EVENT = 'EOF'

class Meta:
abstract = True
@@ -736,6 +732,8 @@ class AdHocCommandEvent(BaseCommandEvent):
class AdHocCommandEvent(BaseCommandEvent):

VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['ad_hoc_command_id', 'event', 'host_name', 'host_id', 'workflow_job_id', 'job_created']
WRAPUP_EVENT = 'playbook_on_stats' # exception to BaseCommandEvent
JOB_REFERENCE = 'ad_hoc_command_id'

objects = DeferJobCreatedManager()

@@ -836,6 +834,7 @@ class InventoryUpdateEvent(BaseCommandEvent):
class InventoryUpdateEvent(BaseCommandEvent):

VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['inventory_update_id', 'workflow_job_id', 'job_created']
JOB_REFERENCE = 'inventory_update_id'

objects = DeferJobCreatedManager()

@@ -881,6 +880,7 @@ class SystemJobEvent(BaseCommandEvent):
class SystemJobEvent(BaseCommandEvent):

VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['system_job_id', 'job_created']
JOB_REFERENCE = 'system_job_id'

objects = DeferJobCreatedManager()

15 changes: 1 addition & 14 deletions awx/main/models/notifications.py
@@ -421,21 +421,8 @@ def context(self, serialized_job):
The context will contain allowed content retrieved from a serialized job object
(see JobNotificationMixin.JOB_FIELDS_ALLOWED_LIST the job's friendly name,
and a url to the job run."""
job_context = {'host_status_counts': {}}
summary = None
try:
has_event_property = any([f for f in self.event_class._meta.fields if f.name == 'event'])
except NotImplementedError:
has_event_property = False
if has_event_property:
qs = self.get_event_queryset()
if qs:
event = qs.only('event_data').filter(event='playbook_on_stats').first()
if event:
summary = event.get_host_status_counts()
job_context['host_status_counts'] = summary
context = {
'job': job_context,
'job': {'host_status_counts': self.host_status_counts},
'job_friendly_name': self.get_notification_friendly_name(),
'url': self.get_ui_url(),
'job_metadata': json.dumps(self.notification_data(), ensure_ascii=False, indent=4),
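With the cached column in place, the context method above reads self.host_status_counts instead of querying the events table for every notification. The value can be None (for example, when wrapup never ran for a job), and the commit message notes the UI was taught to handle the null value. The guard and the example keys below are illustrative only; the real AWX context passes the field through unmodified under the 'job' key:

# Illustrative only: a notification context built from the cached field, with
# a defensive guard against None. The keys in example_counts are hypothetical.

def build_context(host_status_counts, friendly_name, url):
    return {
        'job': {'host_status_counts': host_status_counts or {}},  # guard against None
        'job_friendly_name': friendly_name,
        'url': url,
    }


if __name__ == '__main__':
    example_counts = {'ok': 4, 'failures': 1}  # hypothetical stats shape
    print(build_context(example_counts, 'Demo Job Template', 'https://awx.example.com/#/jobs/1'))
    print(build_context(None, 'Old Job', 'https://awx.example.com/#/jobs/2'))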
7 changes: 7 additions & 0 deletions awx/main/models/unified_jobs.py
@@ -717,6 +717,13 @@ class Meta:
editable=False,
help_text=_("The version of Ansible Core installed in the execution environment."),
)
host_status_counts = models.JSONField(
blank=True,
null=True,
default=None,
editable=False,
help_text=_("Playbook stats from the Ansible playbook_on_stats event."),
)
work_unit_id = models.CharField(
max_length=255, blank=True, default=None, editable=False, null=True, help_text=_("The Receptor work unit ID associated with this job.")
)
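The field is nullable with a default of None on purpose: per the commit message, None means event processing has not wrapped up for the job, an empty dict means wrapup ran but produced no per-host stats (system jobs, or no hosts matched), and a populated dict carries the playbook_on_stats counts. A small interpretation helper, purely as a sketch with hypothetical names:

# Sketch of interpreting the tri-state host_status_counts value; the helper
# name and return strings are hypothetical, not part of AWX.

def describe_host_status_counts(value):
    if value is None:
        return 'event processing has not finished for this job yet'
    if value == {}:
        return 'events processed, but no per-host stats (system job or no hosts matched)'
    return f'per-host stats: {value}'


if __name__ == '__main__':
    for v in (None, {}, {'ok': 3, 'changed': 2}):
        print(describe_host_status_counts(v))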