Skip to content

Commit

Permalink
fix(alerts&reports): add celery soft timeout support (apache#13436)
Browse files Browse the repository at this point in the history
* fix(alerts&reports): add celery soft timeout support

* make a specific exception for screenshots timeout

* fix docs, add new test
  • Loading branch information
dpgaspar authored and Allan Caetano de Oliveira committed May 21, 2021
1 parent 7f35350 commit 2d1c7dc
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 15 deletions.
35 changes: 32 additions & 3 deletions superset/reports/commands/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from typing import Optional

import numpy as np
import pandas as pd
from celery.exceptions import SoftTimeLimitExceeded
from flask_babel import lazy_gettext as _

from superset import jinja_context
Expand All @@ -30,6 +32,7 @@
AlertQueryInvalidTypeError,
AlertQueryMultipleColumnsError,
AlertQueryMultipleRowsError,
AlertQueryTimeout,
AlertValidatorConfigError,
)

Expand All @@ -48,6 +51,20 @@ def __init__(self, report_schedule: ReportSchedule):
self._result: Optional[float] = None

def run(self) -> bool:
"""
Executes an alert SQL query and validates it.
Will set the report_schedule.last_value or last_value_row_json
with the query result
:return: bool, if the alert triggered or not
:raises AlertQueryError: SQL query is not valid
:raises AlertQueryInvalidTypeError: The output from the SQL query
is not an allowed type
:raises AlertQueryMultipleColumnsError: The SQL query returned multiple columns
:raises AlertQueryMultipleRowsError: The SQL query returned multiple rows
:raises AlertQueryTimeout: The SQL query received a celery soft timeout
:raises AlertValidatorConfigError: The validator query data is not valid
"""
self.validate()

if self._is_validator_not_null:
Expand Down Expand Up @@ -112,9 +129,13 @@ def _is_validator_operator(self) -> bool:
self._report_schedule.validator_type == ReportScheduleValidatorType.OPERATOR
)

