Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(reports): send notification on error with grace #13135

69 changes: 62 additions & 7 deletions superset/reports/commands/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@
ReportScheduleUnexpectedError,
ReportScheduleWorkingTimeoutError,
)
from superset.reports.dao import ReportScheduleDAO
from superset.reports.dao import (
REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
ReportScheduleDAO,
)
from superset.reports.notifications import create_notification
from superset.reports.notifications.base import NotificationContent, ScreenshotData
from superset.reports.notifications.exceptions import NotificationError
Expand Down Expand Up @@ -147,6 +150,7 @@ def _get_screenshot_user(self) -> User:
def _get_screenshot(self) -> ScreenshotData:
"""
Get a chart or dashboard screenshot

:raises: ReportScheduleScreenshotFailedError
"""
screenshot: Optional[BaseScreenshot] = None
Expand All @@ -170,6 +174,7 @@ def _get_screenshot(self) -> ScreenshotData:
def _get_notification_content(self) -> NotificationContent:
"""
Gets a notification content, this is composed by a title and a screenshot

:raises: ReportScheduleScreenshotFailedError
"""
screenshot_data = self._get_screenshot()
Expand All @@ -185,14 +190,13 @@ def _get_notification_content(self) -> NotificationContent:
)
return NotificationContent(name=name, screenshot=screenshot_data)

def send(self) -> None:
def _send(self, notification_content: NotificationContent) -> None:
"""
Creates the notification content and sends them to all recipients
Sends a notification to all recipients

:raises: ReportScheduleNotificationError
"""
notification_errors = []
notification_content = self._get_notification_content()
for recipient in self._report_schedule.recipients:
notification = create_notification(recipient, notification_content)
try:
Expand All @@ -203,6 +207,24 @@ def send(self) -> None:
if notification_errors:
raise ReportScheduleNotificationError(";".join(notification_errors))

def send(self) -> None:
    """
    Builds the notification content (title and screenshot) and delivers
    it to every recipient

    :raises: ReportScheduleNotificationError
    """
    self._send(self._get_notification_content())

def send_error(self, name: str, message: str) -> None:
    """
    Notifies all recipients that an error has occurred

    :param name: title used for the notification
    :param message: error text carried by the notification
    :raises: ReportScheduleNotificationError
    """
    error_content = NotificationContent(text=message, name=name)
    self._send(error_content)

def is_in_grace_period(self) -> bool:
"""
Checks if an alert is on it's grace period
Expand All @@ -218,6 +240,23 @@ def is_in_grace_period(self) -> bool:
< last_success.end_dttm
)

def is_in_error_grace_period(self) -> bool:
    """
    Checks if an alert/report on error is in its notification grace period

    The grace period is measured from the end of the last execution log
    that recorded a sent error notification and lasts ``grace_period``
    seconds.

    :return: True when the last error notification ended less than
        ``grace_period`` seconds ago; False otherwise, including when no
        error notification log is found or no grace period is set
    """
    last_error_notification = ReportScheduleDAO.find_last_error_notification(
        self._report_schedule, session=self._session
    )
    grace_period = self._report_schedule.grace_period
    # Without a previous notification or a grace period there is nothing to
    # wait for. Return a real bool here instead of leaking the falsy
    # grace_period value (None / 0) to the caller.
    if not last_error_notification or not grace_period:
        return False
    grace_period_start = datetime.utcnow() - timedelta(seconds=grace_period)
    return grace_period_start < last_error_notification.end_dttm

def is_on_working_timeout(self) -> bool:
"""
Checks if an alert is on a working timeout
Expand Down Expand Up @@ -256,9 +295,25 @@ def next(self) -> None:
return
self.send()
self.set_state_and_log(ReportState.SUCCESS)
except CommandException as ex:
self.set_state_and_log(ReportState.ERROR, error_message=str(ex))
raise ex
except CommandException as first_ex:
self.set_state_and_log(ReportState.ERROR, error_message=str(first_ex))
# TODO (dpgaspar) convert this logic to a new state eg: ERROR_ON_GRACE
if not self.is_in_error_grace_period():
try:
self.send_error(
f"Error occurred for {self._report_schedule.type}:"
f" {self._report_schedule.name}",
str(first_ex),
)
self.set_state_and_log(
ReportState.ERROR,
error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
)
except CommandException as second_ex:
self.set_state_and_log(
ReportState.ERROR, error_message=str(second_ex)
)
raise first_ex


