Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Triggers Digital Bookplates Instances DAG #1245

Merged
merged 3 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from datetime import datetime, timedelta

from airflow.decorators import dag
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import get_current_context
from airflow.timetables.interval import CronDataIntervalTimetable


from libsys_airflow.plugins.digital_bookplates.bookplates import (
bookplate_fund_po_lines,
launch_add_979_fields_task,
Expand Down Expand Up @@ -43,11 +45,19 @@ def digital_bookplate_instances():

end = EmptyOperator(task_id="end")

@task
def get_funds() -> list:
context = get_current_context()
params = context.get("params", {}) # type: ignore
return params.get("funds", [])

funds = get_funds()

retrieve_paid_invoices = invoices_paid_within_date_range()

retrieve_paid_invoice_lines = invoice_lines_from_invoices(retrieve_paid_invoices)

filter_paid_invoice_lines = filter_invoice_lines(retrieve_paid_invoice_lines)
filter_paid_invoice_lines = filter_invoice_lines(retrieve_paid_invoice_lines, funds)

retrieve_bookplate_fund_po_lines = bookplate_fund_po_lines(
filter_paid_invoice_lines
Expand All @@ -57,7 +67,7 @@ def digital_bookplate_instances():

launch_add_tag_dag = launch_add_979_fields_task(instances=retrieve_instances)

start >> retrieve_paid_invoices
start >> [funds, retrieve_paid_invoices]
launch_add_tag_dag >> end


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
extract_bookplate_metadata,
fetch_druids,
filter_updates_errors,
trigger_instances_dag,
)


Expand Down Expand Up @@ -82,6 +83,8 @@ def fetch_digital_bookplates():
>> end
)

trigger_instances_dag(new=filtered_data["new"]) >> end

start >> fetch_bookplate_purls >> db_results


Expand Down
13 changes: 13 additions & 0 deletions libsys_airflow/plugins/digital_bookplates/purl_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import httpx

from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

from jsonpath_ng.ext import parse
Expand Down Expand Up @@ -145,6 +146,18 @@ def filter_updates_errors(db_results: list) -> dict:
return {"failures": failures, "new": new, "updates": updates}


@task
def trigger_instances_dag(**kwargs) -> bool:
new_funds = kwargs.get("new", [])
if len(new_funds) < 1:
logger.info("No new funds to trigger digital_bookplate_instances DAG")
return True
TriggerDagRunOperator.partial(
task_id="new-instance-dag", trigger_dag_id="digital_bookplate_instances"
).expand(**{"logical_date": "2023-08-28T00:00:00+00:00", "funds": new_funds})
return True


def _add_bookplate(metadata: dict, session: Session) -> dict:
new_bookplate = DigitalBookplate(
created=datetime.datetime.utcnow(),
Expand Down
2 changes: 1 addition & 1 deletion libsys_airflow/plugins/folio/invoices.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def invoice_lines_from_invoices(invoices: list) -> list:


@task
def filter_invoice_lines(invoice_lines: list) -> list:
def filter_invoice_lines(invoice_lines: list, funds: list) -> list:
"""
Given a list of invoice line dictionaries,
Filters invoice lines to only those with fund IDs and po line IDs
Expand Down
16 changes: 16 additions & 0 deletions tests/digital_bookplates/test_purl_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
check_deleted_from_argo,
extract_bookplate_metadata,
fetch_druids,
filter_updates_errors,
trigger_instances_dag,
)

rows = Rows(
Expand Down Expand Up @@ -186,6 +188,14 @@ def test_failed_bookplate(pg_hook):
assert result["failure"]["druid"] == "ef919yq2614"


def test_filter_updates_errors():
db_results = [{"failure": {}}, {"new": {}}, {"update": {}}]

result = filter_updates_errors.function(db_results)

assert result['failures'] == [{}]


def test_missing_image_file(mocker):

mocker.patch(
Expand Down Expand Up @@ -248,6 +258,12 @@ def test_new_bookplate(pg_hook):
assert result["new"]["db_id"] == 4


def test_trigger_instances_dag_no_new(caplog):
trigger_instances_dag.function(new=[])

assert "No new funds to trigger digital_bookplate_instances DAG" in caplog.text


def test_update_bookplate(pg_hook):
updated_metadata = {
"druid": "ab123xy4567",
Expand Down
4 changes: 2 additions & 2 deletions tests/test_invoices.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ def test_invoice_lines_from_invoices(mocker, mock_folio_client, caplog):
assert len(invoice_lines) == 2


def test_filter_invoice_liness(mock_invoice_lines):
invoice_lines_data_struct = filter_invoice_lines.function(mock_invoice_lines)
def test_filter_invoice_lines(mock_invoice_lines):
invoice_lines_data_struct = filter_invoice_lines.function(mock_invoice_lines, [])
assert len(invoice_lines_data_struct) == 2
for v in invoice_lines_data_struct[0].values():
assert v["fund_ids"] == ["3eb86c5f-c77b-4cc9-8f29-7de7ce313411"]
Expand Down
Loading