def validate(self) -> None:
def _execute_query(self) -> pd.DataFrame:
"""
Validate the query result as a Pandas DataFrame
Executes the actual alert SQL query template
:return: A pandas dataframe
:raises AlertQueryError: SQL query is not valid
:raises AlertQueryTimeout: The SQL query received a celery soft timeout
"""
sql_template = jinja_context.get_template_processor(
database=self._report_schedule.database
Expand All @@ -124,10 +145,18 @@ def validate(self) -> None:
limited_rendered_sql = self._report_schedule.database.apply_limit_to_sql(
rendered_sql, ALERT_SQL_LIMIT
)
df = self._report_schedule.database.get_df(limited_rendered_sql)
return self._report_schedule.database.get_df(limited_rendered_sql)
except SoftTimeLimitExceeded:
raise AlertQueryTimeout()
except Exception as ex:
raise AlertQueryError(message=str(ex))

def validate(self) -> None:
"""
Validate the query result as a Pandas DataFrame
"""
df = self._execute_query()

if df.empty and self._is_validator_not_null:
self._result = None
return
Expand Down
8 changes: 8 additions & 0 deletions superset/reports/commands/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ class AlertQueryError(CommandException):
message = _("Alert found an error while executing a query.")


class AlertQueryTimeout(CommandException):
    """
    Raised when the alert SQL query is interrupted by a celery soft time limit
    (the alert command converts ``SoftTimeLimitExceeded`` into this exception).
    """

    # Default, translatable message reported for this error
    message = _("A timeout occurred while executing the query.")


class ReportScheduleScreenshotTimeout(CommandException):
    """
    Raised when taking a report screenshot is interrupted by a celery soft time
    limit (the execute command converts ``SoftTimeLimitExceeded`` into this
    exception).
    """

    # Default, translatable message reported for this error
    message = _("A timeout occurred while taking a screenshot.")


class ReportScheduleAlertGracePeriodError(CommandException):
message = _("Alert fired during grace period.")

Expand Down
15 changes: 11 additions & 4 deletions superset/reports/commands/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
from datetime import datetime, timedelta
from typing import Any, List, Optional

from celery.exceptions import SoftTimeLimitExceeded
from flask_appbuilder.security.sqla.models import User
from sqlalchemy.orm import Session

from superset import app, thumbnail_cache
from superset import app
from superset.commands.base import BaseCommand
from superset.commands.exceptions import CommandException
from superset.models.reports import (
Expand All @@ -39,6 +40,7 @@
ReportScheduleNotificationError,
ReportSchedulePreviousWorkingError,
ReportScheduleScreenshotFailedError,
ReportScheduleScreenshotTimeout,
ReportScheduleSelleniumUserNotFoundError,
ReportScheduleStateNotFoundError,
ReportScheduleUnexpectedError,
Expand Down Expand Up @@ -172,9 +174,14 @@ def _get_screenshot(self) -> ScreenshotData:
)
image_url = self._get_url(user_friendly=True)
user = self._get_screenshot_user()
image_data = screenshot.compute_and_cache(
user=user, cache=thumbnail_cache, force=True,
)
try:
image_data = screenshot.get_screenshot(user=user)
except SoftTimeLimitExceeded:
raise ReportScheduleScreenshotTimeout()
except Exception as ex:
raise ReportScheduleScreenshotFailedError(
f"Failed taking a screenshot {str(ex)}"
)
if not image_data:
raise ReportScheduleScreenshotFailedError()
return ScreenshotData(url=image_url, image=image_data)
Expand Down
101 changes: 93 additions & 8 deletions tests/reports/commands_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import json
from datetime import datetime, timedelta
from typing import List, Optional
from unittest.mock import Mock, patch
from unittest.mock import patch

import pytest
from contextlib2 import contextmanager
Expand Down Expand Up @@ -46,6 +46,8 @@
ReportScheduleNotFoundError,
ReportScheduleNotificationError,
ReportSchedulePreviousWorkingError,
ReportScheduleScreenshotFailedError,
ReportScheduleScreenshotTimeout,
ReportScheduleWorkingTimeoutError,
)
from superset.reports.commands.execute import AsyncExecuteReportScheduleCommand
Expand Down Expand Up @@ -503,7 +505,7 @@ def create_invalid_sql_alert_email_chart(request):
"load_birth_names_dashboard_with_slices", "create_report_email_chart"
)
@patch("superset.reports.notifications.email.send_email_smtp")
@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
def test_email_chart_report_schedule(
screenshot_mock, email_mock, create_report_email_chart
):
Expand Down Expand Up @@ -541,7 +543,7 @@ def test_email_chart_report_schedule(
"load_birth_names_dashboard_with_slices", "create_report_email_dashboard"
)
@patch("superset.reports.notifications.email.send_email_smtp")
@patch("superset.utils.screenshots.DashboardScreenshot.compute_and_cache")
@patch("superset.utils.screenshots.DashboardScreenshot.get_screenshot")
def test_email_dashboard_report_schedule(
screenshot_mock, email_mock, create_report_email_dashboard
):
Expand Down Expand Up @@ -573,7 +575,7 @@ def test_email_dashboard_report_schedule(
"load_birth_names_dashboard_with_slices", "create_report_slack_chart"
)
@patch("superset.reports.notifications.slack.WebClient.files_upload")
@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
def test_slack_chart_report_schedule(
screenshot_mock, file_upload_mock, create_report_slack_chart
):
Expand Down Expand Up @@ -694,7 +696,7 @@ def test_report_schedule_success_grace_end(create_alert_slack_chart_grace):

@pytest.mark.usefixtures("create_alert_email_chart")
@patch("superset.reports.notifications.email.send_email_smtp")
@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
def test_alert_limit_is_applied(screenshot_mock, email_mock, create_alert_email_chart):
"""
ExecuteReport Command: Test that all alerts apply a SQL limit to stmts
Expand All @@ -718,7 +720,7 @@ def test_alert_limit_is_applied(screenshot_mock, email_mock, create_alert_email_
"load_birth_names_dashboard_with_slices", "create_report_email_dashboard"
)
@patch("superset.reports.notifications.email.send_email_smtp")
@patch("superset.utils.screenshots.DashboardScreenshot.compute_and_cache")
@patch("superset.utils.screenshots.DashboardScreenshot.get_screenshot")
def test_email_dashboard_report_fails(
screenshot_mock, email_mock, create_report_email_dashboard
):
Expand All @@ -744,7 +746,7 @@ def test_email_dashboard_report_fails(
"load_birth_names_dashboard_with_slices", "create_alert_email_chart"
)
@patch("superset.reports.notifications.email.send_email_smtp")
@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
def test_slack_chart_alert(screenshot_mock, email_mock, create_alert_email_chart):
"""
ExecuteReport Command: Test chart slack alert
Expand Down Expand Up @@ -794,6 +796,89 @@ def test_email_mul_alert(create_mul_alert_email_chart):
).run()