class ReportWorkingState(BaseReportState):
Expand Down
4 changes: 2 additions & 2 deletions superset/reports/commands/log_prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ def run(self) -> None:
report_schedule, from_date, session=session, commit=False
)
logger.info(
"Deleted %s logs for %s",
"Deleted %s logs for report schedule id: %s",
str(row_count),
ReportSchedule.name,
str(report_schedule.id),
)
except DAODeleteFailedError as ex:
prune_errors.append(str(ex))
Expand Down
38 changes: 38 additions & 0 deletions superset/reports/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
logger = logging.getLogger(__name__)


# Marker written as the ``error_message`` of an ERROR execution log after an
# error notification has been sent. ReportScheduleDAO.find_last_error_notification
# filters on this exact text to locate the last notification, so it must not
# be changed without considering already stored logs.
REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER = "Notification sent with error"


class ReportScheduleDAO(BaseDAO):
model_cls = ReportSchedule

Expand Down Expand Up @@ -223,6 +226,41 @@ def find_last_success_log(
.first()
)

@staticmethod
def find_last_error_notification(
    report_schedule: ReportSchedule, session: Optional[Session] = None,
) -> Optional[ReportExecutionLog]:
    """
    Finds the log of the last error notification sent for a report schedule

    :param report_schedule: schedule whose execution logs are searched
    :param session: session to query with, defaults to ``db.session``
    :return: the most recent log whose error message equals
        REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER, or None when there is no
        such log or when a log in a state other than ERROR/WORKING ended
        after it
    """
    session = session or db.session
    last_error_email_log = (
        session.query(ReportExecutionLog)
        .filter(
            ReportExecutionLog.error_message
            == REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
            ReportExecutionLog.report_schedule == report_schedule,
        )
        .order_by(ReportExecutionLog.end_dttm.desc())
        .first()
    )
    if not last_error_email_log:
        return None
    # Checks that only errors have occurred since the last email: look for
    # a log that ended AFTER the notification in any non error state.
    # (Comparing with "<" would inspect the history before the email, so an
    # old success would disable the grace period forever and a newer success
    # would be ignored.)
    non_error_since_last_email = (
        session.query(ReportExecutionLog)
        .filter(
            ReportExecutionLog.state.notin_(
                [ReportState.ERROR, ReportState.WORKING]
            ),
            ReportExecutionLog.report_schedule == report_schedule,
            ReportExecutionLog.end_dttm > last_error_email_log.end_dttm,
        )
        .order_by(ReportExecutionLog.end_dttm.desc())
        .first()
    )
    return None if non_error_since_last_email else last_error_email_log

