From 8607718d386fa59671dc4d9d77de8ca99bf6e5af Mon Sep 17 00:00:00 2001
From: Eduardo Lauer
Date: Thu, 3 Oct 2024 16:18:45 -0300
Subject: [PATCH] include datasets

---
 dag_load_inlabs/ro-dou_inlabs_load_pg_dag.py | 39 +++++++++++++++++++++++++++++++++++++++----
 1 file changed, 35 insertions(+), 4 deletions(-)

diff --git a/dag_load_inlabs/ro-dou_inlabs_load_pg_dag.py b/dag_load_inlabs/ro-dou_inlabs_load_pg_dag.py
index c9eaf46..5c76112 100644
--- a/dag_load_inlabs/ro-dou_inlabs_load_pg_dag.py
+++ b/dag_load_inlabs/ro-dou_inlabs_load_pg_dag.py
@@ -10,7 +10,8 @@
 from airflow import Dataset
 from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
 from airflow.models import Variable
 from airflow.providers.common.sql.operators.sql import SQLCheckOperator
 
 sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
 
@@ -19,6 +20,6 @@
 DEST_DIR = "download_inlabs"
 #XXX update here
-DEST_CONN_ID = "database_to_load_inlabs_data"
+DEST_CONN_ID = "inlabs_db"
 #XXX connection to https://inlabs.in.gov.br/
 INLABS_CONN_ID = "inlabs_portal"
 #XXX remember to create schema `dou_inlabs` on db
@@ -41,7 +42,7 @@
 @dag(
     dag_id="ro-dou_inlabs_load_pg",
     default_args=default_args,
-    schedule="59 3,23 * * *",
+    schedule="00 15,23 * * *",
     catchup=False,
     description=__doc__,
     max_active_runs=1,
@@ -213,14 +214,44 @@ def _clean_db(hook: PostgresHook):
-        outlets=[Dataset("inlabs")]
     )
 
+    @task.branch
+    def check_if_first_run_of_day():
+        """Publish the `inlabs` dataset on the first run of the day and
+        the `inlabs_edicao_extra` dataset on every later run."""
+        context = get_current_context()
+        execution_date = context["logical_date"]
+        prev_execution_date = context["prev_execution_date"]
+        logging.info("Execution date: %s", execution_date)
+        logging.info("Previous execution date: %s", prev_execution_date)
+
+        # prev_execution_date is None on the very first run of the DAG
+        if prev_execution_date and execution_date.date() == prev_execution_date.date():
+            logging.info("Not the first run of the day")
+            logging.info("Triggering the inlabs_edicao_extra dataset")
+            return "trigger_dataset_inlabs_edicao_extra"
+        logging.info("First run of the day")
+        logging.info("Triggering the inlabs dataset and the INLABS DAGs")
+        return "trigger_dataset_inlabs"
+
+    @task(outlets=[Dataset("inlabs_edicao_extra")])
+    def trigger_dataset_inlabs_edicao_extra():
+        """No-op task that only updates the `inlabs_edicao_extra` dataset."""
+
+    @task(outlets=[Dataset("inlabs")])
+    def trigger_dataset_inlabs():
+        """No-op task that only updates the `inlabs` dataset."""
+
-    @task
+    # One branch task is always skipped; with the default `all_success`
+    # rule cleanup would be skipped too, so run whenever no upstream failed.
+    @task(trigger_rule="none_failed_min_one_success")
     def remove_directory():
         dest_path = os.path.join(Variable.get("path_tmp"), DEST_DIR)
         subprocess.run(f"rm -rf {dest_path}", shell=True, check=True)
         logging.info("Directory %s removed.", dest_path)
 
     ## Orchestration
     trigger_date = get_date()
     download_n_unzip_files(trigger_date) >> \
         load_data(trigger_date) >> check_loaded_data >> \
+        check_if_first_run_of_day() >> \
+        [trigger_dataset_inlabs_edicao_extra(), trigger_dataset_inlabs()] >> \
         remove_directory()
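
Note (illustrative, not part of the patch): downstream DAGs consume these datasets through a dataset-based schedule, which is the mechanism the new `trigger_dataset_*` tasks feed. A minimal consumer sketch, assuming Airflow >= 2.4 dataset scheduling; the dag_id, start_date, and task body are hypothetical:

import logging

import pendulum
from airflow import Dataset
from airflow.decorators import dag, task


@dag(
    dag_id="example_consume_inlabs",  # hypothetical name, for illustration only
    # Run whenever the `inlabs` dataset is updated by the loader DAG;
    # schedule on Dataset("inlabs_edicao_extra") to react to extra editions.
    schedule=[Dataset("inlabs")],
    start_date=pendulum.datetime(2024, 10, 1, tz="America/Sao_Paulo"),
    catchup=False,
)
def example_consume_inlabs():
    @task
    def process():
        logging.info("Triggered by an update to the inlabs dataset.")

    process()


example_consume_inlabs()

Because the branch publishes `inlabs` only on the first run of the day, a consumer like this runs once daily, while DAGs scheduled on `inlabs_edicao_extra` pick up the later editions.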