@pytest.mark.usefixtures(
    "load_birth_names_dashboard_with_slices", "create_alert_email_chart"
)
@patch("superset.reports.notifications.email.send_email_smtp")
def test_soft_timeout_alert(email_mock, create_alert_email_chart):
    """
    ExecuteReport Command: Test soft timeout on alert queries

    Makes the database engine spec's ``execute`` raise a celery soft timeout
    and checks that the command raises ``AlertQueryTimeout``, that an error
    notification is emailed and that the timeout message is logged.
    """
    from celery.exceptions import SoftTimeLimitExceeded
    from superset.reports.commands.exceptions import AlertQueryTimeout

    engine_spec = create_alert_email_chart.database.db_engine_spec
    command = AsyncExecuteReportScheduleCommand(
        create_alert_email_chart.id, datetime.utcnow()
    )

    # Every call to the patched ``execute`` raises the soft timeout
    with patch.object(
        engine_spec, "execute", side_effect=SoftTimeLimitExceeded()
    ), pytest.raises(AlertQueryTimeout):
        command.run()

    # The first positional argument of the email call is the recipient:
    # an error notification must have gone to the configured target
    targets = get_target_from_report_schedule(create_alert_email_chart)
    assert email_mock.call_args[0][0] == targets[0]

    assert_log(
        ReportState.ERROR, error_message="A timeout occurred while executing the query."
    )


@pytest.mark.usefixtures(
    "load_birth_names_dashboard_with_slices", "create_alert_email_chart"
)
@patch("superset.reports.notifications.email.send_email_smtp")
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
def test_soft_timeout_screenshot(screenshot_mock, email_mock, create_alert_email_chart):
    """
    ExecuteReport Command: Test soft timeout on screenshot

    Makes ``ChartScreenshot.get_screenshot`` raise a celery soft timeout and
    checks that the command raises ``ReportScheduleScreenshotTimeout``, that an
    error notification is emailed and that the timeout message is logged.
    """
    from celery.exceptions import SoftTimeLimitExceeded

    screenshot_mock.side_effect = SoftTimeLimitExceeded()
    with pytest.raises(ReportScheduleScreenshotTimeout):
        AsyncExecuteReportScheduleCommand(
            create_alert_email_chart.id, datetime.utcnow()
        ).run()

    notification_targets = get_target_from_report_schedule(create_alert_email_chart)
    # Assert the email smtp address: a notification with the error was sent
    assert email_mock.call_args[0][0] == notification_targets[0]

    assert_log(
        ReportState.ERROR, error_message="A timeout occurred while taking a screenshot."
    )


@pytest.mark.usefixtures(
    "load_birth_names_dashboard_with_slices", "create_alert_email_chart"
)
@patch("superset.reports.notifications.email.send_email_smtp")
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
def test_fail_screenshot(screenshot_mock, email_mock, create_alert_email_chart):
    """
    ExecuteReport Command: Test unexpected failure on screenshot

    Makes ``ChartScreenshot.get_screenshot`` raise a generic exception and
    checks that the command raises ``ReportScheduleScreenshotFailedError``,
    that an error notification is emailed and that the logged message embeds
    the original error text.
    """
    screenshot_mock.side_effect = Exception("Unexpected error")
    with pytest.raises(ReportScheduleScreenshotFailedError):
        AsyncExecuteReportScheduleCommand(
            create_alert_email_chart.id, datetime.utcnow()
        ).run()

    notification_targets = get_target_from_report_schedule(create_alert_email_chart)
    # Assert the email smtp address: a notification with the error was sent
    assert email_mock.call_args[0][0] == notification_targets[0]

    assert_log(
        ReportState.ERROR, error_message="Failed taking a screenshot Unexpected error"
    )


@pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
@patch("superset.reports.notifications.email.send_email_smtp")
def test_invalid_sql_alert(email_mock, create_invalid_sql_alert_email_chart):
Expand Down Expand Up @@ -860,7 +945,7 @@ def test_grace_period_error(email_mock, create_invalid_sql_alert_email_chart):

@pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
@patch("superset.reports.notifications.email.send_email_smtp")
@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
def test_grace_period_error_flap(
screenshot_mock, email_mock, create_invalid_sql_alert_email_chart
):
Expand Down

0 comments on commit 2d1c7dc

Please sign in to comment.