@staticmethod
def bulk_delete_logs(
model: ReportSchedule,
Expand Down
3 changes: 2 additions & 1 deletion superset/reports/notifications/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class ScreenshotData:
@dataclass
class NotificationContent:
    # Title of the notification
    name: str
    # NOTE(review): the next two lines both declare ``screenshot``; this looks
    # like the removed and the added side of the diff shown together. The
    # second, optional declaration is the one that takes effect - confirm.
    screenshot: ScreenshotData
    screenshot: Optional[ScreenshotData] = None
    # Plain text message; when set the email/slack notifications render their
    # error template with it instead of the screenshot
    text: Optional[str] = None


class BaseNotification: # pylint: disable=too-few-public-methods
Expand Down
43 changes: 28 additions & 15 deletions superset/reports/notifications/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import logging
from dataclasses import dataclass
from email.utils import make_msgid, parseaddr
from typing import Dict
from typing import Dict, Optional

from flask_babel import gettext as __

Expand All @@ -35,7 +35,7 @@
@dataclass
class EmailContent:
    # HTML body of the email
    body: str
    # NOTE(review): the next two lines both declare ``images``; this looks
    # like the removed and the added side of the diff shown together. The
    # second, optional declaration is the one that takes effect - confirm.
    # Maps a message id (used as ``cid:`` in the body) to the image bytes.
    images: Dict[str, bytes]
    images: Optional[Dict[str, bytes]] = None


class EmailNotification(BaseNotification): # pylint: disable=too-few-public-methods
Expand All @@ -49,22 +49,35 @@ class EmailNotification(BaseNotification): # pylint: disable=too-few-public-met
def _get_smtp_domain() -> str:
    """
    Returns the domain part of the address configured in SMTP_MAIL_FROM
    """
    _, sender_address = parseaddr(app.config["SMTP_MAIL_FROM"])
    return sender_address.split("@")[1]

def _get_content(self) -> EmailContent:
# Get the domain from the 'From' address ..
# and make a message id without the < > in the ends
domain = self._get_smtp_domain()
msgid = make_msgid(domain)[1:-1]

image = {msgid: self._content.screenshot.image}
body = __(
@staticmethod
def _error_template(text: str) -> str:
return __(
"""
<b><a href="%(url)s">Explore in Superset</a></b><p></p>
<img src="cid:%(msgid)s">
Error: %(text)s
""",
url=self._content.screenshot.url,
msgid=msgid,
text=text,
)
return EmailContent(body=body, images=image)

def _get_content(self) -> EmailContent:
if self._content.text:
return EmailContent(body=self._error_template(self._content.text))
# Get the domain from the 'From' address ..
# and make a message id without the < > in the end
if self._content.screenshot:
domain = self._get_smtp_domain()
msgid = make_msgid(domain)[1:-1]

image = {msgid: self._content.screenshot.image}
body = __(
"""
<b><a href="%(url)s">Explore in Superset</a></b><p></p>
<img src="cid:%(msgid)s">
""",
url=self._content.screenshot.url,
msgid=msgid,
)
return EmailContent(body=body, images=image)
return EmailContent(body=self._error_template("Unexpected missing screenshot"))

def _get_subject(self) -> str:
return __(
Expand Down
47 changes: 26 additions & 21 deletions superset/reports/notifications/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
import json
import logging
from io import IOBase
from typing import cast, Optional, Union
from typing import Optional, Union

from flask_babel import gettext as __
from retry.api import retry
from slack import WebClient
from slack.errors import SlackApiError, SlackClientError
from slack.web.slack_response import SlackResponse

from superset import app
from superset.models.reports import ReportRecipientType
Expand All @@ -44,46 +43,52 @@ class SlackNotification(BaseNotification): # pylint: disable=too-few-public-met
def _get_channel(self) -> str:
    """
    Returns the "target" entry of the recipient's JSON configuration
    """
    recipient_config = json.loads(self._recipient.recipient_config_json)
    return recipient_config["target"]

def _get_body(self) -> str:
@staticmethod
def _error_template(name: str, text: str) -> str:
return __(
"""
*%(name)s*\n
<%(url)s|Explore in Superset>
Error: %(text)s
""",
name=self._content.name,
url=self._content.screenshot.url,
name=name,
text=text,
)

def _get_body(self) -> str:
    """
    Builds the text of the slack message

    Renders the error template when the content carries a text or has no
    screenshot, otherwise a message linking to the screenshot URL
    """
    content = self._content
    if content.text:
        return self._error_template(content.name, content.text)
    if not content.screenshot:
        return self._error_template(content.name, "Unexpected missing screenshot")
    # The template literal is kept verbatim, it is the gettext message id
    return __(
        """
*%(name)s*\n
<%(url)s|Explore in Superset>
""",
        name=content.name,
        url=content.screenshot.url,
    )

def _get_inline_screenshot(self) -> Optional[Union[str, IOBase, bytes]]:
    # Returns the screenshot image to upload with the message, if any.
    # NOTE(review): the unconditional return below looks like the removed side
    # of the diff shown together with its replacement; read literally it makes
    # the guarded version that follows unreachable - confirm which one applies.
    return self._content.screenshot.image
    # Guarded version: no screenshot (e.g. an error notification) yields None
    if self._content.screenshot:
        return self._content.screenshot.image
    return None

@retry(SlackApiError, delay=10, backoff=2, tries=5)
def send(self) -> None:
file = self._get_inline_screenshot()
channel = self._get_channel()
body = self._get_body()

try:
client = WebClient(
token=app.config["SLACK_API_TOKEN"], proxy=app.config["SLACK_PROXY"]
)
# files_upload returns SlackResponse as we run it in sync mode.
if file:
response = cast(
SlackResponse,
client.files_upload(
channels=channel,
file=file,
initial_comment=body,
title="subject",
),
client.files_upload(
channels=channel, file=file, initial_comment=body, title="subject",
)
assert response["file"], str(response) # the uploaded file
else:
response = cast(
SlackResponse, client.chat_postMessage(channel=channel, text=body),
)
assert response["message"]["text"], str(response)
client.chat_postMessage(channel=channel, text=body)
logger.info("Report sent to slack")
except SlackClientError as ex:
raise NotificationError(ex)
2 changes: 1 addition & 1 deletion superset/utils/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ def send_mime_email(
smtp.starttls()
if smtp_user and smtp_password:
smtp.login(smtp_user, smtp_password)
logger.info("Sent an email to %s", str(e_to))
logger.debug("Sent an email to %s", str(e_to))
smtp.sendmail(e_from, e_to, mime_msg.as_string())
smtp.quit()
else:
Expand Down
Loading