Skip to content

Commit

Permalink
patch default user notification policy changes + fix failing e2e test (
Browse files Browse the repository at this point in the history
…#4635)

# What this PR does

This is a follow-up PR to #4628.
As @Ferril pointed out, there was a slight issue in the
`apps.alerts.tasks.notify_user.perform_notification` method when using a
"fallback"/default user notification policy. The `log_record_pk` arg
passed into `perform_notification` is used to fetch the
`UserNotificationPolicyLogRecord` object, but in the fallback case that
object has its `notification_policy` set to `None` (because there's no
persistent `UserNotificationPolicy` object to refer to).

Instead, we now pass a second argument to `perform_notification`,
`use_default_notification_policy_fallback`. If it is true, the task simply
grabs the transient/in-memory `UserNotificationPolicy` and uses that
instead of `log_record.notification_policy`.

Related to #4410

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
  • Loading branch information
joeyorlando authored Jul 9, 2024
1 parent 9982eca commit 34a9013
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 63 deletions.
81 changes: 55 additions & 26 deletions engine/apps/alerts/tasks/notify_user.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
import typing
from functools import partial

from celery.exceptions import Retry
Expand Down Expand Up @@ -42,7 +43,6 @@ def notify_user_task(

countdown = 0
stop_escalation = False
log_message = ""
log_record = None

with transaction.atomic():
Expand Down Expand Up @@ -121,20 +121,23 @@ def notify_user_task(
reason = None

def _create_user_notification_policy_log_record(**kwargs):
if using_fallback_default_notification_policy_step and "notification_policy" in kwargs:
kwargs["notification_policy"] = None
return UserNotificationPolicyLogRecord(**kwargs)
return UserNotificationPolicyLogRecord(
**kwargs,
using_fallback_default_notification_policy_step=using_fallback_default_notification_policy_step,
)

if notification_policy is None:
stop_escalation = True
log_record = _create_user_notification_policy_log_record(
def _create_notification_finished_user_notification_policy_log_record():
return _create_user_notification_policy_log_record(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FINISHED,
notification_policy=notification_policy,
alert_group=alert_group,
slack_prevent_posting=prevent_posting_to_thread,
)
log_message += "Personal escalation exceeded"

if notification_policy is None:
stop_escalation = True
log_record = _create_notification_finished_user_notification_policy_log_record()
else:
if (
(alert_group.acknowledged and not notify_even_acknowledged)
Expand Down Expand Up @@ -192,6 +195,7 @@ def _create_user_notification_policy_log_record(**kwargs):
notification_step=notification_policy.step,
notification_channel=notification_policy.notify_by,
)

if log_record: # log_record is None if user notification policy step is unspecified
# if this is the first notification step, and user hasn't been notified for this alert group - update metric
if (
Expand All @@ -208,25 +212,43 @@ def _create_user_notification_policy_log_record(**kwargs):
if notify_user_task.request.retries == 0:
transaction.on_commit(partial(send_user_notification_signal.apply_async, (log_record.pk,)))

if not stop_escalation:
if notification_policy.step != UserNotificationPolicy.Step.WAIT:
def _create_perform_notification_task(log_record_pk, alert_group_pk, use_default_notification_policy_fallback):
task = perform_notification.apply_async((log_record_pk, use_default_notification_policy_fallback))
task_logger.info(
f"Created perform_notification task {task} log_record={log_record_pk} " f"alert_group={alert_group_pk}"
)

def _create_perform_notification_task(log_record_pk, alert_group_pk):
task = perform_notification.apply_async((log_record_pk,))
task_logger.info(
f"Created perform_notification task {task} log_record={log_record_pk} "
f"alert_group={alert_group_pk}"
)
def _update_user_has_notification_active_notification_policy_id(active_policy_id: typing.Optional[str]) -> None:
user_has_notification.active_notification_policy_id = active_policy_id
user_has_notification.save(update_fields=["active_notification_policy_id"])

transaction.on_commit(partial(_create_perform_notification_task, log_record.pk, alert_group_pk))
def _reset_user_has_notification_active_notification_policy_id() -> None:
_update_user_has_notification_active_notification_policy_id(None)

create_perform_notification_task = partial(
_create_perform_notification_task,
log_record.pk,
alert_group_pk,
using_fallback_default_notification_policy_step,
)

if using_fallback_default_notification_policy_step:
# if we are using default notification policy, we're done escalating.. there's no further notification
# policy steps in this case. Kick off the perform_notification task, create the
# TYPE_PERSONAL_NOTIFICATION_FINISHED log record, and reset the active_notification_policy_id
transaction.on_commit(create_perform_notification_task)
_create_notification_finished_user_notification_policy_log_record()
_reset_user_has_notification_active_notification_policy_id()
elif not stop_escalation:
if notification_policy.step != UserNotificationPolicy.Step.WAIT:
transaction.on_commit(create_perform_notification_task)

delay = NEXT_ESCALATION_DELAY
if countdown is not None:
delay += countdown
task_id = celery_uuid()

user_has_notification.active_notification_policy_id = task_id
user_has_notification.save(update_fields=["active_notification_policy_id"])
_update_user_has_notification_active_notification_policy_id(task_id)

transaction.on_commit(
partial(
Expand All @@ -241,10 +263,8 @@ def _create_perform_notification_task(log_record_pk, alert_group_pk):
task_id=task_id,
)
)

else:
user_has_notification.active_notification_policy_id = None
user_has_notification.save(update_fields=["active_notification_policy_id"])
_reset_user_has_notification_active_notification_policy_id()


@shared_dedicated_queue_retry_task(
Expand All @@ -253,7 +273,7 @@ def _create_perform_notification_task(log_record_pk, alert_group_pk):
dont_autoretry_for=(Retry,),
max_retries=1 if settings.DEBUG else None,
)
def perform_notification(log_record_pk):
def perform_notification(log_record_pk, use_default_notification_policy_fallback):
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
from apps.telegram.models import TelegramToUserConnector

Expand All @@ -272,8 +292,13 @@ def perform_notification(log_record_pk):

user = log_record.author
alert_group = log_record.alert_group
notification_policy = log_record.notification_policy
notification_policy = (
UserNotificationPolicy.get_default_fallback_policy(user)
if use_default_notification_policy_fallback
else log_record.notification_policy
)
notification_channel = notification_policy.notify_by if notification_policy else None

if user is None or notification_policy is None:
UserNotificationPolicyLogRecord(
author=user,
Expand Down Expand Up @@ -310,7 +335,9 @@ def perform_notification(log_record_pk):
TelegramToUserConnector.notify_user(user, alert_group, notification_policy)
except RetryAfter as e:
countdown = getattr(e, "retry_after", 3)
raise perform_notification.retry((log_record_pk,), countdown=countdown, exc=e)
raise perform_notification.retry(
(log_record_pk, use_default_notification_policy_fallback), countdown=countdown, exc=e
)

elif notification_channel == UserNotificationPolicy.NotificationChannel.SLACK:
# TODO: refactor checking the possibility of sending a notification in slack
Expand Down Expand Up @@ -390,7 +417,9 @@ def perform_notification(log_record_pk):
f"does not exist. Restarting perform_notification."
)
restart_delay_seconds = 60
perform_notification.apply_async((log_record_pk,), countdown=restart_delay_seconds)
perform_notification.apply_async(
(log_record_pk, use_default_notification_policy_fallback), countdown=restart_delay_seconds
)
else:
task_logger.debug(
f"send_slack_notification for alert_group {alert_group.pk} failed because slack message "
Expand Down
54 changes: 46 additions & 8 deletions engine/apps/alerts/tests/test_notify_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def test_custom_backend_call(
)

with patch("apps.base.tests.messaging_backend.TestOnlyBackend.notify_user") as mock_notify_user:
perform_notification(log_record.pk)
perform_notification(log_record.pk, False)

mock_notify_user.assert_called_once_with(user_1, alert_group, user_notification_policy)

Expand Down Expand Up @@ -72,7 +72,7 @@ def test_custom_backend_error(

with patch("apps.alerts.tasks.notify_user.get_messaging_backend_from_id") as mock_get_backend:
mock_get_backend.return_value = None
perform_notification(log_record.pk)
perform_notification(log_record.pk, False)

error_log_record = UserNotificationPolicyLogRecord.objects.last()
assert error_log_record.type == UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED
Expand Down Expand Up @@ -119,7 +119,7 @@ def test_notify_user_missing_data_errors(

with patch("apps.alerts.tasks.notify_user.get_messaging_backend_from_id") as mock_get_backend:
mock_get_backend.return_value = None
perform_notification(log_record.pk)
perform_notification(log_record.pk, False)

error_log_record = UserNotificationPolicyLogRecord.objects.last()
assert error_log_record.type == UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED
Expand Down Expand Up @@ -154,7 +154,7 @@ def test_notify_user_perform_notification_error_if_viewer(
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
)

perform_notification(log_record.pk)
perform_notification(log_record.pk, False)

error_log_record = UserNotificationPolicyLogRecord.objects.last()
assert error_log_record.type == UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED
Expand Down Expand Up @@ -230,7 +230,7 @@ def test_perform_notification_reason_to_skip_escalation_in_slack(
if not error_code:
make_slack_message(alert_group=alert_group, channel_id="test_channel_id", slack_id="test_slack_id")
with patch.object(SlackMessage, "send_slack_notification") as mocked_send_slack_notification:
perform_notification(log_record.pk)
perform_notification(log_record.pk, False)
last_log_record = UserNotificationPolicyLogRecord.objects.last()

if error_code:
Expand Down Expand Up @@ -277,7 +277,7 @@ def test_perform_notification_slack_prevent_posting(
make_slack_message(alert_group=alert_group, channel_id="test_channel_id", slack_id="test_slack_id")

with patch.object(SlackMessage, "send_slack_notification") as mocked_send_slack_notification:
perform_notification(log_record.pk)
perform_notification(log_record.pk, False)

mocked_send_slack_notification.assert_not_called()
last_log_record = UserNotificationPolicyLogRecord.objects.last()
Expand All @@ -292,7 +292,7 @@ def test_perform_notification_slack_prevent_posting(
@pytest.mark.django_db
def test_perform_notification_missing_user_notification_policy_log_record(caplog):
invalid_pk = 12345
perform_notification(invalid_pk)
perform_notification(invalid_pk, False)

assert (
f"perform_notification: log_record {invalid_pk} doesn't exist. Skipping remainder of task. "
Expand Down Expand Up @@ -327,7 +327,45 @@ def test_perform_notification_telegram_retryafter_error(
exc = RetryAfter(countdown)
with patch.object(TelegramToUserConnector, "notify_user", side_effect=exc) as mock_notify_user:
with pytest.raises(RetryAfter):
perform_notification(log_record.pk)
perform_notification(log_record.pk, False)

mock_notify_user.assert_called_once_with(user, alert_group, user_notification_policy)
assert alert_group.personal_log_records.last() == log_record


@patch("apps.base.models.UserNotificationPolicy.get_default_fallback_policy")
@patch("apps.base.tests.messaging_backend.TestOnlyBackend.notify_user")
@pytest.mark.django_db
def test_perform_notification_use_default_notification_policy_fallback(
    mock_notify_user,
    mock_get_default_fallback_policy,
    make_organization,
    make_user,
    make_alert_receive_channel,
    make_alert_group,
    make_user_notification_policy_log_record,
):
    """
    When `perform_notification` is called with the fallback flag set to `True`, the policy returned by
    `UserNotificationPolicy.get_default_fallback_policy` is the one handed to the messaging backend, even
    though the log record itself has `notification_policy=None`.
    """
    org = make_organization()
    notified_user = make_user(organization=org)

    # in-memory policy (never saved) that the patched fallback lookup will return
    transient_policy = UserNotificationPolicy(
        user=notified_user,
        step=UserNotificationPolicy.Step.NOTIFY,
        notify_by=UserNotificationPolicy.NotificationChannel.TESTONLY,
        important=False,
        order=0,
    )
    mock_get_default_fallback_policy.return_value = transient_policy

    channel = make_alert_receive_channel(organization=org)
    group = make_alert_group(alert_receive_channel=channel)

    # the log record deliberately carries no policy, mirroring the fallback scenario
    triggered_record = make_user_notification_policy_log_record(
        author=notified_user,
        alert_group=group,
        notification_policy=None,
        type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
    )

    perform_notification(triggered_record.pk, True)

    mock_notify_user.assert_called_once_with(notified_user, group, transient_policy)
22 changes: 22 additions & 0 deletions engine/apps/base/models/user_notification_policy.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import typing
from enum import unique
from typing import Tuple

Expand All @@ -13,6 +14,11 @@
from common.ordered_model.ordered_model import OrderedModel
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length

if typing.TYPE_CHECKING:
from django.db.models.manager import RelatedManager

from apps.base.models import UserNotificationPolicyLogRecord


def generate_public_primary_key_for_notification_policy():
prefix = "N"
Expand Down Expand Up @@ -66,6 +72,9 @@ def validate_channel_choice(value):


class UserNotificationPolicy(OrderedModel):
personal_log_records: "RelatedManager['UserNotificationPolicyLogRecord']"
user: typing.Optional[User]

order_with_respect_to = ("user_id", "important")

public_primary_key = models.CharField(
Expand Down Expand Up @@ -129,6 +138,19 @@ def get_short_verbals_for_user(cls, user: User) -> Tuple[Tuple[str, ...], Tuple[

return default, important

@staticmethod
def get_default_fallback_policy(user: User) -> "UserNotificationPolicy":
    """
    Build the default "fallback" notification policy for `user`.

    The returned object is only constructed here, it is not saved: a single NOTIFY step using the
    channel identified by `settings.EMAIL_BACKEND_INTERNAL_ID`.
    """
    fallback_policy = UserNotificationPolicy(
        user=user,
        step=UserNotificationPolicy.Step.NOTIFY,
        notify_by=settings.EMAIL_BACKEND_INTERNAL_ID,
        # The important flag doesn't really matter here, since this is just a transient/fallback
        # in-memory object (important is really only used for allowing users to group their
        # notification policy steps)
        important=False,
        order=0,
    )
    return fallback_policy

@property
def short_verbal(self) -> str:
if self.step == UserNotificationPolicy.Step.NOTIFY:
Expand Down
44 changes: 44 additions & 0 deletions engine/apps/base/models/user_notification_policy_log_record.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import typing

import humanize
from django.db import models
Expand All @@ -15,11 +16,45 @@
from apps.slack.slack_formatter import SlackFormatter
from common.utils import clean_markup

if typing.TYPE_CHECKING:
from apps.alerts.models import AlertGroup
from apps.user_management.models import User

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def _check_if_notification_policy_is_transient_fallback(kwargs):
"""
If `using_fallback_default_notification_policy_step` is present, and `True`, then the `notification_policy`
field should be set to `None`. This is because we do not persist default notification policies in the
database. It only exists as a transient/in-memory object, and therefore has no foreign key to reference.
"""
using_fallback_default_notification_policy_step = kwargs.pop(
"using_fallback_default_notification_policy_step", False
)

if using_fallback_default_notification_policy_step:
kwargs.pop("notification_policy", None)


class UserNotificationPolicyLogRecordQuerySet(models.QuerySet):
    """QuerySet whose `create` understands the `using_fallback_default_notification_policy_step` kwarg."""

    def create(self, **kwargs):
        """
        Covers creation through the manager, e.g.:
        log_record = UserNotificationPolicyLogRecord.objects.create(arg1="foo", ...)

        The kwargs are normalized by `_check_if_notification_policy_is_transient_fallback` before being
        handed to the stock `QuerySet.create`.
        """
        _check_if_notification_policy_is_transient_fallback(kwargs)
        created_record = super().create(**kwargs)
        return created_record


class UserNotificationPolicyLogRecord(models.Model):
alert_group: "AlertGroup"
author: typing.Optional["User"]
notification_policy: typing.Optional[UserNotificationPolicy]

objects: models.Manager["UserNotificationPolicyLogRecord"] = UserNotificationPolicyLogRecordQuerySet.as_manager()

(
TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
TYPE_PERSONAL_NOTIFICATION_FINISHED,
Expand Down Expand Up @@ -112,6 +147,15 @@ class UserNotificationPolicyLogRecord(models.Model):
notification_step = models.IntegerField(choices=UserNotificationPolicy.Step.choices, null=True, default=None)
notification_channel = models.IntegerField(validators=[validate_channel_choice], null=True, default=None)

def __init__(self, *args, **kwargs):
    """
    Covers direct instantiation, e.g.:
    log_record = UserNotificationPolicyLogRecord(arg1="foo", ...)
    log_record.save()

    The kwargs are normalized by `_check_if_notification_policy_is_transient_fallback` first, which
    removes `using_fallback_default_notification_policy_step` (and, when it was truthy,
    `notification_policy`) before the parent initializer receives them.
    """
    # NOTE(review): the marker kwarg is presumably not a model field, hence it is stripped
    # before delegating to the parent initializer - confirm against the field list
    _check_if_notification_policy_is_transient_fallback(kwargs)
    super().__init__(*args, **kwargs)

def rendered_notification_log_line(self, for_slack=False, html=False):
timeline = render_relative_timeline(self.created_at, self.alert_group.started_at)

Expand Down
Loading

0 comments on commit 34a9013

Please sign in to comment.