Skip to content

Commit

Permalink
fix: improve Slack rate limiting logic when updating alert groups
Browse files Browse the repository at this point in the history
  • Loading branch information
joeyorlando committed Nov 21, 2024
1 parent fda05a6 commit 2e7e46f
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 26 deletions.
34 changes: 18 additions & 16 deletions engine/apps/alerts/models/alert_receive_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
metrics_remove_deleted_integration_from_cache,
metrics_update_integration_cache,
)
from apps.slack.constants import SLACK_RATE_LIMIT_DELAY, SLACK_RATE_LIMIT_TIMEOUT
from apps.slack.constants import SLACK_RATE_LIMIT_TIMEOUT
from apps.slack.tasks import post_slack_rate_limit_message
from apps.slack.utils import post_message_to_channel
from common.api_helpers.utils import create_engine_url
Expand All @@ -43,7 +43,7 @@

from apps.alerts.models import AlertGroup, ChannelFilter
from apps.labels.models import AlertReceiveChannelAssociatedLabel
from apps.user_management.models import Organization, Team
from apps.user_management.models import Organization, Team, User

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -391,7 +391,7 @@ def save(self, *args, **kwargs):

return super().save(*args, **kwargs)

def change_team(self, team_id, user):
def change_team(self, team_id: int, user: "User") -> None:
if team_id == self.team_id:
raise TeamCanNotBeChangedError("Integration is already in this team")

Expand All @@ -409,52 +409,54 @@ def grafana_alerting_sync_manager(self):
return GrafanaAlertingSyncManager(self)

@property
def is_alerting_integration(self):
def is_alerting_integration(self) -> bool:
return self.integration in {
AlertReceiveChannel.INTEGRATION_GRAFANA_ALERTING,
AlertReceiveChannel.INTEGRATION_LEGACY_GRAFANA_ALERTING,
}

@cached_property
def team_name(self):
def team_name(self) -> str:
return self.team.name if self.team else "No team"

@cached_property
def team_id_or_no_team(self):
def team_id_or_no_team(self) -> str:
return self.team_id if self.team else "no_team"

@cached_property
def emojized_verbal_name(self):
def emojized_verbal_name(self) -> str:
return emoji.emojize(self.verbal_name, language="alias")

@property
def new_incidents_web_link(self):
def new_incidents_web_link(self) -> str:
from apps.alerts.models import AlertGroup

return UIURLBuilder(self.organization).alert_groups(
f"?integration={self.public_primary_key}&status={AlertGroup.NEW}",
)

@property
def is_rate_limited_in_slack(self):
def is_rate_limited_in_slack(self) -> bool:
return (
self.rate_limited_in_slack_at is not None
and self.rate_limited_in_slack_at + SLACK_RATE_LIMIT_TIMEOUT > timezone.now()
)

def start_send_rate_limit_message_task(self, delay=SLACK_RATE_LIMIT_DELAY):
def start_send_rate_limit_message_task(self, error_message_verb: str, delay: int) -> None:
task_id = celery_uuid()

self.rate_limit_message_task_id = task_id
self.rate_limited_in_slack_at = timezone.now()
self.save(update_fields=["rate_limit_message_task_id", "rate_limited_in_slack_at"])
post_slack_rate_limit_message.apply_async((self.pk,), countdown=delay, task_id=task_id)

post_slack_rate_limit_message.apply_async((self.pk, error_message_verb), countdown=delay, task_id=task_id)

@property
def alert_groups_count(self):
def alert_groups_count(self) -> int:
return self.alert_groups.count()

@property
def alerts_count(self):
def alerts_count(self) -> int:
from apps.alerts.models import Alert

return Alert.objects.filter(group__channel=self).count()
Expand Down Expand Up @@ -548,14 +550,14 @@ def integration_url(self) -> str | None:
return create_engine_url(f"integrations/v1/{slug}/{self.token}/")

@property
def inbound_email(self):
def inbound_email(self) -> typing.Optional[str]:
if self.integration != AlertReceiveChannel.INTEGRATION_INBOUND_EMAIL:
return None

return f"{self.token}@{live_settings.INBOUND_EMAIL_DOMAIN}"

@property
def default_channel_filter(self):
def default_channel_filter(self) -> typing.Optional["ChannelFilter"]:
return self.channel_filters.filter(is_default=True).first()

# Templating
Expand Down Expand Up @@ -590,7 +592,7 @@ def templates(self):
}

@property
def is_available_for_custom_templates(self):
def is_available_for_custom_templates(self) -> bool:
return True

# Maintenance
Expand Down
27 changes: 23 additions & 4 deletions engine/apps/slack/alert_group_slack_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import typing

