diff --git a/src/dou_dag_generator.py b/src/dou_dag_generator.py index b31d48b..7de77f4 100755 --- a/src/dou_dag_generator.py +++ b/src/dou_dag_generator.py @@ -8,7 +8,6 @@ [] - Definir sufixo do título do email a partir de configuração """ -import ast import logging import os import sys @@ -344,12 +343,29 @@ def perform_searches( return search_dict - def has_matches(self, search_result: list, skip_null: bool) -> str: + def get_xcom_pull_tasks(self, num_searches, **context): + """Retrieve XCom values from multiple tasks and append them to a new list. + Function required for Airflow version 2.10.0 or later + (https://github.com/apache/airflow/issues/41983). + """ + search_results = [] + for counter in range(1, num_searches + 1): + search_results.append(context["ti"].xcom_pull( + task_ids=f'exec_searchs.exec_search_{counter}')) + + return search_results + + + def has_matches(self, num_searches: int, skip_null: bool, **context) -> str: + """Check if search has matches and return to skip notification or not""" if skip_null: + search_results = self.get_xcom_pull_tasks(num_searches=num_searches, + **context) + skip_notification = True - search_result = ast.literal_eval(search_result) - for search in search_result: + + for search in search_results: items = ["contains" for k, v in search["result"].items() if v] if items: skip_notification = False @@ -378,6 +394,20 @@ def select_terms_from_db(self, sql: str, conn_id: str): return terms_df.to_json(orient="columns") + def send_notification(self, + num_searches: int, + specs: DAGConfig, + report_date: str, + **context) -> str: + """Send user notification using class Notifier + """ + search_report = self.get_xcom_pull_tasks(num_searches=num_searches, + **context) + + notifier = Notifier(specs) + + notifier.send_notification(search_report=search_report, report_date=report_date) + def create_dag(self, specs: DAGConfig, config_file: str) -> DAG: """Creates the DAG object and tasks @@ -413,11 +443,7 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG: with TaskGroup(group_id="exec_searchs") as tg_exec_searchs: - # is it a single search or a list of searchers? - if isinstance(specs.search, list): - searches = specs.search - else: - searches = [specs.search] + searches = specs.search for counter, subsearch in enumerate(searches, 1): @@ -476,14 +502,7 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG: task_id="has_matches", python_callable=self.has_matches, op_kwargs={ - "search_result": "{{ ti.xcom_pull(task_ids=" - + str( - [ - f"exec_searchs.exec_search_{count}" - for count in range(1, len(searches) + 1) - ] - ) - + ") }}", + "num_searches": len(searches), "skip_null": specs.report.skip_null, }, ) @@ -492,16 +511,10 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG: send_notification_task = PythonOperator( task_id="send_notification", - python_callable=Notifier(specs).send_notification, + python_callable=self.send_notification, op_kwargs={ - "search_report": "{{ ti.xcom_pull(task_ids=" - + str( - [ - f"exec_searchs.exec_search_{count}" - for count in range(1, len(searches) + 1) - ] - ) - + ") }}", + "num_searches": len(searches), + "specs": specs, "report_date": template_ano_mes_dia_trigger_local_time, }, ) diff --git a/src/notification/notifier.py b/src/notification/notifier.py index 604801d..adcd0c2 100644 --- a/src/notification/notifier.py +++ b/src/notification/notifier.py @@ -41,8 +41,6 @@ def send_notification(self, search_report: str, report_date: str): search_report (str): The report to be sent report_date (str): The date of the report """ - # Convert to data structure after it's retrieved from xcom - search_report = ast.literal_eval(search_report) for sender in self.senders: sender.send_report(search_report, report_date)