
Commit

Merge branch 'main' into fix-wcs
schlafly authored Jul 9, 2024
2 parents 55eed26 + 8a7f497 commit dedc0cc
Showing 5 changed files with 81 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
@@ -9,6 +9,8 @@ Documentation
general
-------

- Update the exposure pipeline to accept a roman datamodel as input [#1296]

- Update okify script to use GA directory structure [#1282]

- pin numpy to <2 [#1275]
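
As a quick illustration of the changelog entry above (a sketch, not part of this commit; the import path is assumed, and the filename is taken from the regression test added later in this diff), the exposure pipeline can now be handed an already-open roman datamodel instead of a filename or association:

import roman_datamodels as rdm

# Import path assumed here; the regression test below calls ExposurePipeline directly.
from romancal.pipeline import ExposurePipeline

# Open a Level 1 (uncal) exposure as a roman datamodel; this is the same file
# the new regression test retrieves.
dm = rdm.open("r0000101001001001001_01101_0001_WFI01_uncal.asdf")

# New behavior: the pipeline accepts the in-memory DataModel directly,
# in addition to ASDF filenames and association (asn) files.
result = ExposurePipeline.call(dm, save_results=True)
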
14 changes: 12 additions & 2 deletions romancal/datamodels/filetype.py
@@ -3,6 +3,10 @@
from pathlib import Path
from typing import Union

import roman_datamodels as rdm

from romancal.datamodels import ModelContainer


def check(init: Union[os.PathLike, Path, io.FileIO]) -> str:
"""
@@ -17,11 +21,11 @@ def check(init: Union[os.PathLike, Path, io.FileIO]) -> str:
Returns
-------
file_type: str
a string with the file type ("asdf" or "asn")
a string with the file type ("asdf", "asn", "DataModel", or "ModelContainer")
"""

supported = ("asdf", "json")
supported = ("asdf", "json", "DataModel")

if isinstance(init, (str, os.PathLike, Path)):
path, ext = os.path.splitext(init)
@@ -41,6 +45,12 @@ def check(init: Union[os.PathLike, Path, io.FileIO]) -> str:
return "asn"

return ext
elif isinstance(init, rdm.DataModel):
return "DataModel"

elif isinstance(init, ModelContainer):
return "ModelContainer"

elif hasattr(init, "read") and hasattr(init, "seek"):
magic = init.read(5)
init.seek(0, 0)
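
To make the new dispatch in filetype.check concrete, here is a small usage sketch mirroring the unit test below (the path string is hypothetical; the DataModel and ModelContainer construction follows the test):

import roman_datamodels as rdm

from romancal.datamodels import ModelContainer, filetype

# Path-like inputs are still classified by extension: "asdf", or "asn" for json.
print(filetype.check("some_exposure.asdf"))   # -> "asdf" (hypothetical filename)

# New in this commit: already-open inputs are classified by isinstance checks.
image_node = rdm.maker_utils.mk_level2_image(shape=(20, 20))
image_model = rdm.datamodels.ImageModel(image_node)
print(filetype.check(image_model))            # -> "DataModel"
print(filetype.check(ModelContainer()))       # -> "ModelContainer"
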
10 changes: 9 additions & 1 deletion romancal/datamodels/tests/test_filetype.py
@@ -1,8 +1,9 @@
from pathlib import Path

import pytest
import roman_datamodels as rdm

from romancal.datamodels import filetype
from romancal.datamodels import ModelContainer, filetype

DATA_DIRECTORY = Path(__file__).parent / "data"

@@ -20,6 +21,11 @@ def test_filetype():
with open(DATA_DIRECTORY / "fake.json") as file_h:
file_8 = filetype.check(file_h)
file_9 = filetype.check(str(DATA_DIRECTORY / "pluto.asdf"))
model_container = ModelContainer()
file_10 = filetype.check(model_container)
image_node = rdm.maker_utils.mk_level2_image(shape=(20, 20))
im1 = rdm.datamodels.ImageModel(image_node)
file_11 = filetype.check(im1)

assert file_1 == "asn"
assert file_2 == "asn"
@@ -30,6 +36,8 @@ def test_filetype():
assert file_7 == "asdf"
assert file_8 == "asn"
assert file_9 == "asdf"
assert file_10 == "ModelContainer"
assert file_11 == "DataModel"

with pytest.raises(ValueError):
filetype.check(DATA_DIRECTORY / "empty.txt")
42 changes: 24 additions & 18 deletions romancal/pipeline/exposure_pipeline.py
@@ -74,12 +74,11 @@ def process(self, input):
else:
input_filename = None

# open the input file
# determine the input type
file_type = filetype.check(input)
if file_type == "asn":
asn = ModelContainer.read_asn(input)

if file_type == "asdf":
elif file_type == "asdf":
try:
# set the product name based on the input filename
asn = asn_from_list([input], product_name=input_filename.split(".")[0])
@@ -92,23 +91,30 @@
expos_file = []
n_members = 0
# extract the members from the asn to run the files through the steps
for product in asn["products"]:
n_members = len(product["members"])
for member in product["members"]:
expos_file.append(member["expname"])

results = ModelContainer()
tweakreg_input = ModelContainer()
for in_file in expos_file:
if isinstance(in_file, str):
input_filename = basename(in_file)
log.info(f"Input file name: {input_filename}")
else:
input_filename = None

# Open the file
input = rdm.open(in_file)
log.info(f"Processing a WFI exposure {in_file}")
if file_type == "asn":
for product in asn["products"]:
n_members = len(product["members"])
for member in product["members"]:
expos_file.append(member["expname"])

# results = ModelContainer()
# tweakreg_input = ModelContainer()
for in_file in expos_file:
if isinstance(in_file, str):
input_filename = basename(in_file)
log.info(f"Input file name: {input_filename}")
else:
input_filename = None
elif file_type == "DataModel":
in_file = input

# check to see if in_file is defined, if not assume we have a datamodel
if in_file is not None:
# Open the file
input = rdm.open(in_file)
log.info(f"Processing a WFI exposure {in_file}")

self.dq_init.suffix = "dq_init"
result = self.dq_init(input)
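
The hunk above is easier to follow as a condensed, standalone sketch of the new input dispatch in ExposurePipeline.process (simplified: the single-ASDF branch, logging, and the per-step processing are elided, and the helper function name is invented for illustration):

from romancal.datamodels import ModelContainer, filetype

def gather_exposures(input):
    """Condensed sketch of the input dispatch added above (not the full method)."""
    file_type = filetype.check(input)
    expos_file = []

    if file_type == "asn":
        # Association input: collect each member's exposure filename.
        asn = ModelContainer.read_asn(input)
        for product in asn["products"]:
            for member in product["members"]:
                expos_file.append(member["expname"])
    elif file_type == "DataModel":
        # New in this commit: an already-open roman datamodel is queued directly.
        expos_file.append(input)

    # In process(), each defined entry is then passed to rdm.open() and run
    # through dq_init, saturation, and the remaining calibration steps.
    return expos_file
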
34 changes: 34 additions & 0 deletions romancal/regtest/test_wfi_pipeline.py
@@ -1,6 +1,7 @@
""" Roman tests for flat field correction """

import copy
import os

import numpy as np
import pytest
@@ -481,6 +482,39 @@ def test_level2_grism_processing_pipeline(rtdata, ignore_asdf_paths):
)


@pytest.mark.bigdata
def test_elp_input_dm(rtdata, ignore_asdf_paths):
"""Test for input roman Datamodel to exposure level pipeline"""
input_data = "r0000101001001001001_01101_0001_WFI01_uncal.asdf"
rtdata.get_data(f"WFI/image/{input_data}")
dm_input = rdm.open(rtdata.input)

# Test Pipeline with input datamodel
output = "r0000101001001001001_01101_0001_WFI01_cal.asdf"
rtdata.output = output
ExposurePipeline.call(dm_input, save_results=True)
rtdata.get_truth(f"truth/WFI/image/{output}")

# check that the file exists ( don't duplicate checking contents done above)
pipeline = ExposurePipeline()
pipeline.log.info(
"Check that the output file exists " + passfail(os.path.isfile(rtdata.output))
)

# Ensure step completion is as expected
model = rdm.open(rtdata.output)

assert model.meta.cal_step.dq_init == "COMPLETE"
assert model.meta.cal_step.saturation == "COMPLETE"
assert model.meta.cal_step.linearity == "COMPLETE"
assert model.meta.cal_step.dark == "COMPLETE"
assert model.meta.cal_step.jump == "COMPLETE"
assert model.meta.cal_step.ramp_fit == "COMPLETE"
assert model.meta.cal_step.assign_wcs == "COMPLETE"
assert model.meta.cal_step.flat_field == "COMPLETE"
assert model.meta.cal_step.photom == "COMPLETE"


@pytest.mark.bigdata
def test_processing_pipeline_all_saturated(rtdata, ignore_asdf_paths):
"""Tests for fully saturated data skipping steps in the pipeline"""
