Skip to content

Commit

Permalink
Add tests for identify_file_format script
Browse files Browse the repository at this point in the history
* Add type hints for identify_file_format script
  • Loading branch information
replaceafill authored Aug 15, 2024
1 parent 752f979 commit be6e57b
Show file tree
Hide file tree
Showing 4 changed files with 445 additions and 34 deletions.
6 changes: 4 additions & 2 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ warn_redundant_casts = True
warn_unused_configs = True


[mypy-src.MCPClient.lib.client.*,src.MCPClient.*.normalize,src.MCPClient.*.validate_file, src.MCPClient.*.policy_check]
[mypy-src.MCPClient.lib.client.*,src.MCPClient.*.identify_file_format,src.MCPClient.*.normalize,src.MCPClient.*.validate_file, src.MCPClient.*.policy_check]

check_untyped_defs = True
disallow_any_generics = True
disallow_incomplete_defs = True
Expand All @@ -20,7 +21,8 @@ warn_return_any = True
warn_unused_ignores = True


[mypy-tests.MCPClient.conftest,tests.MCPClient.test_normalize,tests.MCPClient.test_validate_file,tests.MCPClient.test_policy_check]
[mypy-tests.MCPClient.conftest,tests.MCPClient.test_identify_file_format,tests.MCPClient.test_normalize,tests.MCPClient.test_validate_file,tests.MCPClient.test_policy_check]

check_untyped_defs = True
disallow_any_generics = True
disallow_incomplete_defs = True
Expand Down
94 changes: 62 additions & 32 deletions src/MCPClient/lib/clientScripts/identify_file_format.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
#!/usr/bin/env python
import argparse
import dataclasses
import multiprocessing
import uuid
from typing import List
from typing import Optional

import django

django.setup()
# dashboard

from client.job import Job
from databaseFunctions import insertIntoEvents
from django.db import transaction
from django.utils import timezone

# archivematicaCommon
from executeOrRunSubProcess import executeOrRun
from fpr.models import FormatVersion
from fpr.models import IDCommand
Expand All @@ -21,12 +23,23 @@
from main.models import FileID
from main.models import UnitVariable

SUCCESS = 0
ERROR = 255


def concurrent_instances():
@dataclasses.dataclass
class IdentifyFileFormatArgs:
idcommand: str
file_path: str
file_uuid: str
disable_reidentify: bool


def concurrent_instances() -> int:
return multiprocessing.cpu_count()


def _save_id_preference(file_, value):
def _save_id_preference(file_: File, value: bool) -> None:
"""
Saves whether file format identification is being used.
Expand All @@ -35,22 +48,25 @@ def _save_id_preference(file_, value):
variable, which will be transformed back into a passVar when a new chain in
the same unit is begun.
"""
value = str(value)

# The unit_uuid foreign key can point to a transfer or SIP, and this tool
# runs in both.
# Check the SIP first - if it hasn't been assigned yet, then this is being
# run during the transfer.
unit = file_.sip or file_.transfer

rd = {"%IDCommand%": value}
rd = {"%IDCommand%": str(value)}

UnitVariable.objects.create(
unituuid=unit.pk, variable="replacementDict", variablevalue=str(rd)
)


def write_identification_event(file_uuid, command, format=None, success=True):
def write_identification_event(
file_uuid: str,
command: IDCommand,
format: Optional[str] = None,
success: bool = True,
) -> None:
event_detail_text = (
f'program="{command.tool.description}"; version="{command.tool.version}"'
)
Expand All @@ -75,7 +91,7 @@ def write_identification_event(file_uuid, command, format=None, success=True):
)


