send dags for backstage #1509

Merged 3 commits on Feb 19, 2025

57 changes: 57 additions & 0 deletions libsys_airflow/dags/data_exports/backstage_transmission.py
@@ -0,0 +1,57 @@
import logging
from datetime import datetime, timedelta

from airflow.decorators import dag
from airflow.models import Variable
from airflow.operators.empty import EmptyOperator
from airflow.timetables.interval import CronDataIntervalTimetable

from libsys_airflow.plugins.data_exports.transmission_tasks import (
    gather_files_task,
    transmit_data_ftp_task,
    archive_transmitted_data_task,
)

from libsys_airflow.plugins.data_exports.email import (
    failed_transmission_email,
)

logger = logging.getLogger(__name__)

default_args = {
    "owner": "libsys",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}


@dag(
    default_args=default_args,
    schedule=CronDataIntervalTimetable(
        cron=Variable.get("transmit_backstage", "30 1 * * SAT"),
        timezone="America/Los_Angeles",
    ),
    start_date=datetime(2024, 11, 18),
    catchup=False,
    tags=["data export", "backstage"],
)
def send_backstage_records():
    start = EmptyOperator(task_id="start")

    end = EmptyOperator(task_id="end")

    gather_files = gather_files_task(vendor="backstage")

    transmit_data = transmit_data_ftp_task("backstage", gather_files)

    archive_data = archive_transmitted_data_task(transmit_data['success'])

    email_failures = failed_transmission_email(transmit_data["failures"])

    start >> gather_files >> transmit_data >> [archive_data, email_failures] >> end


send_backstage_records()
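A note on the schedule above: the cron string resolves from an Airflow Variable with an inline fallback, so the matching "transmit_backstage" entry added to default_schedules.json below only takes effect once it is loaded into the Variable store. A minimal sketch of the lookup pattern (standard Variable.get semantics; the key and fallback value are from this PR):

from airflow.models import Variable

# Returns the stored value when the "transmit_backstage" Variable exists in
# the Airflow metadata DB; otherwise falls back to the hard-coded cron.
# "30 1 * * SAT" = 01:30 every Saturday, America/Los_Angeles per the DAG.
cron = Variable.get("transmit_backstage", "30 1 * * SAT")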
33 changes: 17 additions & 16 deletions libsys_airflow/dags/data_exports/default_schedules.json
@@ -1,17 +1,18 @@
{
    "select_backstage": "30 22 * * FRI",
    "transmit_backstage": "30 1 * * SAT",
    "select_gobi": "30 22 * * TUE",
    "transmit_gobi": "30 1 * * WED",
    "select_google": "0 0 * * *",
    "transmit_google": "0 2 * * *",
    "select_hathi": "0 1 * * *",
    "transmit_hathi": "0 3 * * *",
    "select_nielsen": "0 2 * * *",
    "transmit_nielsen": "0 4 * * *",
    "select_oclc": "30 1 * * *",
    "transmit_oclc": "30 3 * * FRI",
    "select_pod": "0 22 * * *",
    "transmit_pod": "0 0 * * *",
    "select_sharevde": "0 20 * * *",
    "transmit_sharevde": "0 22 * * *"
}
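Because this file must stay plain JSON (note: no trailing comma after the last entry), a quick way to sanity-check both the syntax and the cron values is croniter, which Airflow already depends on. A minimal sketch, assuming it runs from the repository root:

import json

from croniter import croniter

with open("libsys_airflow/dags/data_exports/default_schedules.json") as fo:
    schedules = json.load(fo)  # raises ValueError if the JSON is malformed

# croniter.is_valid() returns False for malformed cron expressions
assert all(croniter.is_valid(cron) for cron in schedules.values())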
57 changes: 49 additions & 8 deletions libsys_airflow/dags/data_exports/full_dump_transmission.py
@@ -1,15 +1,17 @@
import logging
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.models.param import Param
from airflow.models.connection import Connection
from airflow.models import Variable
from airflow.operators.empty import EmptyOperator

from libsys_airflow.plugins.data_exports.transmission_tasks import (
    gather_files_task,
    retry_failed_files_task,
    transmit_data_http_task,
    transmit_data_ftp_task,
)

from libsys_airflow.plugins.data_exports.email import (
@@ -28,6 +30,32 @@
}


@task(multiple_outputs=True)
def retrieve_params(**kwargs):
    """
    Determine connection type based on "vendor" Param
    """
    params = kwargs.get("params", {})
    conn_id = params["vendor"]
    return {"conn_id": conn_id}


@task.branch()
def http_or_ftp_path(**kwargs):
    """
    Determine transmission type based on conn_type from connection
    """
    conn_id = kwargs.get("connection")
    logger.info(f"Send all records to vendor {conn_id}")
    connection = Connection.get_connection_from_secrets(conn_id)
    conn_type = connection.conn_type
    logger.info(f"Transmit data via {conn_type}")
    if conn_type == "http":
        return "transmit_data_http_task"
    else:
        return "transmit_data_ftp_task"


@dag(
    default_args=default_args,
    schedule=None,
@@ -39,7 +67,7 @@
            "pod",
            type="string",
            description="Send all records to this vendor.",
            enum=["pod", "sharevde", "backstage"],
        ),
        "bucket": Param(
            Variable.get("FOLIO_AWS_BUCKET", "folio-data-export-prod"), type="string"
@@ -53,6 +81,11 @@ def send_all_records():

    gather_files = gather_files_task(vendor="full-dump")

    vars = retrieve_params()

    choose_branch = http_or_ftp_path(connection=vars["conn_id"])

    # http branch
    transmit_data = transmit_data_http_task(
        gather_files,
        files_params="upload[files][]",
@@ -69,13 +102,21 @@

    email_failures = failed_transmission_email(retry_transmission["failures"])

    # ftp branch
    transmit_data_ftp = transmit_data_ftp_task(vars["conn_id"], gather_files)
    retry_files_ftp = retry_failed_files_task(
        vendor="full-dump", files=transmit_data_ftp["failures"]
    )
    retry_transmit_data_ftp = transmit_data_ftp_task(vars["conn_id"], retry_files_ftp)
    email_failures_ftp = failed_transmission_email(retry_transmit_data_ftp["failures"])

    start >> gather_files >> vars >> choose_branch >> [transmit_data, transmit_data_ftp]
    transmit_data >> retry_files >> retry_transmission >> email_failures >> end
    (
        transmit_data_ftp
        >> retry_files_ftp
        >> retry_transmit_data_ftp
        >> email_failures_ftp
        >> end
    )
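For context on the new branching above: an @task.branch callable returns the task_id to continue with, and Airflow skips the sibling branch. A standalone sketch of the pattern (hypothetical task names, not the DAG above):

from airflow.decorators import task


@task.branch()
def pick_path(**kwargs):
    # Return the task_id Airflow should follow; the other branch is skipped.
    conn_type = kwargs.get("conn_type", "ftp")  # hypothetical input
    return "http_path" if conn_type == "http" else "ftp_path"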

8 changes: 6 additions & 2 deletions libsys_airflow/plugins/data_exports/transmission_tasks.py
@@ -165,7 +165,7 @@ def transmit_data_ftp_task(conn_id, gather_files) -> dict:
logger.info(f"Start transmission of file {f}")
hook.store_file(remote_file_path, f)
success.append(f)
logger.info(f"End transmission of file {f}")
logger.info(f"Transmitted file to {remote_file_path}")
except Exception as e:
logger.error(e)
logger.error(f"Exception for transmission of file {f}")
@@ -335,6 +335,8 @@ def vendor_fileformat_spec(vendor):
return "**/*.gz"
case "gobi":
return "**/*.txt"
case "backstage":
return "**/*.mrc"
case _:
return "**/*.xml"

@@ -346,10 +348,12 @@ def vendor_filename_spec(conn_id, filename):
if conn_id == "gobi":
# gobi should have "stf" prepended
return "stf" + Path(filename).name
elif conn_id == "backstage":
return "STF" + Path(filename).name
elif conn_id == "sharevde":
return "tbd"
else:
Path(filename).name
return Path(filename).name
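The renaming rules in vendor_filename_spec are easy to spot-check in isolation; a minimal sketch (hypothetical local paths, chosen to match the test fixtures below):

from libsys_airflow.plugins.data_exports.transmission_tasks import (
    vendor_filename_spec,
)

assert vendor_filename_spec("gobi", "/data/2024030214.txt") == "stf2024030214.txt"
assert vendor_filename_spec("backstage", "/data/2024020314.mrc") == "STF2024020314.mrc"
assert vendor_filename_spec("pod", "/data/2024022914.xml") == "2024022914.xml"  # falls through to the else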


def vendor_url_params(conn_id, is_s3_path) -> dict:
73 changes: 63 additions & 10 deletions tests/data_exports/test_transmission_tasks.py
@@ -27,8 +27,12 @@
    set_holdings_oclc_task,
)

from libsys_airflow.dags.data_exports.full_dump_transmission import (
    http_or_ftp_path,
)


@pytest.fixture(params=["pod", "gobi"])
@pytest.fixture(params=["pod", "gobi", "backstage"])
def mock_vendor_marc_files(tmp_path, request):
    airflow = tmp_path / "airflow"
    vendor = request.param
@@ -49,7 +53,7 @@ def mock_vendor_marc_files(tmp_path, request):
    for i, x in enumerate(setup_files['filenames']):
        file = pathlib.Path(f"{marc_file_dir}/{x}")
        file.touch()
        if i in [0, 2, 4, 5]:
            file.write_text("hello world")
        files.append(str(file))
    return {"file_list": files, "s3": False}
@@ -90,6 +94,12 @@ def mock_marc_files(mock_file_system):
    return {"file_list": marc_files, "s3": False}


@pytest.fixture(params=["http-example.com", "ftp-example.com"])
def mock_full_dump_params(request):
    conn_id = request.param
    return {"conn_id": conn_id}

@pytest.fixture
def mock_httpx_connection():
    return Connection(
@@ -176,7 +186,7 @@ def test_retry_failed_files_task(mock_marc_files, caplog):
def test_transmit_data_ftp_task(
    mocker, mock_ftphook_connection, mock_marc_files, caplog
):
    mocker.patch(
        "airflow.providers.ftp.hooks.ftp.FTPHook.store_file", return_value=True
    )
    mocker.patch(
@@ -192,16 +202,14 @@
    transmit_data = transmit_data_ftp_task.function("ftp-example.com", mock_marc_files)
    assert len(transmit_data["success"]) == 3
    assert "Start transmission of file" in caplog.text
    assert "Transmitted file to /remote/path/dir/2024022914.xml" in caplog.text


@pytest.mark.parametrize("mock_vendor_marc_files", ["gobi"], indirect=True)
def test_transmit_gobi_data_ftp_task(
    mocker, mock_ftphook_connection, mock_vendor_marc_files, tmp_path, caplog
):
    mocker.patch(
        "airflow.providers.ftp.hooks.ftp.FTPHook.store_file", return_value=True
    )
    mocker.patch(
@@ -216,13 +224,36 @@

    airflow = tmp_path / "airflow"
    marc_files = gather_files_task.function(airflow=airflow, vendor="gobi")
    transmit_data = transmit_data_ftp_task.function("gobi", marc_files)
    assert len(transmit_data["success"]) == 1
    assert "Start transmission of file" in caplog.text
    assert "Transmitted file to /remote/path/dir/stf2024030214.txt" in caplog.text


@pytest.mark.parametrize("mock_vendor_marc_files", ["backstage"], indirect=True)
def test_transmit_backstage_data_ftp_task(
    mocker, mock_ftphook_connection, mock_vendor_marc_files, tmp_path, caplog
):
    mocker.patch(
        "airflow.providers.ftp.hooks.ftp.FTPHook.store_file", return_value=True
    )
    mocker.patch(
        "libsys_airflow.plugins.data_exports.transmission_tasks.Connection.get_connection_from_secrets",
        return_value=mock_ftphook_connection,
    )

    mocker.patch(
        "libsys_airflow.plugins.data_exports.transmission_tasks.is_production",
        return_value=True,
    )

    airflow = tmp_path / "airflow"
    marc_files = gather_files_task.function(airflow=airflow, vendor="backstage")
    transmit_data = transmit_data_ftp_task.function("backstage", marc_files)
    assert len(transmit_data["success"]) == 1
    assert "Start transmission of file" in caplog.text
    assert "Transmitted file to /remote/path/dir/STF2024020314.mrc" in caplog.text


def test_transmit_data_task(
    mocker, mock_httpx_connection, mock_httpx_success, mock_marc_files, caplog
@@ -247,6 +278,28 @@ def test_transmit_data_task(
    assert "Setting URL params to" not in caplog.text


@pytest.mark.parametrize("mock_full_dump_params", ["http-example.com"], indirect=True)
def test_full_dump_http(mocker, mock_httpx_connection, mock_full_dump_params, caplog):
    mocker.patch(
        "libsys_airflow.plugins.data_exports.transmission_tasks.Connection.get_connection_from_secrets",
        return_value=mock_httpx_connection,
    )
    branch = http_or_ftp_path.function(connection=mock_full_dump_params["conn_id"])
    assert branch == "transmit_data_http_task"
    assert "Transmit data via http" in caplog.text


@pytest.mark.parametrize("mock_full_dump_params", ["ftp-example.com"], indirect=True)
def test_full_dump_ftp(mocker, mock_ftphook_connection, mock_full_dump_params, caplog):
    mocker.patch(
        "libsys_airflow.plugins.data_exports.transmission_tasks.Connection.get_connection_from_secrets",
        return_value=mock_ftphook_connection,
    )
    branch = http_or_ftp_path.function(connection=mock_full_dump_params["conn_id"])
    assert branch == "transmit_data_ftp_task"
    assert "Transmit data via ftp" in caplog.text


def test_transmit_data_from_s3_task(
    mocker, mock_httpx_connection, mock_httpx_success, mock_marc_files, caplog
):
Expand Down