Skip to content

Commit

Permalink
Merge pull request #1245 from sul-dlss/t1221-new-instance-dag
Browse files Browse the repository at this point in the history
Triggers Digital Bookplates Instances DAG
  • Loading branch information
shelleydoljack authored Oct 4, 2024
2 parents d7a1f24 + 10cc9db commit 3b3c791
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from datetime import datetime, timedelta

from airflow.decorators import dag
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import get_current_context
from airflow.timetables.interval import CronDataIntervalTimetable


from libsys_airflow.plugins.digital_bookplates.bookplates import (
bookplate_fund_po_lines,
launch_add_979_fields_task,
Expand Down Expand Up @@ -43,11 +45,19 @@ def digital_bookplate_instances():

end = EmptyOperator(task_id="end")

@task
def get_funds() -> list:
context = get_current_context()
params = context.get("params", {}) # type: ignore
return params.get("funds", [])

funds = get_funds()

retrieve_paid_invoices = invoices_paid_within_date_range()

retrieve_paid_invoice_lines = invoice_lines_from_invoices(retrieve_paid_invoices)

filter_paid_invoice_lines = filter_invoice_lines(retrieve_paid_invoice_lines)
filter_paid_invoice_lines = filter_invoice_lines(retrieve_paid_invoice_lines, funds)

retrieve_bookplate_fund_po_lines = bookplate_fund_po_lines(
filter_paid_invoice_lines
Expand All @@ -57,7 +67,7 @@ def digital_bookplate_instances():

launch_add_tag_dag = launch_add_979_fields_task(instances=retrieve_instances)

start >> retrieve_paid_invoices
start >> [funds, retrieve_paid_invoices]
launch_add_tag_dag >> end


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
extract_bookplate_metadata,
fetch_druids,
filter_updates_errors,
trigger_instances_dag,
)


Expand Down Expand Up @@ -82,6 +83,8 @@ def fetch_digital_bookplates():
>> end
)

trigger_instances_dag(new=filtered_data["new"]) >> end

start >> fetch_bookplate_purls >> db_results


Expand Down
13 changes: 13 additions & 0 deletions libsys_airflow/plugins/digital_bookplates/purl_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import httpx

from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

from jsonpath_ng.ext import parse
Expand Down Expand Up @@ -145,6 +146,18 @@ def filter_updates_errors(db_results: list) -> dict:
return {"failures": failures, "new": new, "updates": updates}


@task
def trigger_instances_dag(**kwargs) -> bool:
new_funds = kwargs.get("new", [])
if len(new_funds) < 1:
logger.info("No new funds to trigger digital_bookplate_instances DAG")
return True
TriggerDagRunOperator.partial(
task_id="new-instance-dag", trigger_dag_id="digital_bookplate_instances"
).expand(**{"logical_date": "2023-08-28T00:00:00+00:00", "funds": new_funds})
return True


def _add_bookplate(metadata: dict, session: Session) -> dict:
new_bookplate = DigitalBookplate(
created=datetime.datetime.utcnow(),
Expand Down
2 changes: 1 addition & 1 deletion libsys_airflow/plugins/folio/invoices.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def invoice_lines_from_invoices(invoices: list) -> list:


@task
def filter_invoice_lines(invoice_lines: list) -> list:
def filter_invoice_lines(invoice_lines: list, funds: list) -> list:
"""
Given a list of invoice line dictionaries,
Filters invoice lines to only those with fund IDs and po line IDs
Expand Down
16 changes: 16 additions & 0 deletions tests/digital_bookplates/test_purl_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
check_deleted_from_argo,
extract_bookplate_metadata,
fetch_druids,
filter_updates_errors,
trigger_instances_dag,
)

rows = Rows(
Expand Down Expand Up @@ -186,6 +188,14 @@ def test_failed_bookplate(pg_hook):
assert result["failure"]["druid"] == "ef919yq2614"


def test_filter_updates_errors():
db_results = [{"failure": {}}, {"new": {}}, {"update": {}}]

result = filter_updates_errors.function(db_results)

assert result['failures'] == [{}]


def test_missing_image_file(mocker):

mocker.patch(
Expand Down Expand Up @@ -248,6 +258,12 @@ def test_new_bookplate(pg_hook):
assert result["new"]["db_id"] == 4


def test_trigger_instances_dag_no_new(caplog):
trigger_instances_dag.function(new=[])

assert "No new funds to trigger digital_bookplate_instances DAG" in caplog.text


def test_update_bookplate(pg_hook):
updated_metadata = {
"druid": "ab123xy4567",
Expand Down
4 changes: 2 additions & 2 deletions tests/test_invoices.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ def test_invoice_lines_from_invoices(mocker, mock_folio_client, caplog):
assert len(invoice_lines) == 2


def test_filter_invoice_liness(mock_invoice_lines):
invoice_lines_data_struct = filter_invoice_lines.function(mock_invoice_lines)
def test_filter_invoice_lines(mock_invoice_lines):
invoice_lines_data_struct = filter_invoice_lines.function(mock_invoice_lines, [])
assert len(invoice_lines_data_struct) == 2
for v in invoice_lines_data_struct[0].values():
assert v["fund_ids"] == ["3eb86c5f-c77b-4cc9-8f29-7de7ce313411"]
Expand Down

0 comments on commit 3b3c791

Please sign in to comment.