diff --git a/libsys_airflow/dags/digital_bookplates/digital_bookplate_instances.py b/libsys_airflow/dags/digital_bookplates/digital_bookplate_instances.py index 6c0f6477..ecc86cd4 100644 --- a/libsys_airflow/dags/digital_bookplates/digital_bookplate_instances.py +++ b/libsys_airflow/dags/digital_bookplates/digital_bookplate_instances.py @@ -1,9 +1,11 @@ from datetime import datetime, timedelta -from airflow.decorators import dag +from airflow.decorators import dag, task from airflow.operators.empty import EmptyOperator +from airflow.operators.python import get_current_context from airflow.timetables.interval import CronDataIntervalTimetable + from libsys_airflow.plugins.digital_bookplates.bookplates import ( bookplate_fund_po_lines, launch_add_979_fields_task, @@ -43,11 +45,19 @@ def digital_bookplate_instances(): end = EmptyOperator(task_id="end") + @task + def get_funds() -> list: + context = get_current_context() + params = context.get("params", {}) # type: ignore + return params.get("funds", []) + + funds = get_funds() + retrieve_paid_invoices = invoices_paid_within_date_range() retrieve_paid_invoice_lines = invoice_lines_from_invoices(retrieve_paid_invoices) - filter_paid_invoice_lines = filter_invoice_lines(retrieve_paid_invoice_lines) + filter_paid_invoice_lines = filter_invoice_lines(retrieve_paid_invoice_lines, funds) retrieve_bookplate_fund_po_lines = bookplate_fund_po_lines( filter_paid_invoice_lines @@ -57,7 +67,7 @@ def digital_bookplate_instances(): launch_add_tag_dag = launch_add_979_fields_task(instances=retrieve_instances) - start >> retrieve_paid_invoices + start >> [funds, retrieve_paid_invoices] launch_add_tag_dag >> end diff --git a/libsys_airflow/dags/digital_bookplates/fetch_digital_bookplates.py b/libsys_airflow/dags/digital_bookplates/fetch_digital_bookplates.py index e453c836..cc675b37 100644 --- a/libsys_airflow/dags/digital_bookplates/fetch_digital_bookplates.py +++ b/libsys_airflow/dags/digital_bookplates/fetch_digital_bookplates.py @@ -17,6 +17,7 @@ extract_bookplate_metadata, fetch_druids, filter_updates_errors, + trigger_instances_dag, ) @@ -82,6 +83,8 @@ def fetch_digital_bookplates(): >> end ) + trigger_instances_dag(new=filtered_data["new"]) >> end + start >> fetch_bookplate_purls >> db_results diff --git a/libsys_airflow/plugins/digital_bookplates/purl_fetcher.py b/libsys_airflow/plugins/digital_bookplates/purl_fetcher.py index 36e6838b..2d15c9a8 100644 --- a/libsys_airflow/plugins/digital_bookplates/purl_fetcher.py +++ b/libsys_airflow/plugins/digital_bookplates/purl_fetcher.py @@ -4,6 +4,7 @@ import httpx from airflow.decorators import task +from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.postgres.hooks.postgres import PostgresHook from jsonpath_ng.ext import parse @@ -145,6 +146,18 @@ def filter_updates_errors(db_results: list) -> dict: return {"failures": failures, "new": new, "updates": updates} +@task +def trigger_instances_dag(**kwargs) -> bool: + new_funds = kwargs.get("new", []) + if len(new_funds) < 1: + logger.info("No new funds to trigger digital_bookplate_instances DAG") + return True + TriggerDagRunOperator.partial( + task_id="new-instance-dag", trigger_dag_id="digital_bookplate_instances" + ).expand(**{"logical_date": "2023-08-28T00:00:00+00:00", "funds": new_funds}) + return True + + def _add_bookplate(metadata: dict, session: Session) -> dict: new_bookplate = DigitalBookplate( created=datetime.datetime.utcnow(), diff --git a/libsys_airflow/plugins/folio/invoices.py b/libsys_airflow/plugins/folio/invoices.py index d2f1d886..88a46bf6 100644 --- a/libsys_airflow/plugins/folio/invoices.py +++ b/libsys_airflow/plugins/folio/invoices.py @@ -142,7 +142,7 @@ def invoice_lines_from_invoices(invoices: list) -> list: @task -def filter_invoice_lines(invoice_lines: list) -> list: +def filter_invoice_lines(invoice_lines: list, funds: list) -> list: """ Given a list of invoice line dictionaries, Filters invoice lines to only those with fund IDs and po line IDs diff --git a/tests/digital_bookplates/test_purl_fetcher.py b/tests/digital_bookplates/test_purl_fetcher.py index 2b040317..fb8553d3 100644 --- a/tests/digital_bookplates/test_purl_fetcher.py +++ b/tests/digital_bookplates/test_purl_fetcher.py @@ -13,6 +13,8 @@ check_deleted_from_argo, extract_bookplate_metadata, fetch_druids, + filter_updates_errors, + trigger_instances_dag, ) rows = Rows( @@ -186,6 +188,14 @@ def test_failed_bookplate(pg_hook): assert result["failure"]["druid"] == "ef919yq2614" +def test_filter_updates_errors(): + db_results = [{"failure": {}}, {"new": {}}, {"update": {}}] + + result = filter_updates_errors.function(db_results) + + assert result['failures'] == [{}] + + def test_missing_image_file(mocker): mocker.patch( @@ -248,6 +258,12 @@ def test_new_bookplate(pg_hook): assert result["new"]["db_id"] == 4 +def test_trigger_instances_dag_no_new(caplog): + trigger_instances_dag.function(new=[]) + + assert "No new funds to trigger digital_bookplate_instances DAG" in caplog.text + + def test_update_bookplate(pg_hook): updated_metadata = { "druid": "ab123xy4567", diff --git a/tests/test_invoices.py b/tests/test_invoices.py index 5c87100c..184c86ce 100644 --- a/tests/test_invoices.py +++ b/tests/test_invoices.py @@ -162,8 +162,8 @@ def test_invoice_lines_from_invoices(mocker, mock_folio_client, caplog): assert len(invoice_lines) == 2 -def test_filter_invoice_liness(mock_invoice_lines): - invoice_lines_data_struct = filter_invoice_lines.function(mock_invoice_lines) +def test_filter_invoice_lines(mock_invoice_lines): + invoice_lines_data_struct = filter_invoice_lines.function(mock_invoice_lines, []) assert len(invoice_lines_data_struct) == 2 for v in invoice_lines_data_struct[0].values(): assert v["fund_ids"] == ["3eb86c5f-c77b-4cc9-8f29-7de7ce313411"]