Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ajusta o código para utilizar XCom_pull de múltiplos Tasks IDs. #140

Merged
merged 6 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 39 additions & 26 deletions src/dou_dag_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
[] - Definir sufixo do título do email a partir de configuração
"""

import ast
import logging
import os
import sys
Expand Down Expand Up @@ -344,12 +343,29 @@ def perform_searches(

return search_dict

def has_matches(self, search_result: list, skip_null: bool) -> str:
def get_xcom_pull_tasks(self, num_searches, **context):
"""Retrieve XCom values from multiple tasks and append them to a new list.
Function required for Airflow version 2.10.0 or later
(https://github.com/apache/airflow/issues/41983).
"""
search_results = []
for counter in range(1, num_searches + 1):
search_results.append(context["ti"].xcom_pull(
task_ids=f'exec_searchs.exec_search_{counter}'))

return search_results


def has_matches(self, num_searches: int, skip_null: bool, **context) -> str:
"""Check if search has matches and return to skip notification or not"""

if skip_null:
search_results = self.get_xcom_pull_tasks(num_searches=num_searches,
**context)

skip_notification = True
search_result = ast.literal_eval(search_result)
for search in search_result:

for search in search_results:
items = ["contains" for k, v in search["result"].items() if v]
if items:
skip_notification = False
Expand Down Expand Up @@ -378,6 +394,20 @@ def select_terms_from_db(self, sql: str, conn_id: str):

return terms_df.to_json(orient="columns")

def send_notification(self,
num_searches: int,
specs: DAGConfig,
report_date: str,
**context) -> str:
"""Send user notification using class Notifier
"""
search_report = self.get_xcom_pull_tasks(num_searches=num_searches,
**context)

notifier = Notifier(specs)

notifier.send_notification(search_report=search_report, report_date=report_date)

def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:
"""Creates the DAG object and tasks

Expand Down Expand Up @@ -413,11 +443,7 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:

with TaskGroup(group_id="exec_searchs") as tg_exec_searchs:

# is it a single search or a list of searchers?
if isinstance(specs.search, list):
searches = specs.search
else:
searches = [specs.search]
searches = specs.search

for counter, subsearch in enumerate(searches, 1):

Expand Down Expand Up @@ -476,14 +502,7 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:
task_id="has_matches",
python_callable=self.has_matches,
op_kwargs={
"search_result": "{{ ti.xcom_pull(task_ids="
+ str(
[
f"exec_searchs.exec_search_{count}"
for count in range(1, len(searches) + 1)
]
)
+ ") }}",
"num_searches": len(searches),
"skip_null": specs.report.skip_null,
},
)
Expand All @@ -492,16 +511,10 @@ def create_dag(self, specs: DAGConfig, config_file: str) -> DAG:

send_notification_task = PythonOperator(
task_id="send_notification",
python_callable=Notifier(specs).send_notification,
python_callable=self.send_notification,
op_kwargs={
"search_report": "{{ ti.xcom_pull(task_ids="
+ str(
[
f"exec_searchs.exec_search_{count}"
for count in range(1, len(searches) + 1)
]
)
+ ") }}",
"num_searches": len(searches),
"specs": specs,
"report_date": template_ano_mes_dia_trigger_local_time,
},
)
Expand Down
2 changes: 0 additions & 2 deletions src/notification/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ def send_notification(self, search_report: str, report_date: str):
search_report (str): The report to be sent
report_date (str): The date of the report
"""
# Convert to data structure after it's retrieved from xcom
search_report = ast.literal_eval(search_report)

for sender in self.senders:
sender.send_report(search_report, report_date)
Loading