def write_file_id(file_uuid, format, output):
def write_file_id(file_uuid: str, format: FormatVersion, output: str) -> None:
"""
Write the identified format to the DB.
Expand All @@ -102,24 +118,26 @@ def write_file_id(file_uuid, format, output):
)


def _default_idcommand():
def _default_idcommand() -> IDCommand:
"""Retrieve the default ``fpr.IDCommand``.
We only expect to find one command enabled/active.
"""
return IDCommand.active.first()


def main(job, enabled, file_path, file_uuid, disable_reidentify):
enabled = True if enabled == "True" else False
if not enabled:
def main(
job: Job, enabled: str, file_path: str, file_uuid: str, disable_reidentify: bool
) -> int:
enabled_bool = True if enabled == "True" else False
if not enabled_bool:
job.print_output("Skipping file format identification")
return 0
return SUCCESS

command = _default_idcommand()
if command is None:
job.write_error("Unable to determine IDCommand.\n")
return 255
return ERROR

command_uuid = command.uuid
job.print_output("IDCommand:", command.description)
Expand All @@ -138,11 +156,11 @@ def main(job, enabled, file_path, file_uuid, disable_reidentify):
job.print_output(
"This file has already been identified, and re-identification is disabled. Skipping."
)
return 0
return SUCCESS

# Save whether identification was enabled by the user for use in a later
# chain.
_save_id_preference(file_, enabled)
_save_id_preference(file_, enabled_bool)

exitcode, output, err = executeOrRun(
command.script_type,
Expand All @@ -156,49 +174,49 @@ def main(job, enabled, file_path, file_uuid, disable_reidentify):
if exitcode != 0:
job.print_error(f"Error: IDCommand with UUID {command_uuid} exited non-zero.")
job.print_error(f"Error: {err}")
return 255
return ERROR

job.print_output("Command output:", output)
# PUIDs are the same regardless of tool, so PUID-producing tools don't have "rules" per se - we just
# go straight to the FormatVersion table to see if there's a matching PUID
try:
if command.config == "PUID":
version = FormatVersion.active.get(pronom_id=output)
format_version = FormatVersion.active.get(pronom_id=output)
else:
rule = IDRule.active.get(command_output=output, command=command)
version = rule.format
format_version = rule.format
except IDRule.DoesNotExist:
job.print_error(
f'Error: No FPR identification rule for tool output "{output}" found'
)
write_identification_event(file_uuid, command, success=False)
return 255
return ERROR
except IDRule.MultipleObjectsReturned:
job.print_error(
f'Error: Multiple FPR identification rules for tool output "{output}" found'
)
write_identification_event(file_uuid, command, success=False)
return 255
return ERROR
except FormatVersion.DoesNotExist:
job.print_error(f"Error: No FPR format record found for PUID {output}")
write_identification_event(file_uuid, command, success=False)
return 255
return ERROR

(ffv, created) = FileFormatVersion.objects.get_or_create(
file_uuid=file_, defaults={"format_version": version}
file_uuid=file_, defaults={"format_version": format_version}
)
if not created: # Update the version if it wasn't created new
ffv.format_version = version
ffv.format_version = format_version
ffv.save()
job.print_output(f"{file_path} identified as a {version.description}")
job.print_output(f"{file_path} identified as a {format_version.description}")

write_identification_event(file_uuid, command, format=version.pronom_id)
write_file_id(file_uuid=file_uuid, format=version, output=output)
write_identification_event(file_uuid, command, format=format_version.pronom_id)
write_file_id(file_uuid=file_uuid, format=format_version, output=output)

return 0
return SUCCESS


def call(jobs):
def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Identify file formats.")

# Since AM19 the accepted values are "True" or "False" since the ability to
Expand All @@ -215,10 +233,22 @@ def call(jobs):
help="Disable identification if it has already happened for this file.",
)

return parser


def parse_args(parser: argparse.ArgumentParser, job: Job) -> IdentifyFileFormatArgs:
namespace = parser.parse_args(job.args[1:])

return IdentifyFileFormatArgs(**vars(namespace))


def call(jobs: List[Job]) -> None:
parser = get_parser()

with transaction.atomic():
for job in jobs:
with job.JobContext():
args = parser.parse_args(job.args[1:])
args = parse_args(parser, job)
job.set_status(
main(
job,
Expand Down
17 changes: 17 additions & 0 deletions tests/MCPClient/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,35 @@ def fptool() -> fprmodels.FPTool:
return fprmodels.FPTool.objects.create()


@pytest.fixture
def idtool() -> fprmodels.IDTool:
return fprmodels.IDTool.objects.create()


@pytest.fixture
def fpcommand(fptool: fprmodels.FPTool) -> fprmodels.FPCommand:
return fprmodels.FPCommand.objects.create(tool=fptool)


@pytest.fixture
def idcommand(idtool: fprmodels.IDTool) -> fprmodels.IDCommand:
return fprmodels.IDCommand.objects.create(tool=idtool, config="PUID")


@pytest.fixture
def fprule(
fpcommand: fprmodels.FPCommand, format_version: fprmodels.FormatVersion
) -> fprmodels.FPRule:
return fprmodels.FPRule.objects.create(command=fpcommand, format=format_version)


@pytest.fixture
def idrule(
idcommand: fprmodels.IDCommand, format_version: fprmodels.FormatVersion
) -> fprmodels.IDRule:
return fprmodels.IDRule.objects.create(command=idcommand, format=format_version)


@pytest.fixture()
def fprule_characterization(fprule: fprmodels.FPRule) -> fprmodels.FPRule:
fprule.purpose = fprmodels.FPRule.CHARACTERIZATION
Expand Down
Loading

0 comments on commit be6e57b

Please sign in to comment.