From cddfd4aa9b7eaaa4bb524615e71abef7ac3ae51b Mon Sep 17 00:00:00 2001 From: Jason Davis <@dropbox.com> Date: Sun, 2 Aug 2020 18:16:02 -0700 Subject: [PATCH 1/4] refractored alerting to not pass sqlalchemy obj as args --- superset/tasks/schedules.py | 60 ++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/superset/tasks/schedules.py b/superset/tasks/schedules.py index 7b39362aec037..af7ff9c7b672e 100644 --- a/superset/tasks/schedules.py +++ b/superset/tasks/schedules.py @@ -53,6 +53,7 @@ from superset import app, db, security_manager, thumbnail_cache from superset.extensions import celery_app from superset.models.alerts import Alert, AlertLog +from superset.models.core import Database from superset.models.dashboard import Dashboard from superset.models.schedules import ( EmailDeliveryType, @@ -551,10 +552,17 @@ def schedule_alert_query( # pylint: disable=unused-argument if report_type == ScheduleType.alert: if is_test_alert and recipients: - deliver_alert(schedule, recipients) + deliver_alert(schedule.id, schedule.slice.id, schedule.label, recipients) return - if run_alert_query(schedule): + if run_alert_query( + schedule.id, + schedule.database_id, + schedule.slice.id, + schedule.sql, + schedule.label, + schedule.recipients, + ): # deliver_dashboard OR deliver_slice return else: @@ -567,21 +575,21 @@ class AlertState: PASS = "pass" -def deliver_alert(alert: Alert, recipients: Optional[str] = None) -> None: - logging.info("Triggering alert: %s", alert) +def deliver_alert(alert_id: int, slice_id: int, label: str, recipients: str) -> None: + logging.info("Triggering alert: <%s:%s>", alert_id, label) img_data = None images = {} - recipients = recipients or alert.recipients + alert_slice = db.session.query(Slice).get(slice_id) - if alert.slice: + if alert_slice: chart_url = get_url_path( - "Superset.slice", slice_id=alert.slice.id, standalone="true" + "Superset.slice", slice_id=alert_slice.id, standalone="true" ) - screenshot = ChartScreenshot(chart_url, alert.slice.digest) + screenshot = ChartScreenshot(chart_url, alert_slice.digest) cache_key = screenshot.cache_key() image_url = get_url_path( - "ChartRestApi.screenshot", pk=alert.slice.id, digest=cache_key + "ChartRestApi.screenshot", pk=alert_slice.id, digest=cache_key ) user = security_manager.find_user(current_app.config["THUMBNAIL_SELENIUM_USER"]) @@ -593,7 +601,7 @@ def deliver_alert(alert: Alert, recipients: Optional[str] = None) -> None: image_url = "https://media.giphy.com/media/dzaUX7CAG0Ihi/giphy.gif" # generate the email - subject = f"[Superset] Triggered alert: {alert.label}" + subject = f"[Superset] Triggered alert: {label}" deliver_as_group = False data = None if img_data: @@ -605,28 +613,36 @@ def deliver_alert(alert: Alert, recipients: Optional[str] = None) -> None: %(label)s """ ), - label=alert.label, + label=label, image_url=image_url, ) _deliver_email(recipients, deliver_as_group, subject, body, data, images) -def run_alert_query(alert: Alert) -> Optional[bool]: +def run_alert_query( + alert_id: int, + database_id: int, + slice_id: int, + sql: str, + label: str, + recipients: str, +) -> Optional[bool]: """ Execute alert.sql and return value if any rows are returned + database_id, sql, alert_id, slice_id, label """ - logger.info("Processing alert ID: %i", alert.id) - database = alert.database + logger.info("Processing alert ID: %i", alert_id) + database = db.session.query(Database).get(database_id) if not database: logger.error("Alert database not preset") return None - if not alert.sql: + if not sql: logger.error("Alert SQL not preset") return None - parsed_query = ParsedQuery(alert.sql) + parsed_query = ParsedQuery(sql) sql = parsed_query.stripped() state = None @@ -634,27 +650,31 @@ def run_alert_query(alert: Alert) -> Optional[bool]: df = pd.DataFrame() try: - logger.info("Evaluating SQL for alert %s", alert) + logger.info("Evaluating SQL for alert <%s:%s>", alert_id, label) df = database.get_df(sql) except Exception as exc: # pylint: disable=broad-except state = AlertState.ERROR logging.exception(exc) - logging.error("Failed at evaluating alert: %s (%s)", alert.label, alert.id) + logging.error("Failed at evaluating alert: %s (%s)", label, alert_id) dttm_end = datetime.utcnow() if state != AlertState.ERROR: - alert.last_eval_dttm = datetime.utcnow() if not df.empty: # Looking for truthy cells for row in df.to_records(): if any(row): state = AlertState.TRIGGER - deliver_alert(alert) + deliver_alert(alert_id, slice_id, label, recipients) break if not state: state = AlertState.PASS + alert = db.session.query(Alert).get(alert_id) + + if state != AlertState.ERROR: + alert.last_eval_dttm = datetime.utcnow() + alert.last_state = state alert.logs.append( AlertLog( From 4a7db8f4f05fe994a6b54916e1c3c9ff881c7a7b Mon Sep 17 00:00:00 2001 From: Jason Davis <@dropbox.com> Date: Mon, 3 Aug 2020 09:49:15 -0700 Subject: [PATCH 2/4] updated to pass only alert id as arg --- superset/tasks/schedules.py | 64 ++++++++++++++----------------------- 1 file changed, 24 insertions(+), 40 deletions(-) diff --git a/superset/tasks/schedules.py b/superset/tasks/schedules.py index af7ff9c7b672e..4670af8f6dda5 100644 --- a/superset/tasks/schedules.py +++ b/superset/tasks/schedules.py @@ -53,7 +53,6 @@ from superset import app, db, security_manager, thumbnail_cache from superset.extensions import celery_app from superset.models.alerts import Alert, AlertLog -from superset.models.core import Database from superset.models.dashboard import Dashboard from superset.models.schedules import ( EmailDeliveryType, @@ -552,17 +551,10 @@ def schedule_alert_query( # pylint: disable=unused-argument if report_type == ScheduleType.alert: if is_test_alert and recipients: - deliver_alert(schedule.id, schedule.slice.id, schedule.label, recipients) + deliver_alert(schedule_id, recipients) return - if run_alert_query( - schedule.id, - schedule.database_id, - schedule.slice.id, - schedule.sql, - schedule.label, - schedule.recipients, - ): + if run_alert_query(schedule_id): # deliver_dashboard OR deliver_slice return else: @@ -575,21 +567,23 @@ class AlertState: PASS = "pass" -def deliver_alert(alert_id: int, slice_id: int, label: str, recipients: str) -> None: - logging.info("Triggering alert: <%s:%s>", alert_id, label) +def deliver_alert(alert_id: int, recipients: Optional[str] = None) -> None: + alert = db.session.query(Alert).get(alert_id) + + logging.info("Triggering alert: %s", alert) img_data = None images = {} - alert_slice = db.session.query(Slice).get(slice_id) + recipients = recipients or alert.recipients - if alert_slice: + if alert.slice: chart_url = get_url_path( - "Superset.slice", slice_id=alert_slice.id, standalone="true" + "Superset.slice", slice_id=alert.slice.id, standalone="true" ) - screenshot = ChartScreenshot(chart_url, alert_slice.digest) + screenshot = ChartScreenshot(chart_url, alert.slice.digest) cache_key = screenshot.cache_key() image_url = get_url_path( - "ChartRestApi.screenshot", pk=alert_slice.id, digest=cache_key + "ChartRestApi.screenshot", pk=alert.slice.id, digest=cache_key ) user = security_manager.find_user(current_app.config["THUMBNAIL_SELENIUM_USER"]) @@ -601,7 +595,7 @@ def deliver_alert(alert_id: int, slice_id: int, label: str, recipients: str) -> image_url = "https://media.giphy.com/media/dzaUX7CAG0Ihi/giphy.gif" # generate the email - subject = f"[Superset] Triggered alert: {label}" + subject = f"[Superset] Triggered alert: {alert.label}" deliver_as_group = False data = None if img_data: @@ -613,36 +607,30 @@ def deliver_alert(alert_id: int, slice_id: int, label: str, recipients: str) -> %(label)s """ ), - label=label, + label=alert.label, image_url=image_url, ) _deliver_email(recipients, deliver_as_group, subject, body, data, images) -def run_alert_query( - alert_id: int, - database_id: int, - slice_id: int, - sql: str, - label: str, - recipients: str, -) -> Optional[bool]: +def run_alert_query(alert_id: int) -> Optional[bool]: """ Execute alert.sql and return value if any rows are returned - database_id, sql, alert_id, slice_id, label """ - logger.info("Processing alert ID: %i", alert_id) - database = db.session.query(Database).get(database_id) + alert = db.session.query(Alert).get(alert_id) + + logger.info("Processing alert ID: %i", alert.id) + database = alert.database if not database: logger.error("Alert database not preset") return None - if not sql: + if not alert.sql: logger.error("Alert SQL not preset") return None - parsed_query = ParsedQuery(sql) + parsed_query = ParsedQuery(alert.sql) sql = parsed_query.stripped() state = None @@ -650,31 +638,27 @@ def run_alert_query( df = pd.DataFrame() try: - logger.info("Evaluating SQL for alert <%s:%s>", alert_id, label) + logger.info("Evaluating SQL for alert %s", alert) df = database.get_df(sql) except Exception as exc: # pylint: disable=broad-except state = AlertState.ERROR logging.exception(exc) - logging.error("Failed at evaluating alert: %s (%s)", label, alert_id) + logging.error("Failed at evaluating alert: %s (%s)", alert.label, alert.id) dttm_end = datetime.utcnow() if state != AlertState.ERROR: + alert.last_eval_dttm = datetime.utcnow() if not df.empty: # Looking for truthy cells for row in df.to_records(): if any(row): state = AlertState.TRIGGER - deliver_alert(alert_id, slice_id, label, recipients) + deliver_alert(alert.id) break if not state: state = AlertState.PASS - alert = db.session.query(Alert).get(alert_id) - - if state != AlertState.ERROR: - alert.last_eval_dttm = datetime.utcnow() - alert.last_state = state alert.logs.append( AlertLog( From 7f74e7ca5d78b769fe46863c5ae0c81e5980ddb6 Mon Sep 17 00:00:00 2001 From: Jason Davis <@dropbox.com> Date: Mon, 3 Aug 2020 15:39:58 -0700 Subject: [PATCH 3/4] used object id instead of argument --- superset/tasks/schedules.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/superset/tasks/schedules.py b/superset/tasks/schedules.py index 4670af8f6dda5..a53ce08e771b4 100644 --- a/superset/tasks/schedules.py +++ b/superset/tasks/schedules.py @@ -551,10 +551,10 @@ def schedule_alert_query( # pylint: disable=unused-argument if report_type == ScheduleType.alert: if is_test_alert and recipients: - deliver_alert(schedule_id, recipients) + deliver_alert(schedule.id, recipients) return - if run_alert_query(schedule_id): + if run_alert_query(schedule.id): # deliver_dashboard OR deliver_slice return else: From 9a2a3dcc077fc39cbe682441cd018c0247072d94 Mon Sep 17 00:00:00 2001 From: Jason Davis <@dropbox.com> Date: Mon, 3 Aug 2020 15:59:28 -0700 Subject: [PATCH 4/4] updated alerts_tests.py to reflect change --- tests/alerts_tests.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/alerts_tests.py b/tests/alerts_tests.py index a7e032a0807eb..07205810edd05 100644 --- a/tests/alerts_tests.py +++ b/tests/alerts_tests.py @@ -86,21 +86,21 @@ def teardown_module(): @patch("superset.tasks.schedules.logging.Logger.error") def test_run_alert_query(mock_error, mock_deliver_alert): with app.app_context(): - run_alert_query(db.session.query(Alert).filter_by(id=1).one()) + run_alert_query(db.session.query(Alert).filter_by(id=1).one().id) alert1 = db.session.query(Alert).filter_by(id=1).one() assert mock_deliver_alert.call_count == 0 assert len(alert1.logs) == 1 assert alert1.logs[0].alert_id == 1 assert alert1.logs[0].state == "pass" - run_alert_query(db.session.query(Alert).filter_by(id=2).one()) + run_alert_query(db.session.query(Alert).filter_by(id=2).one().id) alert2 = db.session.query(Alert).filter_by(id=2).one() assert mock_deliver_alert.call_count == 1 assert len(alert2.logs) == 1 assert alert2.logs[0].alert_id == 2 assert alert2.logs[0].state == "trigger" - run_alert_query(db.session.query(Alert).filter_by(id=3).one()) + run_alert_query(db.session.query(Alert).filter_by(id=3).one().id) alert3 = db.session.query(Alert).filter_by(id=3).one() assert mock_deliver_alert.call_count == 1 assert mock_error.call_count == 2 @@ -108,11 +108,11 @@ def test_run_alert_query(mock_error, mock_deliver_alert): assert alert3.logs[0].alert_id == 3 assert alert3.logs[0].state == "error" - run_alert_query(db.session.query(Alert).filter_by(id=4).one()) + run_alert_query(db.session.query(Alert).filter_by(id=4).one().id) assert mock_deliver_alert.call_count == 1 assert mock_error.call_count == 3 - run_alert_query(db.session.query(Alert).filter_by(id=5).one()) + run_alert_query(db.session.query(Alert).filter_by(id=5).one().id) assert mock_deliver_alert.call_count == 1 assert mock_error.call_count == 4