diff --git a/CHANGES.rst b/CHANGES.rst index c20f01241..bf166dc74 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -9,6 +9,8 @@ Documentation general ------- +- Update the exposure pipeline to accept a Roman datamodel as input [#1296] + - Update okify script to use GA directory structure [#1282] - pin numpy to <2 [#1275] diff --git a/romancal/datamodels/filetype.py b/romancal/datamodels/filetype.py index 9cc8a4f62..42d578ec3 100644 --- a/romancal/datamodels/filetype.py +++ b/romancal/datamodels/filetype.py @@ -3,6 +3,10 @@ from pathlib import Path from typing import Union +import roman_datamodels as rdm + +from romancal.datamodels import ModelContainer + def check(init: Union[os.PathLike, Path, io.FileIO]) -> str: """ @@ -17,11 +21,11 @@ def check(init: Union[os.PathLike, Path, io.FileIO]) -> str: Returns ------- file_type: str - a string with the file type ("asdf" or "asn") + a string with the file type ("asdf", "asn", "DataModel", or "ModelContainer") """ - supported = ("asdf", "json") + supported = ("asdf", "json", "DataModel") if isinstance(init, (str, os.PathLike, Path)): path, ext = os.path.splitext(init) @@ -41,6 +45,12 @@ def check(init: Union[os.PathLike, Path, io.FileIO]) -> str: return "asn" return ext + elif isinstance(init, rdm.DataModel): + return "DataModel" + + elif isinstance(init, ModelContainer): + return "ModelContainer" + elif hasattr(init, "read") and hasattr(init, "seek"): magic = init.read(5) init.seek(0, 0) diff --git a/romancal/datamodels/tests/test_filetype.py b/romancal/datamodels/tests/test_filetype.py index f06085282..8b938c714 100644 --- a/romancal/datamodels/tests/test_filetype.py +++ b/romancal/datamodels/tests/test_filetype.py @@ -1,8 +1,9 @@ from pathlib import Path import pytest +import roman_datamodels as rdm -from romancal.datamodels import filetype +from romancal.datamodels import ModelContainer, filetype DATA_DIRECTORY = Path(__file__).parent / "data" @@ -20,6 +21,11 @@ def test_filetype(): with open(DATA_DIRECTORY / 
"fake.json") as file_h: file_8 = filetype.check(file_h) file_9 = filetype.check(str(DATA_DIRECTORY / "pluto.asdf")) + model_container = ModelContainer() + file_10 = filetype.check(model_container) + image_node = rdm.maker_utils.mk_level2_image(shape=(20, 20)) + im1 = rdm.datamodels.ImageModel(image_node) + file_11 = filetype.check(im1) assert file_1 == "asn" assert file_2 == "asn" @@ -30,6 +36,8 @@ def test_filetype(): assert file_7 == "asdf" assert file_8 == "asn" assert file_9 == "asdf" + assert file_10 == "ModelContainer" + assert file_11 == "DataModel" with pytest.raises(ValueError): filetype.check(DATA_DIRECTORY / "empty.txt") diff --git a/romancal/pipeline/exposure_pipeline.py b/romancal/pipeline/exposure_pipeline.py index c6a6ecc84..56b510a9a 100644 --- a/romancal/pipeline/exposure_pipeline.py +++ b/romancal/pipeline/exposure_pipeline.py @@ -74,12 +74,11 @@ def process(self, input): else: input_filename = None - # open the input file + # determine the input type file_type = filetype.check(input) if file_type == "asn": asn = ModelContainer.read_asn(input) - - if file_type == "asdf": + elif file_type == "asdf": try: # set the product name based on the input filename asn = asn_from_list([input], product_name=input_filename.split(".")[0]) @@ -92,23 +91,30 @@ def process(self, input): expos_file = [] n_members = 0 # extract the members from the asn to run the files through the steps - for product in asn["products"]: - n_members = len(product["members"]) - for member in product["members"]: - expos_file.append(member["expname"]) - results = ModelContainer() tweakreg_input = ModelContainer() - for in_file in expos_file: - if isinstance(in_file, str): - input_filename = basename(in_file) - log.info(f"Input file name: {input_filename}") - else: - input_filename = None - - # Open the file - input = rdm.open(in_file) - log.info(f"Processing a WFI exposure {in_file}") + if file_type == "asn": + for product in asn["products"]: + n_members = len(product["members"]) + for 
member in product["members"]: + expos_file.append(member["expname"]) + + # results = ModelContainer() + # tweakreg_input = ModelContainer() + for in_file in expos_file: + if isinstance(in_file, str): + input_filename = basename(in_file) + log.info(f"Input file name: {input_filename}") + else: + input_filename = None + elif file_type == "DataModel": + in_file = input + + # check to see if in_file is defined; if not, assume we have a datamodel + if in_file is not None: + # Open the file + input = rdm.open(in_file) + log.info(f"Processing a WFI exposure {in_file}") self.dq_init.suffix = "dq_init" result = self.dq_init(input) diff --git a/romancal/regtest/test_wfi_pipeline.py b/romancal/regtest/test_wfi_pipeline.py index 07eb8baaa..73911cab7 100644 --- a/romancal/regtest/test_wfi_pipeline.py +++ b/romancal/regtest/test_wfi_pipeline.py @@ -1,6 +1,7 @@ """ Roman tests for flat field correction """ import copy +import os import numpy as np import pytest @@ -481,6 +482,39 @@ def test_level2_grism_processing_pipeline(rtdata, ignore_asdf_paths): +@pytest.mark.bigdata +def test_elp_input_dm(rtdata, ignore_asdf_paths): + """Test for input roman Datamodel to exposure level pipeline""" + input_data = "r0000101001001001001_01101_0001_WFI01_uncal.asdf" + rtdata.get_data(f"WFI/image/{input_data}") + dm_input = rdm.open(rtdata.input) + + # Test Pipeline with input datamodel + output = "r0000101001001001001_01101_0001_WFI01_cal.asdf" + rtdata.output = output + ExposurePipeline.call(dm_input, save_results=True) + rtdata.get_truth(f"truth/WFI/image/{output}") + + # check that the file exists (don't duplicate the content checks done above) + pipeline = ExposurePipeline() + pipeline.log.info( + "Check that the output file exists " + passfail(os.path.isfile(rtdata.output)) + ) + + # Ensure step completion is as expected + model = rdm.open(rtdata.output) + + assert model.meta.cal_step.dq_init == "COMPLETE" + assert model.meta.cal_step.saturation == "COMPLETE" + assert 
model.meta.cal_step.linearity == "COMPLETE" + assert model.meta.cal_step.dark == "COMPLETE" + assert model.meta.cal_step.jump == "COMPLETE" + assert model.meta.cal_step.ramp_fit == "COMPLETE" + assert model.meta.cal_step.assign_wcs == "COMPLETE" + assert model.meta.cal_step.flat_field == "COMPLETE" + assert model.meta.cal_step.photom == "COMPLETE" + + @pytest.mark.bigdata def test_processing_pipeline_all_saturated(rtdata, ignore_asdf_paths): """Tests for fully saturated data skipping steps in the pipeline"""