diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py
index ddf4f19cfccb5..774f2c897d8ba 100644
--- a/superset/reports/commands/execute.py
+++ b/superset/reports/commands/execute.py
@@ -44,7 +44,10 @@
ReportScheduleUnexpectedError,
ReportScheduleWorkingTimeoutError,
)
-from superset.reports.dao import ReportScheduleDAO
+from superset.reports.dao import (
+ REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
+ ReportScheduleDAO,
+)
from superset.reports.notifications import create_notification
from superset.reports.notifications.base import NotificationContent, ScreenshotData
from superset.reports.notifications.exceptions import NotificationError
@@ -147,6 +150,7 @@ def _get_screenshot_user(self) -> User:
def _get_screenshot(self) -> ScreenshotData:
"""
Get a chart or dashboard screenshot
+
:raises: ReportScheduleScreenshotFailedError
"""
screenshot: Optional[BaseScreenshot] = None
@@ -170,6 +174,7 @@ def _get_screenshot(self) -> ScreenshotData:
def _get_notification_content(self) -> NotificationContent:
"""
Gets a notification content, this is composed by a title and a screenshot
+
:raises: ReportScheduleScreenshotFailedError
"""
screenshot_data = self._get_screenshot()
@@ -185,14 +190,13 @@ def _get_notification_content(self) -> NotificationContent:
)
return NotificationContent(name=name, screenshot=screenshot_data)
- def send(self) -> None:
+ def _send(self, notification_content: NotificationContent) -> None:
"""
- Creates the notification content and sends them to all recipients
+ Sends a notification to all recipients
:raises: ReportScheduleNotificationError
"""
notification_errors = []
- notification_content = self._get_notification_content()
for recipient in self._report_schedule.recipients:
notification = create_notification(recipient, notification_content)
try:
@@ -203,6 +207,24 @@ def send(self) -> None:
if notification_errors:
raise ReportScheduleNotificationError(";".join(notification_errors))
+ def send(self) -> None:
+ """
+ Creates the notification content and sends it to all recipients
+
+ :raises: ReportScheduleNotificationError
+ """
+ notification_content = self._get_notification_content()
+ self._send(notification_content)
+
+ def send_error(self, name: str, message: str) -> None:
+ """
+ Creates and sends an error notification to all recipients
+
+ :raises: ReportScheduleNotificationError
+ """
+ notification_content = NotificationContent(name=name, text=message)
+ self._send(notification_content)
+
def is_in_grace_period(self) -> bool:
"""
Checks if an alert is on it's grace period
@@ -218,6 +240,23 @@ def is_in_grace_period(self) -> bool:
< last_success.end_dttm
)
+ def is_in_error_grace_period(self) -> bool:
+ """
+ Checks if an alert/report on error is in its notification grace period
+ """
+ last_error_notification = ReportScheduleDAO.find_last_error_notification(
+ self._report_schedule, session=self._session
+ )
+ if not last_error_notification:
+ return False
+ return (
+ self._report_schedule.grace_period
+ and datetime.utcnow()
+ - timedelta(seconds=self._report_schedule.grace_period)
+ < last_error_notification.end_dttm
+ )
+
def is_on_working_timeout(self) -> bool:
"""
Checks if an alert is on a working timeout
@@ -256,9 +295,25 @@ def next(self) -> None:
return
self.send()
self.set_state_and_log(ReportState.SUCCESS)
- except CommandException as ex:
- self.set_state_and_log(ReportState.ERROR, error_message=str(ex))
- raise ex
+ except CommandException as first_ex:
+ self.set_state_and_log(ReportState.ERROR, error_message=str(first_ex))
+ # TODO (dpgaspar): convert this logic to a new state, e.g. ERROR_ON_GRACE
+ if not self.is_in_error_grace_period():
+ try:
+ self.send_error(
+ f"Error occurred for {self._report_schedule.type}:"
+ f" {self._report_schedule.name}",
+ str(first_ex),
+ )
+ self.set_state_and_log(
+ ReportState.ERROR,
+ error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
+ )
+ except CommandException as second_ex:
+ self.set_state_and_log(
+ ReportState.ERROR, error_message=str(second_ex)
+ )
+ raise first_ex
class ReportWorkingState(BaseReportState):
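
Note on the flow added to next() above: the gate reduces to a simple rule,
notify on the first error, suppress repeats while the grace period is
running, and start over once a run succeeds (the reset is handled by
find_last_error_notification in the dao changes below). A minimal,
self-contained sketch of that rule; GracePeriodGate and its method names are
illustrative, not Superset's:

    from datetime import datetime, timedelta
    from typing import Optional

    class GracePeriodGate:
        """Illustrative stand-in for is_in_error_grace_period()."""

        def __init__(self, grace_period_seconds: int) -> None:
            self.grace_period = grace_period_seconds
            self.last_notified: Optional[datetime] = None

        def should_notify(self, now: datetime) -> bool:
            # First error, or grace period elapsed -> notify
            if self.last_notified is None:
                return True
            return now - self.last_notified > timedelta(seconds=self.grace_period)

        def record_notification(self, now: datetime) -> None:
            self.last_notified = now

        def record_success(self) -> None:
            # A successful run resets the gate, so the next error notifies again
            self.last_notified = None

    gate = GracePeriodGate(grace_period_seconds=60 * 60)
    t0 = datetime(2020, 1, 1)
    assert gate.should_notify(t0)                              # first error
    gate.record_notification(t0)
    assert not gate.should_notify(t0 + timedelta(minutes=30))  # suppressed
    assert gate.should_notify(t0 + timedelta(minutes=90))      # grace expired
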
diff --git a/superset/reports/commands/log_prune.py b/superset/reports/commands/log_prune.py
index 4280d1b46fed8..4b577b1f55dca 100644
--- a/superset/reports/commands/log_prune.py
+++ b/superset/reports/commands/log_prune.py
@@ -50,9 +50,9 @@ def run(self) -> None:
report_schedule, from_date, session=session, commit=False
)
logger.info(
- "Deleted %s logs for %s",
+ "Deleted %s logs for report schedule id: %s",
str(row_count),
- ReportSchedule.name,
+ str(report_schedule.id),
)
except DAODeleteFailedError as ex:
prune_errors.append(str(ex))
diff --git a/superset/reports/dao.py b/superset/reports/dao.py
index 4e3e9495404c2..5ffa0f0f10aa2 100644
--- a/superset/reports/dao.py
+++ b/superset/reports/dao.py
@@ -35,6 +35,9 @@
logger = logging.getLogger(__name__)
+REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER = "Notification sent with error"
+
+
class ReportScheduleDAO(BaseDAO):
model_cls = ReportSchedule
@@ -223,6 +226,41 @@ def find_last_success_log(
.first()
)
+ @staticmethod
+ def find_last_error_notification(
+ report_schedule: ReportSchedule, session: Optional[Session] = None,
+ ) -> Optional[ReportExecutionLog]:
+ """
+ Finds the last error notification sent
+ """
+ session = session or db.session
+ last_error_notification_log = (
+ session.query(ReportExecutionLog)
+ .filter(
+ ReportExecutionLog.error_message
+ == REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
+ ReportExecutionLog.report_schedule == report_schedule,
+ )
+ .order_by(ReportExecutionLog.end_dttm.desc())
+ .first()
+ )
+ if not last_error_notification_log:
+ return None
+ # Check that only errors have occurred since that notification was sent;
+ # a non-error run in between means the next error should notify again
+ non_error_log_since_notification = (
+ session.query(ReportExecutionLog)
+ .filter(
+ ReportExecutionLog.state.notin_(
+ [ReportState.ERROR, ReportState.WORKING]
+ ),
+ ReportExecutionLog.report_schedule == report_schedule,
+ ReportExecutionLog.end_dttm > last_error_notification_log.end_dttm,
+ )
+ .order_by(ReportExecutionLog.end_dttm.desc())
+ .first()
+ )
+ if non_error_log_since_notification:
+ return None
+ return last_error_notification_log
+
@staticmethod
def bulk_delete_logs(
model: ReportSchedule,
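
The lookup above identifies the last error notification by the marker string
in error_message rather than by a dedicated state, then discards it if any
non-error execution happened afterwards (the schedule recovered, so the next
error should notify again). A hypothetical in-memory analogue of those two
queries; Log stands in for ReportExecutionLog and the state strings are
illustrative:

    from dataclasses import dataclass
    from datetime import datetime
    from typing import List, Optional

    MARKER = "Notification sent with error"

    @dataclass
    class Log:
        state: str                   # e.g. "Success", "Error", "Working"
        error_message: Optional[str]
        end_dttm: datetime

    def find_last_error_notification(logs: List[Log]) -> Optional[Log]:
        marker_logs = [log for log in logs if log.error_message == MARKER]
        if not marker_logs:
            return None
        last = max(marker_logs, key=lambda log: log.end_dttm)
        # Any non-error execution after the marker means the alert recovered
        recovered = any(
            log.state not in ("Error", "Working")
            and log.end_dttm > last.end_dttm
            for log in logs
        )
        return None if recovered else last
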
diff --git a/superset/reports/notifications/base.py b/superset/reports/notifications/base.py
index f55154c1e7430..5fe7fe9bcb791 100644
--- a/superset/reports/notifications/base.py
+++ b/superset/reports/notifications/base.py
@@ -30,7 +30,8 @@ class ScreenshotData:
@dataclass
class NotificationContent:
name: str
- screenshot: ScreenshotData
+ screenshot: Optional[ScreenshotData] = None
+ text: Optional[str] = None
class BaseNotification: # pylint: disable=too-few-public-methods
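
With both fields optional, the same NotificationContent now carries either a
rendered report (screenshot set) or a plain-text error (text set), and each
channel branches on which one is populated. A short sketch, reusing the
dataclasses above (ScreenshotData holds a url and image bytes, as the email
and Slack hunks below rely on):

    report = NotificationContent(
        name="Weekly sales",
        screenshot=ScreenshotData(url="http://superset/chart/1/", image=b"png"),
    )
    error = NotificationContent(
        name="Error occurred for Alert: weekly sales",
        text="A query error happened",
    )
    # Channels pick their template by which field is set
    assert report.screenshot is not None and report.text is None
    assert error.text is not None and error.screenshot is None
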
diff --git a/superset/reports/notifications/email.py b/superset/reports/notifications/email.py
index e99a7f43e37da..f2bd6e6fd8577 100644
--- a/superset/reports/notifications/email.py
+++ b/superset/reports/notifications/email.py
@@ -19,7 +19,7 @@
import logging
from dataclasses import dataclass
from email.utils import make_msgid, parseaddr
-from typing import Dict
+from typing import Dict, Optional
from flask_babel import gettext as __
@@ -35,7 +35,7 @@
@dataclass
class EmailContent:
body: str
- images: Dict[str, bytes]
+ images: Optional[Dict[str, bytes]] = None
class EmailNotification(BaseNotification): # pylint: disable=too-few-public-methods
@@ -49,22 +49,35 @@ class EmailNotification(BaseNotification): # pylint: disable=too-few-public-met
def _get_smtp_domain() -> str:
return parseaddr(app.config["SMTP_MAIL_FROM"])[1].split("@")[1]
- def _get_content(self) -> EmailContent:
- # Get the domain from the 'From' address ..
- # and make a message id without the < > in the ends
- domain = self._get_smtp_domain()
- msgid = make_msgid(domain)[1:-1]
-
- image = {msgid: self._content.screenshot.image}
- body = __(
+ @staticmethod
+ def _error_template(text: str) -> str:
+ return __(
"""
- <b><a href="%(url)s">Explore in Superset</a></b><p></p>
- <img src="cid:%(msgid)s">
+ Error: %(text)s
""",
- url=self._content.screenshot.url,
- msgid=msgid,
+ text=text,
)
- return EmailContent(body=body, images=image)
+
+ def _get_content(self) -> EmailContent:
+ if self._content.text:
+ return EmailContent(body=self._error_template(self._content.text))
+ # Get the domain from the 'From' address
+ # and make a message id without the < > at the ends
+ if self._content.screenshot:
+ domain = self._get_smtp_domain()
+ msgid = make_msgid(domain)[1:-1]
+
+ image = {msgid: self._content.screenshot.image}
+ body = __(
+ """
+ <b><a href="%(url)s">Explore in Superset</a></b><p></p>
+ <img src="cid:%(msgid)s">
+ """,
+ url=self._content.screenshot.url,
+ msgid=msgid,
+ )
+ return EmailContent(body=body, images=image)
+ return EmailContent(body=self._error_template("Unexpected missing screenshot"))
def _get_subject(self) -> str:
return __(
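
The msgid generated in _get_content above and the cid: reference in the HTML
body are the two halves of a standard inline-image email: on the sending side
each entry of EmailContent.images ends up attached with a matching Content-ID
header (send_email_smtp, mocked in the tests below). A stdlib-only sketch of
that pairing; the domain and image bytes are placeholders:

    from email.mime.image import MIMEImage
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.utils import make_msgid

    msgid = make_msgid(domain="example.com")[1:-1]  # drop the enclosing < >
    html_body = f'<b>Report</b><img src="cid:{msgid}">'

    msg = MIMEMultipart()
    msg.attach(MIMEText(html_body, "html"))
    image = MIMEImage(b"...png bytes...", _subtype="png")
    image.add_header("Content-ID", f"<{msgid}>")  # the < > go back on here
    msg.attach(image)
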
diff --git a/superset/reports/notifications/slack.py b/superset/reports/notifications/slack.py
index 8e859ffc894fc..84f858c1e2eaf 100644
--- a/superset/reports/notifications/slack.py
+++ b/superset/reports/notifications/slack.py
@@ -18,13 +18,12 @@
import json
import logging
from io import IOBase
-from typing import cast, Optional, Union
+from typing import Optional, Union
from flask_babel import gettext as __
from retry.api import retry
from slack import WebClient
from slack.errors import SlackApiError, SlackClientError
-from slack.web.slack_response import SlackResponse
from superset import app
from superset.models.reports import ReportRecipientType
@@ -44,46 +43,52 @@ class SlackNotification(BaseNotification): # pylint: disable=too-few-public-met
def _get_channel(self) -> str:
return json.loads(self._recipient.recipient_config_json)["target"]
- def _get_body(self) -> str:
+ @staticmethod
+ def _error_template(name: str, text: str) -> str:
return __(
"""
*%(name)s*\n
- <%(url)s|Explore in Superset>
+ Error: %(text)s
""",
- name=self._content.name,
- url=self._content.screenshot.url,
+ name=name,
+ text=text,
)
+ def _get_body(self) -> str:
+ if self._content.text:
+ return self._error_template(self._content.name, self._content.text)
+ if self._content.screenshot:
+ return __(
+ """
+ *%(name)s*\n
+ <%(url)s|Explore in Superset>
+ """,
+ name=self._content.name,
+ url=self._content.screenshot.url,
+ )
+ return self._error_template(self._content.name, "Unexpected missing screenshot")
+
def _get_inline_screenshot(self) -> Optional[Union[str, IOBase, bytes]]:
- return self._content.screenshot.image
+ if self._content.screenshot:
+ return self._content.screenshot.image
+ return None
@retry(SlackApiError, delay=10, backoff=2, tries=5)
def send(self) -> None:
file = self._get_inline_screenshot()
channel = self._get_channel()
body = self._get_body()
-
try:
client = WebClient(
token=app.config["SLACK_API_TOKEN"], proxy=app.config["SLACK_PROXY"]
)
# files_upload returns SlackResponse as we run it in sync mode.
if file:
- response = cast(
- SlackResponse,
- client.files_upload(
- channels=channel,
- file=file,
- initial_comment=body,
- title="subject",
- ),
+ client.files_upload(
+ channels=channel, file=file, initial_comment=body, title="subject",
)
- assert response["file"], str(response) # the uploaded file
else:
- response = cast(
- SlackResponse, client.chat_postMessage(channel=channel, text=body),
- )
- assert response["message"]["text"], str(response)
+ client.chat_postMessage(channel=channel, text=body)
logger.info("Report sent to slack")
except SlackClientError as ex:
raise NotificationError(ex)
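
The existing @retry(SlackApiError, delay=10, backoff=2, tries=5) decorator on
send() (from the retry package) is unchanged here; for reference, it behaves
roughly like this loop, sleeping 10s, 20s, 40s, then 80s between the five
attempts (send_once is a placeholder callable):

    import time

    from slack.errors import SlackApiError

    def send_with_retry(send_once, tries=5, delay=10.0, backoff=2.0):
        for attempt in range(tries):
            try:
                return send_once()
            except SlackApiError:
                if attempt == tries - 1:
                    raise  # out of tries: propagate the last error
                time.sleep(delay)
                delay *= backoff
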
diff --git a/superset/utils/core.py b/superset/utils/core.py
index 1b5ef70ee6086..4efad98bcc716 100644
--- a/superset/utils/core.py
+++ b/superset/utils/core.py
@@ -958,7 +958,7 @@ def send_mime_email(
smtp.starttls()
if smtp_user and smtp_password:
smtp.login(smtp_user, smtp_password)
- logger.info("Sent an email to %s", str(e_to))
+ logger.debug("Sent an email to %s", str(e_to))
smtp.sendmail(e_from, e_to, mime_msg.as_string())
smtp.quit()
else:
diff --git a/tests/reports/commands_tests.py b/tests/reports/commands_tests.py
index e25b6ecef7922..1de97ebec5bc3 100644
--- a/tests/reports/commands_tests.py
+++ b/tests/reports/commands_tests.py
@@ -21,6 +21,7 @@
import pytest
from contextlib2 import contextmanager
+from flask_sqlalchemy import BaseQuery
from freezegun import freeze_time
from sqlalchemy.sql import func
@@ -62,13 +63,34 @@
)
-def get_target_from_report_schedule(report_schedule) -> List[str]:
+def get_target_from_report_schedule(report_schedule: ReportSchedule) -> List[str]:
return [
json.loads(recipient.recipient_config_json)["target"]
for recipient in report_schedule.recipients
]
+def get_error_logs_query(report_schedule: ReportSchedule) -> BaseQuery:
+ return (
+ db.session.query(ReportExecutionLog)
+ .filter(
+ ReportExecutionLog.report_schedule == report_schedule,
+ ReportExecutionLog.state == ReportState.ERROR,
+ )
+ .order_by(ReportExecutionLog.end_dttm.desc())
+ )
+
+
+def get_notification_error_sent_count(report_schedule: ReportSchedule) -> int:
+ logs = get_error_logs_query(report_schedule).all()
+ notification_sent_logs = [
+ log.error_message
+ for log in logs
+ if log.error_message == "Notification sent with error"
+ ]
+ return len(notification_sent_logs)
+
+
def assert_log(state: str, error_message: Optional[str] = None):
db.session.commit()
logs = db.session.query(ReportExecutionLog).all()
@@ -77,7 +99,11 @@ def assert_log(state: str, error_message: Optional[str] = None):
assert logs[0].error_message == error_message
assert logs[0].state == state
return
- assert len(logs) == 2
+ # On error an additional notification is sent, so expect one more log entry
+ if state == ReportState.ERROR:
+ assert len(logs) == 3
+ else:
+ assert len(logs) == 2
log_states = [log.state for log in logs]
assert ReportState.WORKING in log_states
assert state in log_states
@@ -94,6 +120,7 @@ def create_report_notification(
report_type: Optional[str] = None,
validator_type: Optional[str] = None,
validator_config_json: Optional[str] = None,
+ grace_period: Optional[int] = None,
) -> ReportSchedule:
report_type = report_type or ReportScheduleType.REPORT
target = email_target or slack_channel
@@ -121,6 +148,7 @@ def create_report_notification(
recipients=[recipient],
validator_type=validator_type,
validator_config_json=validator_config_json,
+ grace_period=grace_period,
)
return report_schedule
@@ -464,6 +492,7 @@ def create_invalid_sql_alert_email_chart(request):
validator_config_json=param_config[request.param][
"validator_config_json"
],
+ grace_period=60 * 60,
)
yield report_schedule
@@ -766,7 +795,8 @@ def test_email_mul_alert(create_mul_alert_email_chart):
@pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
-def test_invalid_sql_alert(create_invalid_sql_alert_email_chart):
+@patch("superset.reports.notifications.email.send_email_smtp")
+def test_invalid_sql_alert(email_mock, create_invalid_sql_alert_email_chart):
"""
ExecuteReport Command: Test alert with invalid SQL statements
"""
@@ -775,3 +805,120 @@ def test_invalid_sql_alert(create_invalid_sql_alert_email_chart):
AsyncExecuteReportScheduleCommand(
create_invalid_sql_alert_email_chart.id, datetime.utcnow()
).run()
+
+ notification_targets = get_target_from_report_schedule(
+ create_invalid_sql_alert_email_chart
+ )
+ # Assert the recipient address matches, i.e. an error notification was sent
+ assert email_mock.call_args[0][0] == notification_targets[0]
+
+
+@pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
+@patch("superset.reports.notifications.email.send_email_smtp")
+def test_grace_period_error(email_mock, create_invalid_sql_alert_email_chart):
+ """
+ ExecuteReport Command: Test alert grace period on error
+ """
+ with freeze_time("2020-01-01T00:00:00Z"):
+ with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+ AsyncExecuteReportScheduleCommand(
+ create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+ ).run()
+
+ # Only needed for MySQL; TODO: understand why
+ db.session.commit()
+ notification_targets = get_target_from_report_schedule(
+ create_invalid_sql_alert_email_chart
+ )
+ # Assert the recipient address matches, i.e. an error notification was sent
+ assert email_mock.call_args[0][0] == notification_targets[0]
+ assert (
+ get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 1
+ )
+
+ with freeze_time("2020-01-01T00:30:00Z"):
+ with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+ AsyncExecuteReportScheduleCommand(
+ create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+ ).run()
+ db.session.commit()
+ assert (
+ get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 1
+ )
+
+ # Grace period ends, assert a notification was sent
+ with freeze_time("2020-01-01T01:30:00Z"):
+ with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+ AsyncExecuteReportScheduleCommand(
+ create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+ ).run()
+ db.session.commit()
+ assert (
+ get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 2
+ )
+
+
+@pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
+@patch("superset.reports.notifications.email.send_email_smtp")
+@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
+def test_grace_period_error_flap(
+ screenshot_mock, email_mock, create_invalid_sql_alert_email_chart
+):
+ """
+ ExecuteReport Command: Test alert grace period on error, with flapping
+ """
+ with freeze_time("2020-01-01T00:00:00Z"):
+ with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+ AsyncExecuteReportScheduleCommand(
+ create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+ ).run()
+ db.session.commit()
+ # Assert one sent notification is recorded in the log
+ assert (
+ get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 1
+ )
+
+ with freeze_time("2020-01-01T00:30:00Z"):
+ with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+ AsyncExecuteReportScheduleCommand(
+ create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+ ).run()
+ db.session.commit()
+ assert (
+ get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 1
+ )
+
+ # Change report_schedule to valid
+ create_invalid_sql_alert_email_chart.sql = "SELECT 1 AS metric"
+ create_invalid_sql_alert_email_chart.grace_period = 0
+ db.session.merge(create_invalid_sql_alert_email_chart)
+ db.session.commit()
+
+ with freeze_time("2020-01-01T00:31:00Z"):
+ # One success
+ AsyncExecuteReportScheduleCommand(
+ create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+ ).run()
+ # Grace period ends
+ AsyncExecuteReportScheduleCommand(
+ create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+ ).run()
+
+ db.session.commit()
+
+ create_invalid_sql_alert_email_chart.sql = "SELECT 'first'"
+ create_invalid_sql_alert_email_chart.grace_period = 10
+ db.session.merge(create_invalid_sql_alert_email_chart)
+ db.session.commit()
+
+ # Assert that after a success, when the alert errors again, the error
+ # notification is sent again
+ with freeze_time("2020-01-01T00:32:00Z"):
+ with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+ AsyncExecuteReportScheduleCommand(
+ create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+ ).run()
+ db.session.commit()
+ assert (
+ get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 2
+ )