diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py
index ddf4f19cfccb5..774f2c897d8ba 100644
--- a/superset/reports/commands/execute.py
+++ b/superset/reports/commands/execute.py
@@ -44,7 +44,10 @@
     ReportScheduleUnexpectedError,
     ReportScheduleWorkingTimeoutError,
 )
-from superset.reports.dao import ReportScheduleDAO
+from superset.reports.dao import (
+    REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
+    ReportScheduleDAO,
+)
 from superset.reports.notifications import create_notification
 from superset.reports.notifications.base import NotificationContent, ScreenshotData
 from superset.reports.notifications.exceptions import NotificationError
@@ -147,6 +150,7 @@ def _get_screenshot_user(self) -> User:
     def _get_screenshot(self) -> ScreenshotData:
         """
         Get a chart or dashboard screenshot
+        :raises: ReportScheduleScreenshotFailedError
         """
         screenshot: Optional[BaseScreenshot] = None
 
@@ -170,6 +174,7 @@ def _get_screenshot(self) -> ScreenshotData:
     def _get_notification_content(self) -> NotificationContent:
         """
         Gets a notification content, this is composed by a title and a screenshot
+        :raises: ReportScheduleScreenshotFailedError
         """
         screenshot_data = self._get_screenshot()
 
@@ -185,14 +190,13 @@ def _get_notification_content(self) -> NotificationContent:
         )
         return NotificationContent(name=name, screenshot=screenshot_data)
 
-    def send(self) -> None:
+    def _send(self, notification_content: NotificationContent) -> None:
         """
-        Creates the notification content and sends them to all recipients
+        Sends a notification to all recipients
 
         :raises: ReportScheduleNotificationError
         """
         notification_errors = []
-        notification_content = self._get_notification_content()
         for recipient in self._report_schedule.recipients:
             notification = create_notification(recipient, notification_content)
             try:
@@ -203,6 +207,24 @@ def send(self) -> None:
         if notification_errors:
             raise ReportScheduleNotificationError(";".join(notification_errors))
 
+    def send(self) -> None:
+        """
+        Creates the notification content and sends them to all recipients
+
+        :raises: ReportScheduleNotificationError
+        """
+        notification_content = self._get_notification_content()
+        self._send(notification_content)
+
+    def send_error(self, name: str, message: str) -> None:
+        """
+        Creates and sends a notification for an error, to all recipients
+
+        :raises: ReportScheduleNotificationError
+        """
+        notification_content = NotificationContent(name=name, text=message)
+        self._send(notification_content)
+
     def is_in_grace_period(self) -> bool:
         """
         Checks if an alert is on it's grace period
@@ -218,6 +240,23 @@ def is_in_grace_period(self) -> bool:
             < last_success.end_dttm
         )
 
+    def is_in_error_grace_period(self) -> bool:
+        """
+        Checks if an alert/report on error is on it's notification grace period
+        """
+        last_success = ReportScheduleDAO.find_last_error_notification(
+            self._report_schedule, session=self._session
+        )
+        if not last_success:
+            return False
+        return (
+            last_success is not None
+            and self._report_schedule.grace_period
+            and datetime.utcnow()
+            - timedelta(seconds=self._report_schedule.grace_period)
+            < last_success.end_dttm
+        )
+
     def is_on_working_timeout(self) -> bool:
         """
         Checks if an alert is on a working timeout
@@ -256,9 +295,25 @@ def next(self) -> None:
                     return
             self.send()
             self.set_state_and_log(ReportState.SUCCESS)
-        except CommandException as ex:
-            self.set_state_and_log(ReportState.ERROR, error_message=str(ex))
-            raise ex
+        except CommandException as first_ex:
+            self.set_state_and_log(ReportState.ERROR, error_message=str(first_ex))
+            # TODO (dpgaspar) convert this logic to a new state eg: ERROR_ON_GRACE
+            if not self.is_in_error_grace_period():
+                try:
+                    self.send_error(
+                        f"Error occurred for {self._report_schedule.type}:"
+                        f" {self._report_schedule.name}",
+                        str(first_ex),
+                    )
+                    self.set_state_and_log(
+                        ReportState.ERROR,
+                        error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
+                    )
+                except CommandException as second_ex:
+                    self.set_state_and_log(
+                        ReportState.ERROR, error_message=str(second_ex)
+                    )
+            raise first_ex
 
 
 class ReportWorkingState(BaseReportState):
