From 46f7c6721c08b028b030bc572338b99059ea71be Mon Sep 17 00:00:00 2001 From: Elizabeth Thompson Date: Fri, 28 Oct 2022 17:32:55 -0700 Subject: [PATCH 1/2] log celery errors by status code --- superset/errors.py | 3 + superset/reports/commands/exceptions.py | 14 ++++ superset/reports/commands/execute.py | 35 +++++--- superset/reports/notifications/email.py | 2 +- superset/reports/notifications/exceptions.py | 30 ++++++- superset/reports/notifications/slack.py | 34 +++++++- superset/tasks/scheduler.py | 11 ++- superset/utils/log.py | 16 ++++ tests/integration_tests/event_logger_tests.py | 1 + .../reports/commands_tests.py | 81 ++++++++++++++++++- tests/unit_tests/utils/log_tests.py | 37 +++++++++ 11 files changed, 241 insertions(+), 23 deletions(-) create mode 100644 tests/unit_tests/utils/log_tests.py diff --git a/superset/errors.py b/superset/errors.py index 9198a82d3fe61..8bc84a7425c41 100644 --- a/superset/errors.py +++ b/superset/errors.py @@ -90,6 +90,9 @@ class SupersetErrorType(str, Enum): INVALID_PAYLOAD_FORMAT_ERROR = "INVALID_PAYLOAD_FORMAT_ERROR" INVALID_PAYLOAD_SCHEMA_ERROR = "INVALID_PAYLOAD_SCHEMA_ERROR" + # Report errors + REPORT_NOTIFICATION_ERROR = "REPORT_NOTIFICATION_ERROR" + ISSUE_CODES = { 1000: _("The datasource is too large to query."), diff --git a/superset/reports/commands/exceptions.py b/superset/reports/commands/exceptions.py index 89f2c82fb9ff1..a068b3c62860d 100644 --- a/superset/reports/commands/exceptions.py +++ b/superset/reports/commands/exceptions.py @@ -14,6 +14,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from typing import List + from flask_babel import lazy_gettext as _ from superset.commands.exceptions import ( @@ -24,6 +26,7 @@ ForbiddenError, ValidationError, ) +from superset.exceptions import SupersetError, SupersetErrorsException from superset.reports.models import ReportScheduleType @@ -258,6 +261,17 @@ class ReportScheduleStateNotFoundError(CommandException): message = _("Report Schedule state not found") +class ReportScheduleSystemErrorsException(CommandException, SupersetErrorsException): + errors: List[SupersetError] = [] + message = _("Report schedule system error") + + +class ReportScheduleClientErrorsException(CommandException, SupersetErrorsException): + status = 400 + errors: List[SupersetError] = [] + message = _("Report schedule client error") + + class ReportScheduleUnexpectedError(CommandException): message = _("Report schedule unexpected error") diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py index 0ab6c62524406..28d921e028266 100644 --- a/superset/reports/commands/execute.py +++ b/superset/reports/commands/execute.py @@ -31,10 +31,13 @@ from superset.dashboards.permalink.commands.create import ( CreateDashboardPermalinkCommand, ) +from superset.errors import ErrorLevel, SupersetError, SupersetErrorType +from superset.exceptions import SupersetException from superset.extensions import feature_flag_manager, machine_auth_provider_factory from superset.reports.commands.alert import AlertCommand from superset.reports.commands.exceptions import ( ReportScheduleAlertGracePeriodError, + ReportScheduleClientErrorsException, ReportScheduleCsvFailedError, ReportScheduleCsvTimeout, ReportScheduleDataFrameFailedError, @@ -45,6 +48,7 @@ ReportScheduleScreenshotFailedError, ReportScheduleScreenshotTimeout, ReportScheduleStateNotFoundError, + ReportScheduleSystemErrorsException, ReportScheduleUnexpectedError, ReportScheduleWorkingTimeoutError, ) @@ -384,9 +388,9 @@ def _send( """ Sends a notification to all recipients - :raises: NotificationError + :raises: CommandException """ - notification_errors: List[NotificationError] = [] + notification_errors: List[SupersetError] = [] for recipient in recipients: notification = create_notification(recipient, notification_content) try: @@ -398,19 +402,32 @@ def _send( ) else: notification.send() - except NotificationError as ex: - # collect notification errors but keep processing them - notification_errors.append(ex) + except (NotificationError, SupersetException) as ex: + # collect errors but keep processing them + notification_errors.append( + SupersetError( + message=ex.message, + error_type=SupersetErrorType.REPORT_NOTIFICATION_ERROR, + level=ErrorLevel.ERROR + if ex.status >= 500 + else ErrorLevel.WARNING, + ) + ) if notification_errors: - # raise errors separately so that we can utilize error status codes + # log all errors but raise based on the most severe for error in notification_errors: - raise error + logger.warning(str(error)) + + if any(error.level == ErrorLevel.ERROR for error in notification_errors): + raise ReportScheduleSystemErrorsException(errors=notification_errors) + if any(error.level == ErrorLevel.WARNING for error in notification_errors): + raise ReportScheduleClientErrorsException(errors=notification_errors) def send(self) -> None: """ Creates the notification content and sends them to all recipients - :raises: NotificationError + :raises: CommandException """ notification_content = self._get_notification_content() self._send(notification_content, self._report_schedule.recipients) @@ -419,7 +436,7 @@ def send_error(self, name: str, message: str) -> None: """ Creates and sends a notification for an error, to all recipients - :raises: NotificationError + :raises: CommandException """ header_data = self._get_log_data() logger.info( diff --git a/superset/reports/notifications/email.py b/superset/reports/notifications/email.py index 1f042ded83f5f..4526e7f729c5b 100644 --- a/superset/reports/notifications/email.py +++ b/superset/reports/notifications/email.py @@ -211,4 +211,4 @@ def send(self) -> None: "Report sent to email, notification content is %s", content.header_data ) except Exception as ex: - raise NotificationError(ex) from ex + raise NotificationError(str(ex)) from ex diff --git a/superset/reports/notifications/exceptions.py b/superset/reports/notifications/exceptions.py index 749a91fd955b0..aa06906f826b7 100644 --- a/superset/reports/notifications/exceptions.py +++ b/superset/reports/notifications/exceptions.py @@ -14,7 +14,33 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from superset.exceptions import SupersetException -class NotificationError(Exception): - pass +class NotificationError(SupersetException): + """ + Generic unknown exception - only used when + bubbling up unknown exceptions from lower levels + """ + + +class NotificationParamException(SupersetException): + status = 422 + + +class NotificationAuthorizationException(SupersetException): + status = 401 + + +class NotificationUnprocessableException(SupersetException): + """ + When a third party client service is down. + The request should be retried. There is no further + action required on our part or the user's other than to retry + """ + + status = 400 + + +class NotificationMalformedException(SupersetException): + status = 400 diff --git a/superset/reports/notifications/slack.py b/superset/reports/notifications/slack.py index 2ac311c7fbac6..8983972736094 100644 --- a/superset/reports/notifications/slack.py +++ b/superset/reports/notifications/slack.py @@ -23,12 +23,27 @@ import backoff from flask_babel import gettext as __ from slack_sdk import WebClient -from slack_sdk.errors import SlackApiError, SlackClientError +from slack_sdk.errors import ( + BotUserAccessError, + SlackApiError, + SlackClientConfigurationError, + SlackClientError, + SlackClientNotConnectedError, + SlackObjectFormationError, + SlackRequestError, + SlackTokenRotationError, +) from superset import app from superset.reports.models import ReportRecipientType from superset.reports.notifications.base import BaseNotification -from superset.reports.notifications.exceptions import NotificationError +from superset.reports.notifications.exceptions import ( + NotificationAuthorizationException, + NotificationError, + NotificationMalformedException, + NotificationParamException, + NotificationUnprocessableException, +) from superset.utils.decorators import statsd_gauge from superset.utils.urls import modify_url_query @@ -173,5 +188,18 @@ def send(self) -> None: else: client.chat_postMessage(channel=channel, text=body) logger.info("Report sent to slack") - except SlackClientError as ex: + except ( + BotUserAccessError, + SlackRequestError, + SlackClientConfigurationError, + ) as ex: + raise NotificationParamException from ex + except SlackObjectFormationError as ex: + raise NotificationMalformedException from ex + except SlackTokenRotationError as ex: + raise NotificationAuthorizationException from ex + except SlackClientNotConnectedError as ex: + raise NotificationUnprocessableException from ex + except (SlackClientError, SlackApiError) as ex: + # any other slack errors not caught above raise NotificationError(ex) from ex diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 2db721de7d101..73b833d87d4e6 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -29,6 +29,7 @@ from superset.reports.dao import ReportScheduleDAO from superset.tasks.cron_util import cron_schedule_window from superset.utils.celery import session_scope +from superset.utils.log import get_logger_from_status logger = logging.getLogger(__name__) @@ -92,11 +93,13 @@ def execute(self: Celery.task, report_schedule_id: int, scheduled_dttm: str) -> "An unexpected occurred while executing the report: %s", task_id ) self.update_state(state="FAILURE") - except CommandException: - logger.exception( - "A downstream exception occurred while generating" " a report: %s", task_id + except CommandException as ex: + logger_func, level = get_logger_from_status(ex.status) + logger_func( + "A downstream %s occurred while generating a report: %s", level, task_id ) - self.update_state(state="FAILURE") + if level == "exception": + self.update_state(state="FAILURE") @celery_app.task(name="reports.prune_log") diff --git a/superset/utils/log.py b/superset/utils/log.py index ef7b290b2eabc..3c1ccc5e71f5d 100644 --- a/superset/utils/log.py +++ b/superset/utils/log.py @@ -31,6 +31,7 @@ Dict, Iterator, Optional, + Tuple, Type, TYPE_CHECKING, Union, @@ -46,6 +47,8 @@ if TYPE_CHECKING: from superset.stats_logger import BaseStatsLogger +logger = logging.getLogger(__name__) + def collect_request_payload() -> Dict[str, Any]: """Collect log payload identifiable from request context""" @@ -75,6 +78,19 @@ def collect_request_payload() -> Dict[str, Any]: return payload +def get_logger_from_status( + status: int, +) -> Tuple[Callable[..., None], str]: + """ + Return logger method by status of exception. + Maps logger level to status code level + """ + log_map = {"2": "info", "3": "info", "4": "warning", "5": "exception"} + log_level = log_map[str(status)[0]] + + return (getattr(logger, log_level), log_level) + + class AbstractEventLogger(ABC): def __call__( self, diff --git a/tests/integration_tests/event_logger_tests.py b/tests/integration_tests/event_logger_tests.py index 4553bb9dc789b..2a1cd97aac6d6 100644 --- a/tests/integration_tests/event_logger_tests.py +++ b/tests/integration_tests/event_logger_tests.py @@ -29,6 +29,7 @@ AbstractEventLogger, DBEventLogger, get_event_logger_from_cfg_value, + get_logger_from_status, ) from tests.integration_tests.test_app import app diff --git a/tests/integration_tests/reports/commands_tests.py b/tests/integration_tests/reports/commands_tests.py index 2dd1c461cafc1..288e6746cc5c7 100644 --- a/tests/integration_tests/reports/commands_tests.py +++ b/tests/integration_tests/reports/commands_tests.py @@ -18,7 +18,7 @@ from contextlib import contextmanager from datetime import datetime, timedelta, timezone from typing import List, Optional -from unittest.mock import Mock, patch +from unittest.mock import call, Mock, patch from uuid import uuid4 import pytest @@ -29,6 +29,7 @@ from sqlalchemy.sql import func from superset import db +from superset.exceptions import SupersetException from superset.models.core import Database from superset.models.dashboard import Dashboard from superset.models.slice import Slice @@ -37,16 +38,22 @@ AlertQueryInvalidTypeError, AlertQueryMultipleColumnsError, AlertQueryMultipleRowsError, + ReportScheduleClientErrorsException, ReportScheduleCsvFailedError, ReportScheduleCsvTimeout, + ReportScheduleForbiddenError, ReportScheduleNotFoundError, ReportSchedulePreviousWorkingError, ReportScheduleScreenshotFailedError, ReportScheduleScreenshotTimeout, + ReportScheduleSystemErrorsException, ReportScheduleUnexpectedError, ReportScheduleWorkingTimeoutError, ) -from superset.reports.commands.execute import AsyncExecuteReportScheduleCommand +from superset.reports.commands.execute import ( + AsyncExecuteReportScheduleCommand, + BaseReportState, +) from superset.reports.commands.log_prune import AsyncPruneReportScheduleLogCommand from superset.reports.models import ( ReportDataFormat, @@ -56,6 +63,10 @@ ReportScheduleValidatorType, ReportState, ) +from superset.reports.notifications.exceptions import ( + NotificationError, + NotificationParamException, +) from superset.reports.types import ReportScheduleExecutor from superset.utils.database import get_example_database from tests.integration_tests.fixtures.birth_names_dashboard import ( @@ -115,7 +126,6 @@ def assert_log(state: str, error_message: Optional[str] = None): if state == ReportState.ERROR: # On error we send an email - print(logs) assert len(logs) == 3 else: assert len(logs) == 2 @@ -1392,7 +1402,7 @@ def test_email_dashboard_report_fails( screenshot_mock.return_value = SCREENSHOT_FILE email_mock.side_effect = SMTPException("Could not connect to SMTP XPTO") - with pytest.raises(ReportScheduleUnexpectedError): + with pytest.raises(ReportScheduleSystemErrorsException): AsyncExecuteReportScheduleCommand( TEST_ID, create_report_email_dashboard.id, datetime.utcnow() ).run() @@ -1886,3 +1896,66 @@ def test_prune_log_soft_time_out(bulk_delete_logs, create_report_email_dashboard with pytest.raises(SoftTimeLimitExceeded) as excinfo: AsyncPruneReportScheduleLogCommand().run() assert str(excinfo.value) == "SoftTimeLimitExceeded()" + + +@patch("superset.reports.commands.execute.logger") +@patch("superset.reports.commands.execute.create_notification") +def test__send_with_client_errors(notification_mock, logger_mock): + notification_content = "I am some content" + recipients = ["test@foo.com"] + notification_mock.return_value.send.side_effect = NotificationParamException() + with pytest.raises(ReportScheduleClientErrorsException) as excinfo: + BaseReportState._send(BaseReportState, notification_content, recipients) + + assert excinfo.errisinstance(SupersetException) + logger_mock.warning.assert_called_with( + ( + "SupersetError(message='', error_type=, level=, extra=None)" + ) + ) + + +@patch("superset.reports.commands.execute.logger") +@patch("superset.reports.commands.execute.create_notification") +def test__send_with_multiple_errors(notification_mock, logger_mock): + notification_content = "I am some content" + recipients = ["test@foo.com", "test2@bar.com"] + notification_mock.return_value.send.side_effect = [ + NotificationParamException(), + NotificationError(), + ] + # it raises the error with a 500 status if present + with pytest.raises(ReportScheduleSystemErrorsException) as excinfo: + BaseReportState._send(BaseReportState, notification_content, recipients) + + assert excinfo.errisinstance(SupersetException) + # it logs both errors as warnings + logger_mock.warning.assert_has_calls( + [ + call( + "SupersetError(message='', error_type=, level=, extra=None)" + ), + call( + "SupersetError(message='', error_type=, level=, extra=None)" + ), + ] + ) + + +@patch("superset.reports.commands.execute.logger") +@patch("superset.reports.commands.execute.create_notification") +def test__send_with_server_errors(notification_mock, logger_mock): + + notification_content = "I am some content" + recipients = ["test@foo.com"] + notification_mock.return_value.send.side_effect = NotificationError() + with pytest.raises(ReportScheduleSystemErrorsException) as excinfo: + BaseReportState._send(BaseReportState, notification_content, recipients) + + assert excinfo.errisinstance(SupersetException) + # it logs the error + logger_mock.warning.assert_called_with( + ( + "SupersetError(message='', error_type=, level=, extra=None)" + ) + ) diff --git a/tests/unit_tests/utils/log_tests.py b/tests/unit_tests/utils/log_tests.py new file mode 100644 index 0000000000000..5b031b5778875 --- /dev/null +++ b/tests/unit_tests/utils/log_tests.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +from superset.utils.log import get_logger_from_status + + +def test_log_from_status_exception() -> None: + (func, log_level) = get_logger_from_status(500) + assert func.__name__ == "exception" + assert log_level == "exception" + + +def test_log_from_status_warning() -> None: + (func, log_level) = get_logger_from_status(422) + assert func.__name__ == "warning" + assert log_level == "warning" + + +def test_log_from_status_info() -> None: + (func, log_level) = get_logger_from_status(300) + assert func.__name__ == "info" + assert log_level == "info" From baa3c26d3e0aca2ce4ae93eeabb7382910e9ad85 Mon Sep 17 00:00:00 2001 From: Elizabeth Thompson Date: Tue, 8 Nov 2022 09:43:08 -0800 Subject: [PATCH 2/2] better error messages --- superset/reports/commands/execute.py | 24 ++++++++++++------- superset/reports/notifications/email.py | 5 ++++ superset/tasks/scheduler.py | 3 ++- superset/utils/core.py | 6 +++++ superset/utils/log.py | 9 +++++-- tests/integration_tests/event_logger_tests.py | 1 - 6 files changed, 36 insertions(+), 12 deletions(-) diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py index 28d921e028266..d20775ffd6d3b 100644 --- a/superset/reports/commands/execute.py +++ b/superset/reports/commands/execute.py @@ -32,7 +32,7 @@ CreateDashboardPermalinkCommand, ) from superset.errors import ErrorLevel, SupersetError, SupersetErrorType -from superset.exceptions import SupersetException +from superset.exceptions import SupersetErrorsException, SupersetException from superset.extensions import feature_flag_manager, machine_auth_provider_factory from superset.reports.commands.alert import AlertCommand from superset.reports.commands.exceptions import ( @@ -106,7 +106,6 @@ def update_report_schedule_and_log( Update the report schedule state et al. and reflect the change in the execution log. """ - self.update_report_schedule(state) self.create_log(error_message) @@ -534,25 +533,34 @@ def next(self) -> None: return self.send() self.update_report_schedule_and_log(ReportState.SUCCESS) - except Exception as first_ex: + except (SupersetErrorsException, Exception) as first_ex: + error_message = str(first_ex) + if isinstance(first_ex, SupersetErrorsException): + error_message = ";".join([error.message for error in first_ex.errors]) + self.update_report_schedule_and_log( - ReportState.ERROR, error_message=str(first_ex) + ReportState.ERROR, error_message=error_message ) + # TODO (dpgaspar) convert this logic to a new state eg: ERROR_ON_GRACE if not self.is_in_error_grace_period(): + second_error_message = REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER try: self.send_error( f"Error occurred for {self._report_schedule.type}:" f" {self._report_schedule.name}", str(first_ex), ) - self.update_report_schedule_and_log( - ReportState.ERROR, - error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER, + + except SupersetErrorsException as second_ex: + second_error_message = ";".join( + [error.message for error in second_ex.errors] ) except Exception as second_ex: # pylint: disable=broad-except + second_error_message = str(second_ex) + finally: self.update_report_schedule_and_log( - ReportState.ERROR, error_message=str(second_ex) + ReportState.ERROR, error_message=second_error_message ) raise first_ex diff --git a/superset/reports/notifications/email.py b/superset/reports/notifications/email.py index 4526e7f729c5b..f8b38cc3eae26 100644 --- a/superset/reports/notifications/email.py +++ b/superset/reports/notifications/email.py @@ -26,6 +26,7 @@ from flask_babel import gettext as __ from superset import app +from superset.exceptions import SupersetErrorsException from superset.reports.models import ReportRecipientType from superset.reports.notifications.base import BaseNotification from superset.reports.notifications.exceptions import NotificationError @@ -210,5 +211,9 @@ def send(self) -> None: logger.info( "Report sent to email, notification content is %s", content.header_data ) + except SupersetErrorsException as ex: + raise NotificationError( + ";".join([error.message for error in ex.errors]) + ) from ex except Exception as ex: raise NotificationError(str(ex)) from ex diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 73b833d87d4e6..7472f68554121 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -29,6 +29,7 @@ from superset.reports.dao import ReportScheduleDAO from superset.tasks.cron_util import cron_schedule_window from superset.utils.celery import session_scope +from superset.utils.core import LoggerLevel from superset.utils.log import get_logger_from_status logger = logging.getLogger(__name__) @@ -98,7 +99,7 @@ def execute(self: Celery.task, report_schedule_id: int, scheduled_dttm: str) -> logger_func( "A downstream %s occurred while generating a report: %s", level, task_id ) - if level == "exception": + if level == LoggerLevel.EXCEPTION: self.update_state(state="FAILURE") diff --git a/superset/utils/core.py b/superset/utils/core.py index cd992250ee7d6..ea2bd65880a81 100644 --- a/superset/utils/core.py +++ b/superset/utils/core.py @@ -190,6 +190,12 @@ class DatasourceType(str, Enum): VIEW = "view" +class LoggerLevel(str, Enum): + INFO = "info" + WARNING = "warning" + EXCEPTION = "exception" + + class HeaderDataType(TypedDict): notification_format: str owners: List[int] diff --git a/superset/utils/log.py b/superset/utils/log.py index 3c1ccc5e71f5d..a52556ae64879 100644 --- a/superset/utils/log.py +++ b/superset/utils/log.py @@ -42,7 +42,7 @@ from sqlalchemy.exc import SQLAlchemyError from typing_extensions import Literal -from superset.utils.core import get_user_id +from superset.utils.core import get_user_id, LoggerLevel if TYPE_CHECKING: from superset.stats_logger import BaseStatsLogger @@ -85,7 +85,12 @@ def get_logger_from_status( Return logger method by status of exception. Maps logger level to status code level """ - log_map = {"2": "info", "3": "info", "4": "warning", "5": "exception"} + log_map = { + "2": LoggerLevel.INFO, + "3": LoggerLevel.INFO, + "4": LoggerLevel.WARNING, + "5": LoggerLevel.EXCEPTION, + } log_level = log_map[str(status)[0]] return (getattr(logger, log_level), log_level) diff --git a/tests/integration_tests/event_logger_tests.py b/tests/integration_tests/event_logger_tests.py index 2a1cd97aac6d6..4553bb9dc789b 100644 --- a/tests/integration_tests/event_logger_tests.py +++ b/tests/integration_tests/event_logger_tests.py @@ -29,7 +29,6 @@ AbstractEventLogger, DBEventLogger, get_event_logger_from_cfg_value, - get_logger_from_status, ) from tests.integration_tests.test_app import app