Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Customize and use fix_encumbrances_quesnelia script #1494

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
omit =
tests/*
vendor_loads_migration/*
libsys_airflow/plugins/folio/encumbrances/fix_encumbrances.py
libsys_airflow/plugins/folio/encumbrances/fix_encumbrances*.py
libsys_airflow/dags/*
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ orafin-files/
vendor-data/
virtual-env/
vendor-keys/
digital-bookplates/
digital-bookplates/
authorities/
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ x-airflow-common:
- ${AIRFLOW_PROJ_DIR:-.}/data-export-files:/opt/airflow/data-export-files
- ${AIRFLOW_PROJ_DIR:-.}/fix_encumbrances:/opt/airflow/fix_encumbrances
- ${AIRFLOW_PROJ_DIR:-.}/orafin-files:/opt/airflow/orafin-files
- ${AIRFLOW_PROJ_DIR:-.}/authorities:/opt/airflow/authorities
- ${AIRFLOW_PROJ_DIR:-.}/.aws:/home/airflow/.aws
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
Expand Down
67 changes: 67 additions & 0 deletions libsys_airflow/dags/authority_control/load_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Load MARC Authority and Bibiliographic Records into FOLIO."""

import logging
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

from libsys_airflow.plugins.authority_control import (
clean_up,
email_report,
run_folio_data_import,
)

logger = logging.getLogger(__name__)


@dag(
schedule=None,
start_date=datetime(2025, 2, 7),
catchup=False,
tags=["authorities", "folio"],
)
def load_marc_file(**kwargs):
    """DAG loads an Authority or Bib File into FOLIO.

    Expects two DAG-run params nested under params["kwargs"]:
      - file: path to the MARC file to import (required)
      - profile: name of the FOLIO data-import profile to use (required)
    """

    @task
    def prepare_file_upload(*args, **kwargs):
        # Validate the DAG-run params and publish them as XComs so later
        # tasks (clean_up_dag) can retrieve them by key.
        task_instance = kwargs["ti"]
        context = get_current_context()
        params = context.get("params", {})
        # NOTE(review): assumes the triggering process nests its values under
        # params["kwargs"]; a KeyError here means the trigger payload changed.
        file_path = params["kwargs"].get("file")
        if file_path is None:
            raise ValueError("File path is required")
        task_instance.xcom_push(key="file_path", value=file_path)
        profile_name = params["kwargs"].get("profile")
        if profile_name is None:
            raise ValueError("Profile name is required")
        task_instance.xcom_push(key="profile_name", value=profile_name)
        # Returned dict is also available to downstream tasks as this task's
        # XCom return value.
        return {"file_path": file_path, "profile_name": profile_name}

    @task
    def initiate_folio_data_import(file_path, profile_name):
        # run_folio_data_import builds a BashOperator; executing it inline
        # runs the folio_data_import CLI and returns the command's output.
        context = get_current_context()
        bash_operator = run_folio_data_import(file_path, profile_name)
        return bash_operator.execute(context)

    @task
    def email_load_report(**kwargs):
        # Forwards the bash result (passed as bash_result=...) to the
        # email_report plugin helper.
        return email_report(**kwargs)

    @task
    def clean_up_dag(*args, **kwargs):
        # Pull the file path pushed by prepare_file_upload and archive the
        # MARC file now that the import has finished.
        task_instance = kwargs["ti"]
        marc_file_path = task_instance.xcom_pull(key="file_path")
        return clean_up(marc_file_path)

    dag_params = prepare_file_upload()
    bash_result = initiate_folio_data_import(
        dag_params["file_path"], dag_params["profile_name"]
    )

    # Passing bash_result creates the data dependency on the import task.
    email_load_report(bash_result=bash_result)
    # clean_up runs after the import completes (but is not gated on the email).
    bash_result >> clean_up_dag()


load_marc_file()
39 changes: 37 additions & 2 deletions libsys_airflow/dags/data_exports/full_dump_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

from airflow import DAG
from airflow.models.param import Param
from airflow.models import Variable
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import get_current_context
from airflow.decorators import task, task_group

from libsys_airflow.plugins.data_exports.full_dump_marc import (
create_materialized_view,
fetch_number_of_records,
fetch_full_dump_marc,
reset_s3,
Expand Down Expand Up @@ -54,11 +56,37 @@
type="integer",
description="Number of batch processing jobs to run in parallel.",
),
"exclude_tags": Param(
True,
type="boolean",
description="Remove excluded tags listed in marc/excluded_tags.pyfrom incoming record.",
),
"from_date": Param(
Variable.get("FOLIO_EPOCH_DATE", "2023-08-23"),
format="date",
type="string",
description="The earliest date to select record IDs from FOLIO.",
),
"to_date": Param(
f"{(datetime.now()).strftime('%Y-%m-%d')}",
format="date",
type="string",
description="The latest date to select record IDs from FOLIO.",
),
"recreate_view": Param(
False,
type="boolean",
description="Recreate the materialized view with the original FOLIO marc records to process.",
),
},
) as dag:

start = EmptyOperator(task_id='start')

@task
def create_full_selection_matrerialized_view():
create_materialized_view()

@task
def reset_s3_bucket():
reset_s3()
Expand Down Expand Up @@ -131,8 +159,13 @@ def transform_marc_records_add_holdings(marc_files: list):

@task
def transform_marc_records_clean_serialize(marc_files: list):
context = get_current_context()
params = context.get("params", {}) # type: ignore
exclude_tags = params.get("exclude_tags", True)
for marc_file in marc_files:
marc_clean_serialize(marc_file, full_dump=True)
marc_clean_serialize(
marc_file, full_dump=True, exclude_tags=exclude_tags
)

@task
def compress_marc_files(marc_files: list):
Expand Down Expand Up @@ -161,6 +194,8 @@ def compress_marc_files(marc_files: list):
number_in_batch=batch_size,
)

create_view = create_full_selection_matrerialized_view()

delete_s3_files = reset_s3_bucket()

start_stop = calculate_start_stop.partial(div=record_div).expand(job=number_of_jobs)
Expand All @@ -175,5 +210,5 @@ def compress_marc_files(marc_files: list):
task_id="finish_marc",
)

start >> delete_s3_files >> total_records
start >> create_view >> delete_s3_files >> total_records
finish_transforms >> finish_processing_marc
2 changes: 1 addition & 1 deletion libsys_airflow/dags/folio_finance/lane_fix_encumbrances.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"retry_delay": timedelta(minutes=1),
}

FY_CODE = Variable.get("FISCAL_YEAR_CODE_LANE", "LANE2025")
FY_CODE = Variable.get("FISCAL_YEAR_CODE_LANE", "")


with DAG(
Expand Down
2 changes: 1 addition & 1 deletion libsys_airflow/dags/folio_finance/law_fix_encumbrances.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"retry_delay": timedelta(minutes=1),
}

FY_CODE = Variable.get("FISCAL_YEAR_CODE_LAW", "LAW2025")
FY_CODE = Variable.get("FISCAL_YEAR_CODE_LAW", "")

with DAG(
"fix_encumbrances_law",
Expand Down
2 changes: 1 addition & 1 deletion libsys_airflow/dags/folio_finance/sul_fix_encumbrances.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"retry_delay": timedelta(minutes=1),
}

FY_CODE = Variable.get("FISCAL_YEAR_CODE_SUL", "SUL2025")
FY_CODE = Variable.get("FISCAL_YEAR_CODE_SUL", "")


with DAG(
Expand Down
5 changes: 5 additions & 0 deletions libsys_airflow/plugins/authority_control/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from libsys_airflow.plugins.authority_control.data_import import ( # noqa
run_folio_data_import,
)
from libsys_airflow.plugins.authority_control.email import email_report # noqa
from libsys_airflow.plugins.authority_control.helpers import clean_up # noqa
34 changes: 34 additions & 0 deletions libsys_airflow/plugins/authority_control/data_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import logging

from airflow.models import Variable
from airflow.operators.bash import BashOperator

logger = logging.getLogger(__name__)


def run_folio_data_import(file_path: str, profile_name: str):
    """
    Run the folio data import

    Builds a BashOperator that shells out to the folio_data_import module.
    FOLIO connection details and credentials are passed via environment
    variables (referenced as $vars in the command) rather than interpolated
    directly into the command string.
    """
    command_parts = [
        "python3 -m folio_data_import",
        "--record-type MARC21",
        "--gateway_url $gateway_url",
        "--tenant_id sul",
        "--username $username",
        "--password $password",
        "--marc_file_path $marc_file_path",
        "--import_profile_name \"$profile_name\"",
    ]
    environment = {
        "gateway_url": Variable.get("OKAPI_URL"),
        "username": Variable.get("FOLIO_USER"),
        "password": Variable.get("FOLIO_PASSWORD"),
        "marc_file_path": file_path,
        "profile_name": profile_name,
    }
    return BashOperator(
        task_id="run_folio_data_import",
        bash_command=" ".join(command_parts),
        env=environment,
    )
38 changes: 38 additions & 0 deletions libsys_airflow/plugins/authority_control/email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging

from airflow.models import Variable

from libsys_airflow.plugins.shared.utils import (
is_production,
send_email_with_server_name,
dag_run_url,
)

logger = logging.getLogger(__name__)


def email_report(**kwargs):
    """
    Emails result of folio-data-import to developers and SUL

    Args:
    - bash_result (str): result of the bash command
    """
    bash_result = kwargs.get("bash_result")
    devs_email = Variable.get("EMAIL_DEVS")
    to_emails = [devs_email]

    # SUL staff only receive the report from the production deployment.
    if is_production():
        sul_email = Variable.get("OCLC_EMAIL_SUL")
        to_emails.append(sul_email)

    url_dag_run = dag_run_url(**kwargs)
    # Fix: close the second paragraph and put a space between the
    # "DAG Run" label and the link so the rendered HTML is well-formed
    # and the label is not fused with the URL text.
    body = (
        f"""<p>{bash_result}</p><p>DAG Run <a href="{url_dag_run}">{url_dag_run}</a></p>"""
    )
    send_email_with_server_name(
        to=to_emails,
        subject="Folio Data Import",
        html_content=body,
    )
    logger.info(f"Emailing load report: {bash_result}")
19 changes: 19 additions & 0 deletions libsys_airflow/plugins/authority_control/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import logging
import pathlib
import shutil

logger = logging.getLogger(__name__)


def clean_up(marc_file: str, airflow: str = '/opt/airflow'):
    """
    Moves marc file after running folio data import

    Args:
        marc_file: path to the MARC file that was imported
        airflow: Airflow home directory containing the authorities/ folder

    Returns:
        True once the file has been archived
    """
    marc_file_path = pathlib.Path(marc_file)
    archive_dir = pathlib.Path(airflow) / "authorities/archive"
    archive_dir.mkdir(parents=True, exist_ok=True)

    archive_file = archive_dir / marc_file_path.name
    # shutil.move works across filesystems, unlike Path.rename which raises
    # OSError (EXDEV) when source and destination are on different mounts —
    # plausible here given the Docker volume mounts for /opt/airflow dirs.
    shutil.move(str(marc_file_path), str(archive_file))

    logger.info(f"Moved {marc_file_path} to archive")
    return True
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def run_data_export_upload(self):
except Exception as e:
flash(f"Error: {e}")
finally:
return default_rendered_page(self)
return default_rendered_page(self) # noqa

@expose("/")
def data_export_upload_home(self):
Expand Down
42 changes: 42 additions & 0 deletions libsys_airflow/plugins/data_exports/full_dump_marc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging

from datetime import datetime, timedelta
from pathlib import Path
from s3path import S3Path
from typing import Union

from airflow.models import Variable
from airflow.operators.python import get_current_context
Expand All @@ -10,6 +13,45 @@
logger = logging.getLogger(__name__)


def create_materialized_view(**kwargs) -> Union[str, None]:
    """
    Refreshes the materialized view of FOLIO MARC records when the DAG's
    recreate_view param is set; otherwise does nothing.

    Returns the SQL text that was executed, or None when the refresh
    was skipped.
    """
    context = get_current_context()
    params = context.get("params", {})  # type: ignore

    # Guard clause: the refresh is opt-in via the recreate_view param.
    if not params.get("recreate_view", False):
        logger.info("Skipping refresh of materialized view")
        return None

    from_date = params.get("from_date")
    to_date = params.get(
        "to_date", (datetime.now() + timedelta(1)).strftime('%Y-%m-%d')
    )

    query = materialized_view_sql_file().read_text()

    SQLExecuteQueryOperator(
        task_id="postgres_full_count_query",
        conn_id="postgres_folio",
        database=kwargs.get("database", "okapi"),
        sql=query,
        parameters={
            "from_date": from_date,
            "to_date": to_date,
        },
    ).execute(context)

    return query


def materialized_view_sql_file(**kwargs) -> Path:
    """Return the path to the SQL file that (re)creates the materialized view.

    Accepts an optional ``airflow`` kwarg overriding the Airflow home
    directory (defaults to /opt/airflow).
    """
    airflow_home = Path(kwargs.get("airflow", "/opt/airflow"))
    return (
        airflow_home / "libsys_airflow/plugins/data_exports/sql/materialized_view.sql"
    )


def fetch_full_dump_marc(**kwargs) -> str:
offset = kwargs.get("offset")
batch_size = kwargs.get("batch_size", 1000)
Expand Down
1 change: 1 addition & 0 deletions libsys_airflow/plugins/data_exports/marc/excluded_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
'005',
'029',
'049',
'079',
'066',
'099',
'265',
Expand Down
Loading