from django.core.cache import cache

from apps.slack.client import SlackClient
from apps.slack.errors import (
SlackAPICantUpdateMessageError,
Expand All @@ -23,6 +25,11 @@
class AlertGroupSlackService:
_slack_client: SlackClient

UPDATE_ALERT_GROUP_DEBOUNCE_INTERVAL_SECONDS = 30
"""
Time in seconds to wait before allowing the next update to the Alert Group slack message
"""

def __init__(
self,
slack_team_identity: "SlackTeamIdentity",
Expand All @@ -35,7 +42,15 @@ def __init__(
self._slack_client = SlackClient(slack_team_identity)

def update_alert_group_slack_message(self, alert_group: "AlertGroup") -> None:
logger.info(f"Update message for alert_group {alert_group.pk}")
alert_group_pk = alert_group.pk
debounce_alert_group_update_cache_key = f"debounce_update_alert_group_slack_message_{alert_group_pk}"

logger.info(f"Update message for alert_group {alert_group_pk}")

# Check if the method has been called recently for this alert_group, if so skip to avoid approaching rate limits
if cache.get(debounce_alert_group_update_cache_key):
logger.info(f"Skipping update for alert_group {alert_group_pk} due to debounce interval")
return

try:
self._slack_client.chat_update(
Expand All @@ -44,13 +59,14 @@ def update_alert_group_slack_message(self, alert_group: "AlertGroup") -> None:
attachments=alert_group.render_slack_attachments(),
blocks=alert_group.render_slack_blocks(),
)
logger.info(f"Message has been updated for alert_group {alert_group.pk}")

logger.info(f"Message has been updated for alert_group {alert_group_pk}")
except SlackAPIRatelimitError as e:
if not alert_group.channel.is_maintenace_integration:
if not alert_group.channel.is_rate_limited_in_slack:
alert_group.channel.start_send_rate_limit_message_task(e.retry_after)
alert_group.channel.start_send_rate_limit_message_task("Updating", e.retry_after)
logger.info(
f"Message has not been updated for alert_group {alert_group.pk} due to slack rate limit."
f"Message has not been updated for alert_group {alert_group_pk} due to slack rate limit."
)
else:
raise
Expand All @@ -62,6 +78,9 @@ def update_alert_group_slack_message(self, alert_group: "AlertGroup") -> None:
SlackAPIChannelNotFoundError,
):
pass
finally:
# Set the cache key to enforce debounce interval
cache.set(debounce_alert_group_update_cache_key, True, self.UPDATE_ALERT_GROUP_DEBOUNCE_INTERVAL_SECONDS)

def publish_message_to_alert_group_thread(
self, alert_group: "AlertGroup", attachments=None, mrkdwn=True, unfurl_links=True, text=None
Expand Down
2 changes: 1 addition & 1 deletion engine/apps/slack/scenarios/distribute_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def _post_alert_group_to_slack(
if not alert_group.channel.is_maintenace_integration:
alert_group.reason_to_skip_escalation = AlertGroup.RATE_LIMITED
alert_group.save(update_fields=["reason_to_skip_escalation"])
alert_group.channel.start_send_rate_limit_message_task(e.retry_after)
alert_group.channel.start_send_rate_limit_message_task("Delivering", e.retry_after)
logger.info("Not delivering alert due to slack rate limit.")
else:
raise e
Expand Down
9 changes: 4 additions & 5 deletions engine/apps/slack/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def populate_slack_user_identities(organization_pk):
@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def post_slack_rate_limit_message(integration_id):
def post_slack_rate_limit_message(integration_id: int, error_message_verb: str) -> None:
from apps.alerts.models import AlertReceiveChannel

try:
Expand All @@ -306,10 +306,9 @@ def post_slack_rate_limit_message(integration_id):
default_route = integration.channel_filters.get(is_default=True)
if (slack_channel_id := default_route.slack_channel_id_or_org_default_id) is not None:
text = (
f"Delivering and updating alert groups of integration {integration.verbal_name} in Slack is "
f"temporarily stopped due to rate limit. You could find new alert groups at "
f"<{integration.new_incidents_web_link}|web page "
'"Alert Groups">'
f"{error_message_verb} Alert Groups in Slack, for integration {integration.verbal_name}, is "
f"temporarily rate-limited (due to a Slack rate-limit). Meanwhile, you can still find new Alert Groups "
f'in the <{integration.new_incidents_web_link}|"Alert Groups" web page>'
)
post_message_to_channel(integration.organization, slack_channel_id, text)

Expand Down
Loading

0 comments on commit 2e7e46f

Please sign in to comment.