diff --git a/superset/reports/commands/log_prune.py b/superset/reports/commands/log_prune.py
index 4280d1b46fed8..4b577b1f55dca 100644
--- a/superset/reports/commands/log_prune.py
+++ b/superset/reports/commands/log_prune.py
@@ -50,9 +50,9 @@ def run(self) -> None:
                     report_schedule, from_date, session=session, commit=False
                 )
                 logger.info(
-                    "Deleted %s logs for %s",
+                    "Deleted %s logs for report schedule id: %s",
                     str(row_count),
-                    ReportSchedule.name,
+                    str(report_schedule.id),
                 )
             except DAODeleteFailedError as ex:
                 prune_errors.append(str(ex))
diff --git a/superset/reports/dao.py b/superset/reports/dao.py
index 4e3e9495404c2..5ffa0f0f10aa2 100644
--- a/superset/reports/dao.py
+++ b/superset/reports/dao.py
@@ -35,6 +35,9 @@
 logger = logging.getLogger(__name__)
 
 
+REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER = "Notification sent with error"
+
+
 class ReportScheduleDAO(BaseDAO):
     model_cls = ReportSchedule
 
@@ -223,6 +226,41 @@ def find_last_success_log(
             .first()
         )
 
+    @staticmethod
+    def find_last_error_notification(
+        report_schedule: ReportSchedule, session: Optional[Session] = None,
+    ) -> Optional[ReportExecutionLog]:
+        """
+        Finds last error email sent
+        """
+        session = session or db.session
+        last_error_email_log = (
+            session.query(ReportExecutionLog)
+            .filter(
+                ReportExecutionLog.error_message
+                == REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
+                ReportExecutionLog.report_schedule == report_schedule,
+            )
+            .order_by(ReportExecutionLog.end_dttm.desc())
+            .first()
+        )
+        if not last_error_email_log:
+            return None
+        # Checks that only errors have occurred since the last email
+        report_from_last_email = (
+            session.query(ReportExecutionLog)
+            .filter(
+                ReportExecutionLog.state.notin_(
+                    [ReportState.ERROR, ReportState.WORKING]
+                ),
+                ReportExecutionLog.report_schedule == report_schedule,
+                ReportExecutionLog.end_dttm < last_error_email_log.end_dttm,
+            )
+            .order_by(ReportExecutionLog.end_dttm.desc())
+            .first()
+        )
+        return last_error_email_log if not report_from_last_email else None
+
     @staticmethod
     def bulk_delete_logs(
         model: ReportSchedule,
diff --git a/superset/reports/notifications/base.py b/superset/reports/notifications/base.py
index f55154c1e7430..5fe7fe9bcb791 100644
--- a/superset/reports/notifications/base.py
+++ b/superset/reports/notifications/base.py
@@ -30,7 +30,8 @@ class ScreenshotData:
 @dataclass
 class NotificationContent:
     name: str
-    screenshot: ScreenshotData
+    screenshot: Optional[ScreenshotData] = None
+    text: Optional[str] = None
 
 
 class BaseNotification:  # pylint: disable=too-few-public-methods
diff --git a/superset/reports/notifications/email.py b/superset/reports/notifications/email.py
index e99a7f43e37da..f2bd6e6fd8577 100644
--- a/superset/reports/notifications/email.py
+++ b/superset/reports/notifications/email.py
@@ -19,7 +19,7 @@
 import logging
 from dataclasses import dataclass
 from email.utils import make_msgid, parseaddr
-from typing import Dict
+from typing import Dict, Optional
 
 from flask_babel import gettext as __
 
@@ -35,7 +35,7 @@
 @dataclass
 class EmailContent:
     body: str
-    images: Dict[str, bytes]
+    images: Optional[Dict[str, bytes]] = None
 
 
 class EmailNotification(BaseNotification):  # pylint: disable=too-few-public-methods
@@ -49,22 +49,35 @@ class EmailNotification(BaseNotification):  # pylint: disable=too-few-public-met
     def _get_smtp_domain() -> str:
         return parseaddr(app.config["SMTP_MAIL_FROM"])[1].split("@")[1]
 
-    def _get_content(self) -> EmailContent:
-        # Get the domain from the 'From' address ..
-        # and make a message id without the < > in the ends
-        domain = self._get_smtp_domain()
-        msgid = make_msgid(domain)[1:-1]
-
-        image = {msgid: self._content.screenshot.image}
-        body = __(
+    @staticmethod
+    def _error_template(text: str) -> str:
+        return __(
             """
-            <b><a href="%(url)s">Explore in Superset</a></b><br>
-            <img src="cid:%(msgid)s">
+            Error: %(text)s
             """,
-            url=self._content.screenshot.url,
-            msgid=msgid,
+            text=text,
         )
-        return EmailContent(body=body, images=image)
+
+    def _get_content(self) -> EmailContent:
+        if self._content.text:
+            return EmailContent(body=self._error_template(self._content.text))
+        # Get the domain from the 'From' address ..
+        # and make a message id without the < > in the end
+        if self._content.screenshot:
+            domain = self._get_smtp_domain()
+            msgid = make_msgid(domain)[1:-1]
+
+            image = {msgid: self._content.screenshot.image}
+            body = __(
+                """
+                <b><a href="%(url)s">Explore in Superset</a></b><br>
+                <img src="cid:%(msgid)s">
+                """,
+                url=self._content.screenshot.url,
+                msgid=msgid,
+            )
+            return EmailContent(body=body, images=image)
+        return EmailContent(body=self._error_template("Unexpected missing screenshot"))
 
     def _get_subject(self) -> str:
         return __(
diff --git a/superset/reports/notifications/slack.py b/superset/reports/notifications/slack.py
index 8e859ffc894fc..84f858c1e2eaf 100644
--- a/superset/reports/notifications/slack.py
+++ b/superset/reports/notifications/slack.py
@@ -18,13 +18,12 @@
 import json
 import logging
 from io import IOBase
-from typing import cast, Optional, Union
+from typing import Optional, Union
 
 from flask_babel import gettext as __
 from retry.api import retry
 from slack import WebClient
 from slack.errors import SlackApiError, SlackClientError
-from slack.web.slack_response import SlackResponse
 
 from superset import app
 from superset.models.reports import ReportRecipientType
@@ -44,46 +43,52 @@ class SlackNotification(BaseNotification):  # pylint: disable=too-few-public-met
     def _get_channel(self) -> str:
         return json.loads(self._recipient.recipient_config_json)["target"]
 
-    def _get_body(self) -> str:
+    @staticmethod
+    def _error_template(name: str, text: str) -> str:
         return __(
             """
             *%(name)s*\n
-            <%(url)s|Explore in Superset>
+            Error: %(text)s
             """,
-            name=self._content.name,
-            url=self._content.screenshot.url,
+            name=name,
+            text=text,
         )
 
+    def _get_body(self) -> str:
+        if self._content.text:
+            return self._error_template(self._content.name, self._content.text)
+        if self._content.screenshot:
+            return __(
+                """
+                *%(name)s*\n
+                <%(url)s|Explore in Superset>
+                """,
+                name=self._content.name,
+                url=self._content.screenshot.url,
+            )
+        return self._error_template(self._content.name, "Unexpected missing screenshot")
+
     def _get_inline_screenshot(self) -> Optional[Union[str, IOBase, bytes]]:
-        return self._content.screenshot.image
+        if self._content.screenshot:
+            return self._content.screenshot.image
+        return None
 
     @retry(SlackApiError, delay=10, backoff=2, tries=5)
     def send(self) -> None:
         file = self._get_inline_screenshot()
         channel = self._get_channel()
         body = self._get_body()
-
         try:
             client = WebClient(
                 token=app.config["SLACK_API_TOKEN"], proxy=app.config["SLACK_PROXY"]
             )
             # files_upload returns SlackResponse as we run it in sync mode.
             if file:
-                response = cast(
-                    SlackResponse,
-                    client.files_upload(
-                        channels=channel,
-                        file=file,
-                        initial_comment=body,
-                        title="subject",
-                    ),
+                client.files_upload(
+                    channels=channel, file=file, initial_comment=body, title="subject",
                 )
-                assert response["file"], str(response)  # the uploaded file
             else:
-                response = cast(
-                    SlackResponse, client.chat_postMessage(channel=channel, text=body),
-                )
-                assert response["message"]["text"], str(response)
+                client.chat_postMessage(channel=channel, text=body)
             logger.info("Report sent to slack")
         except SlackClientError as ex:
             raise NotificationError(ex)
diff --git a/superset/utils/core.py b/superset/utils/core.py
index 1b5ef70ee6086..4efad98bcc716 100644
--- a/superset/utils/core.py
+++ b/superset/utils/core.py
@@ -958,7 +958,7 @@ def send_mime_email(
             smtp.starttls()
         if smtp_user and smtp_password:
             smtp.login(smtp_user, smtp_password)
-        logger.info("Sent an email to %s", str(e_to))
+        logger.debug("Sent an email to %s", str(e_to))
         smtp.sendmail(e_from, e_to, mime_msg.as_string())
         smtp.quit()
     else:
diff --git a/tests/reports/commands_tests.py b/tests/reports/commands_tests.py
index e25b6ecef7922..1de97ebec5bc3 100644
--- a/tests/reports/commands_tests.py
+++ b/tests/reports/commands_tests.py
@@ -21,6 +21,7 @@
 
 import pytest
 from contextlib2 import contextmanager
+from flask_sqlalchemy import BaseQuery
 from freezegun import freeze_time
 from sqlalchemy.sql import func
 
@@ -62,13 +63,34 @@
 )
 
 
-def get_target_from_report_schedule(report_schedule) -> List[str]:
+def get_target_from_report_schedule(report_schedule: ReportSchedule) -> List[str]:
     return [
         json.loads(recipient.recipient_config_json)["target"]
         for recipient in report_schedule.recipients
     ]
 
 
+def get_error_logs_query(report_schedule: ReportSchedule) -> BaseQuery:
+    return (
+        db.session.query(ReportExecutionLog)
+        .filter(
+            ReportExecutionLog.report_schedule == report_schedule,
+            ReportExecutionLog.state == ReportState.ERROR,
+        )
+        .order_by(ReportExecutionLog.end_dttm.desc())
+    )
+
+
+def get_notification_error_sent_count(report_schedule: ReportSchedule) -> int:
+    logs = get_error_logs_query(report_schedule).all()
+    notification_sent_logs = [
+        log.error_message
+        for log in logs
+        if log.error_message == "Notification sent with error"
+    ]
+    return len(notification_sent_logs)
+
+
 def assert_log(state: str, error_message: Optional[str] = None):
     db.session.commit()
     logs = db.session.query(ReportExecutionLog).all()
@@ -77,7 +99,11 @@ def assert_log(state: str, error_message: Optional[str] = None):
         assert logs[0].error_message == error_message
         assert logs[0].state == state
         return
-    assert len(logs) == 2
+    # On error we send an email
+    if state == ReportState.ERROR:
+        assert len(logs) == 3
+    else:
+        assert len(logs) == 2
     log_states = [log.state for log in logs]
     assert ReportState.WORKING in log_states
     assert state in log_states
@@ -94,6 +120,7 @@ def create_report_notification(
     report_type: Optional[str] = None,
     validator_type: Optional[str] = None,
    validator_config_json: Optional[str] = None,
+    grace_period: Optional[int] = None,
 ) -> ReportSchedule:
     report_type = report_type or ReportScheduleType.REPORT
     target = email_target or slack_channel
@@ -121,6 +148,7 @@
         recipients=[recipient],
         validator_type=validator_type,
         validator_config_json=validator_config_json,
+        grace_period=grace_period,
     )
     return report_schedule
 
@@ -464,6 +492,7 @@ def create_invalid_sql_alert_email_chart(request):
             validator_config_json=param_config[request.param][
                 "validator_config_json"
             ],
+            grace_period=60 * 60,
         )
         yield report_schedule
 
@@ -766,7 +795,8 @@ def test_email_mul_alert(create_mul_alert_email_chart):
 
 
 @pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
-def test_invalid_sql_alert(create_invalid_sql_alert_email_chart):
+@patch("superset.reports.notifications.email.send_email_smtp")
+def test_invalid_sql_alert(email_mock, create_invalid_sql_alert_email_chart):
     """
     ExecuteReport Command: Test alert with invalid SQL statements
     """
@@ -775,3 +805,120 @@ def test_invalid_sql_alert(create_invalid_sql_alert_email_chart):
             AsyncExecuteReportScheduleCommand(
                 create_invalid_sql_alert_email_chart.id, datetime.utcnow()
             ).run()
+
+    notification_targets = get_target_from_report_schedule(
+        create_invalid_sql_alert_email_chart
+    )
+    # Assert the email smtp address, asserts a notification was sent with the error
+    assert email_mock.call_args[0][0] == notification_targets[0]
+
+
+@pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
+@patch("superset.reports.notifications.email.send_email_smtp")
+def test_grace_period_error(email_mock, create_invalid_sql_alert_email_chart):
+    """
+    ExecuteReport Command: Test alert grace period on error
+    """
+    with freeze_time("2020-01-01T00:00:00Z"):
+        with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+            AsyncExecuteReportScheduleCommand(
+                create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+            ).run()
+
+        # Only needed for MySQL, understand why
+        db.session.commit()
+        notification_targets = get_target_from_report_schedule(
+            create_invalid_sql_alert_email_chart
+        )
+        # Assert the email smtp address, asserts a notification was sent with the error
+        assert email_mock.call_args[0][0] == notification_targets[0]
+        assert (
+            get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 1
+        )
+
+    with freeze_time("2020-01-01T00:30:00Z"):
+        with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+            AsyncExecuteReportScheduleCommand(
+                create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+            ).run()
+        db.session.commit()
+        assert (
+            get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 1
+        )
+
+    # Grace period ends, assert a notification was sent
+    with freeze_time("2020-01-01T01:30:00Z"):
+        with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+            AsyncExecuteReportScheduleCommand(
+                create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+            ).run()
+        db.session.commit()
+        assert (
+            get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 2
+        )
+
+
+@pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
+@patch("superset.reports.notifications.email.send_email_smtp")
+@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
+def test_grace_period_error_flap(
+    screenshot_mock, email_mock, create_invalid_sql_alert_email_chart
+):
+    """
+    ExecuteReport Command: Test alert grace period on error
+    """
+    with freeze_time("2020-01-01T00:00:00Z"):
+        with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+            AsyncExecuteReportScheduleCommand(
+                create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+            ).run()
+        db.session.commit()
+        # Assert we have 1 notification sent on the log
+        assert (
+            get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 1
+        )
+
+    with freeze_time("2020-01-01T00:30:00Z"):
+        with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+            AsyncExecuteReportScheduleCommand(
+                create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+            ).run()
+        db.session.commit()
+        assert (
+            get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 1
+        )
+
+        # Change report_schedule to valid
+        create_invalid_sql_alert_email_chart.sql = "SELECT 1 AS metric"
+        create_invalid_sql_alert_email_chart.grace_period = 0
+        db.session.merge(create_invalid_sql_alert_email_chart)
+        db.session.commit()
+
+    with freeze_time("2020-01-01T00:31:00Z"):
+        # One success
+        AsyncExecuteReportScheduleCommand(
+            create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+        ).run()
+        # Grace period ends
+        AsyncExecuteReportScheduleCommand(
+            create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+        ).run()
+
+        db.session.commit()
+
+        create_invalid_sql_alert_email_chart.sql = "SELECT 'first'"
+        create_invalid_sql_alert_email_chart.grace_period = 10
+        db.session.merge(create_invalid_sql_alert_email_chart)
+        db.session.commit()
+
+    # assert that after a success, when back to error we send the error notification
+    # again
+    with freeze_time("2020-01-01T00:32:00Z"):
+        with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+            AsyncExecuteReportScheduleCommand(
+                create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+            ).run()
+        db.session.commit()
+        assert (
+            get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 2
+        )