Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rcal 860 Allow roman datamodel as input to the exposure pipeline #1296

Merged
merged 9 commits into from
Jul 8, 2024
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Documentation
general
-------

- Update the exposure pipeline to accept a roman datamodel as input [#1296]

- Update okify script to use GA directory structure [#1282]

- pin numpy to <2 [#1275]
Expand Down
14 changes: 12 additions & 2 deletions romancal/datamodels/filetype.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from pathlib import Path
from typing import Union

import roman_datamodels as rdm

from romancal.datamodels import ModelContainer


def check(init: Union[os.PathLike, Path, io.FileIO]) -> str:
"""
Expand All @@ -17,11 +21,11 @@ def check(init: Union[os.PathLike, Path, io.FileIO]) -> str:
Returns
-------
file_type: str
a string with the file type ("asdf" or "asn")
a string with the file type ("asdf", "asn", "DataModel", or "ModelContainer")

"""

supported = ("asdf", "json")
supported = ("asdf", "json", "DataModel")

if isinstance(init, (str, os.PathLike, Path)):
path, ext = os.path.splitext(init)
Expand All @@ -41,6 +45,12 @@ def check(init: Union[os.PathLike, Path, io.FileIO]) -> str:
return "asn"

return ext
elif isinstance(init, rdm.DataModel):
return "DataModel"

elif isinstance(init, ModelContainer):
return "ModelContainer"

elif hasattr(init, "read") and hasattr(init, "seek"):
magic = init.read(5)
init.seek(0, 0)
Expand Down
10 changes: 9 additions & 1 deletion romancal/datamodels/tests/test_filetype.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from pathlib import Path

import pytest
import roman_datamodels as rdm

from romancal.datamodels import filetype
from romancal.datamodels import ModelContainer, filetype

DATA_DIRECTORY = Path(__file__).parent / "data"

Expand All @@ -20,6 +21,11 @@ def test_filetype():
with open(DATA_DIRECTORY / "fake.json") as file_h:
file_8 = filetype.check(file_h)
file_9 = filetype.check(str(DATA_DIRECTORY / "pluto.asdf"))
model_container = ModelContainer()
file_10 = filetype.check(model_container)
image_node = rdm.maker_utils.mk_level2_image(shape=(20, 20))
im1 = rdm.datamodels.ImageModel(image_node)
file_11 = filetype.check(im1)

assert file_1 == "asn"
assert file_2 == "asn"
Expand All @@ -30,6 +36,8 @@ def test_filetype():
assert file_7 == "asdf"
assert file_8 == "asn"
assert file_9 == "asdf"
assert file_10 == "ModelContainer"
assert file_11 == "DataModel"

with pytest.raises(ValueError):
filetype.check(DATA_DIRECTORY / "empty.txt")
Expand Down
42 changes: 24 additions & 18 deletions romancal/pipeline/exposure_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,11 @@
else:
input_filename = None

# open the input file
# determine the input type
file_type = filetype.check(input)
if file_type == "asn":
asn = ModelContainer.read_asn(input)

if file_type == "asdf":
elif file_type == "asdf":

Check warning on line 81 in romancal/pipeline/exposure_pipeline.py

View check run for this annotation

Codecov / codecov/patch

romancal/pipeline/exposure_pipeline.py#L81

Added line #L81 was not covered by tests
try:
# set the product name based on the input filename
asn = asn_from_list([input], product_name=input_filename.split(".")[0])
Expand All @@ -92,23 +91,30 @@
expos_file = []
n_members = 0
# extract the members from the asn to run the files through the steps
for product in asn["products"]:
n_members = len(product["members"])
for member in product["members"]:
expos_file.append(member["expname"])

results = ModelContainer()
tweakreg_input = ModelContainer()
for in_file in expos_file:
if isinstance(in_file, str):
input_filename = basename(in_file)
log.info(f"Input file name: {input_filename}")
else:
input_filename = None

# Open the file
input = rdm.open(in_file)
log.info(f"Processing a WFI exposure {in_file}")
if file_type == "asn":
for product in asn["products"]:
n_members = len(product["members"])
for member in product["members"]:
expos_file.append(member["expname"])

Check warning on line 100 in romancal/pipeline/exposure_pipeline.py

View check run for this annotation

Codecov / codecov/patch

romancal/pipeline/exposure_pipeline.py#L96-L100

Added lines #L96 - L100 were not covered by tests

# results = ModelContainer()
# tweakreg_input = ModelContainer()
for in_file in expos_file:
if isinstance(in_file, str):
input_filename = basename(in_file)
log.info(f"Input file name: {input_filename}")

Check warning on line 107 in romancal/pipeline/exposure_pipeline.py

View check run for this annotation

Codecov / codecov/patch

romancal/pipeline/exposure_pipeline.py#L104-L107

Added lines #L104 - L107 were not covered by tests
else:
input_filename = None
elif file_type == "DataModel":
in_file = input

Check warning on line 111 in romancal/pipeline/exposure_pipeline.py

View check run for this annotation

Codecov / codecov/patch

romancal/pipeline/exposure_pipeline.py#L109-L111

Added lines #L109 - L111 were not covered by tests

# check to see if in_file is defined, if not assume we have a datamodel
if in_file is not None:

Check warning on line 114 in romancal/pipeline/exposure_pipeline.py

View check run for this annotation

Codecov / codecov/patch

romancal/pipeline/exposure_pipeline.py#L114

Added line #L114 was not covered by tests
# Open the file
input = rdm.open(in_file)
log.info(f"Processing a WFI exposure {in_file}")

Check warning on line 117 in romancal/pipeline/exposure_pipeline.py

View check run for this annotation

Codecov / codecov/patch

romancal/pipeline/exposure_pipeline.py#L116-L117

Added lines #L116 - L117 were not covered by tests

self.dq_init.suffix = "dq_init"
result = self.dq_init(input)
Expand Down
34 changes: 34 additions & 0 deletions romancal/regtest/test_wfi_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""" Roman tests for flat field correction """

import copy
import os

import numpy as np
import pytest
Expand Down Expand Up @@ -481,6 +482,39 @@ def test_level2_grism_processing_pipeline(rtdata, ignore_asdf_paths):
)


@pytest.mark.bigdata
def test_elp_input_dm(rtdata, ignore_asdf_paths):
"""Test for input roman Datamodel to exposure level pipeline"""
input_data = "r0000101001001001001_01101_0001_WFI01_uncal.asdf"
rtdata.get_data(f"WFI/image/{input_data}")
dm_input = rdm.open(rtdata.input)

# Test Pipeline with input datamodel
output = "r0000101001001001001_01101_0001_WFI01_cal.asdf"
rtdata.output = output
ExposurePipeline.call(dm_input, save_results=True)
rtdata.get_truth(f"truth/WFI/image/{output}")

# check that the file exists ( don't duplicate checking contents done above)
pipeline = ExposurePipeline()
pipeline.log.info(
"Check that the output file exists " + passfail(os.path.isfile(rtdata.output))
)

# Ensure step completion is as expected
model = rdm.open(rtdata.output)

assert model.meta.cal_step.dq_init == "COMPLETE"
assert model.meta.cal_step.saturation == "COMPLETE"
assert model.meta.cal_step.linearity == "COMPLETE"
assert model.meta.cal_step.dark == "COMPLETE"
assert model.meta.cal_step.jump == "COMPLETE"
assert model.meta.cal_step.ramp_fit == "COMPLETE"
assert model.meta.cal_step.assign_wcs == "COMPLETE"
assert model.meta.cal_step.flat_field == "COMPLETE"
assert model.meta.cal_step.photom == "COMPLETE"


@pytest.mark.bigdata
def test_processing_pipeline_all_saturated(rtdata, ignore_asdf_paths):
"""Tests for fully saturated data skipping steps in the pipeline"""
Expand Down
Loading