-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Observatory dataclass and observatory_to_sorcha_config
- Loading branch information
Showing
4 changed files
with
305 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ dependencies = [ | |
"adam_core==0.2.2", | ||
"ray", | ||
"sorcha==0.9.1", | ||
"quivr==0.7.3a1", | ||
] | ||
|
||
[build-system] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
from dataclasses import dataclass | ||
from typing import Literal, Optional | ||
|
||
|
||
@dataclass | ||
class FieldOfView: | ||
camera_model: str = Literal["footprint", "circle"] | ||
footprint_path: Optional[str] = None | ||
fill_factor: Optional[float] = None | ||
circle_radius: Optional[float] = None | ||
footprint_edge_threshold: Optional[float] = None | ||
|
||
def __post_init__(self): | ||
self.camera_model = self.camera_model | ||
|
||
if self.camera_model == "footprint": | ||
if self.footprint_path is None: | ||
raise ValueError( | ||
"footprint_path is required for camera_model='footprint'" | ||
) | ||
elif self.camera_model == "circle": | ||
if self.fill_factor is None: | ||
raise ValueError("fill_factor is required for camera_model='circle'") | ||
if self.circle_radius is None: | ||
raise ValueError("circle_radius is required for camera_model='circle'") | ||
if self.footprint_edge_threshold is not None: | ||
raise ValueError( | ||
"footprint_edge_threshold is only valid for camera_model='footprint'" | ||
) | ||
else: | ||
raise ValueError(f"Unknown camera model: {self.camera_model}") | ||
|
||
def to_string(self) -> str: | ||
string = "[FOV]\n" | ||
if self.camera_model == "footprint": | ||
string += f"camera_model = {self.camera_model}\nfootprint_path = {self.footprint_path}" | ||
if self.footprint_edge_threshold is not None: | ||
string += ( | ||
f"\nfootprint_edge_threshold = {self.footprint_edge_threshold}" | ||
) | ||
else: | ||
string += f"camera_model = {self.camera_model}\nfill_factor = {self.fill_factor}\ncircle_radius = {self.circle_radius}" | ||
|
||
return string | ||
|
||
|
||
@dataclass | ||
class Simulation: | ||
ang_fov: float | ||
fov_buffer: float | ||
picket: Optional[int] = 1 | ||
healpix_order: Optional[int] = 6 | ||
|
||
def to_string(self, observatory_code) -> str: | ||
return f"""[SIMULATION] | ||
ar_ang_fov = {self.ang_fov} | ||
ar_fov_buffer = {self.fov_buffer} | ||
ar_picket = {self.picket} | ||
ar_obs_code = {observatory_code} | ||
ar_healpix_order = {self.healpix_order}""" | ||
|
||
|
||
@dataclass | ||
class Observatory: | ||
code: str | ||
filters: list[str] | ||
bright_limit: list[float] | ||
fov: FieldOfView | ||
simulation: Simulation | ||
|
||
|
||
def observatory_to_sorcha_config( | ||
observatory: Observatory, | ||
time_range: Optional[list[float]] = None, | ||
) -> str: | ||
""" | ||
Create a Sorcha configuration file from an Observatory object and, optionally, a time range. | ||
Parameters | ||
---------- | ||
observatory : Observatory | ||
The observatory object to create the configuration file for. | ||
time_range : list[float], optional | ||
The time range to filter the pointings by, by default None. | ||
Returns | ||
------- | ||
str | ||
The Sorcha configuration file as a string. | ||
""" | ||
sql_query = f"SELECT * FROM pointings WHERE observatory_code = '{observatory.code}'" | ||
if time_range is not None: | ||
sql_query += f" AND observationStartMJD >= {time_range[0]} AND observationStartMJD <= {time_range[1]}" | ||
|
||
config = f""" | ||
# Sorcha Configuration File - ADAM Test Data - {observatory.code} | ||
[INPUT] | ||
ephemerides_type = ar | ||
eph_format = csv | ||
size_serial_chunk = 5000 | ||
aux_format = csv | ||
pointing_sql_query = {sql_query} | ||
{observatory.simulation.to_string(observatory.code)} | ||
[FILTERS] | ||
observing_filters = {','.join(observatory.filters)} | ||
[BRIGHT_LIMITS] | ||
bright_limit = {','.join(map(str, observatory.bright_limit))} | ||
[PHASECURVES] | ||
phase_function = HG | ||
{observatory.fov.to_string()} | ||
[FADINGFUNCTION] | ||
fading_function_on = False | ||
fading_function_width = 0.1 | ||
fading_function_peak_efficiency = 1 | ||
[OUTPUT] | ||
output_format = csv | ||
output_columns = all | ||
[LIGHTCURVE] | ||
lc_model = none | ||
[ACTIVITY] | ||
comet_activity = none | ||
[EXPERT] | ||
randomization_on = True | ||
vignetting_on = True | ||
trailing_losses_on = True | ||
""" | ||
return config |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
import pytest | ||
|
||
from ..observatory import ( | ||
FieldOfView, | ||
Observatory, | ||
Simulation, | ||
observatory_to_sorcha_config, | ||
) | ||
|
||
|
||
def test_FieldOfView_to_string(): | ||
# Test that FieldOfView.to_string returns the correct string representation of the object. | ||
# These strings are used to create sorcha configuration files. | ||
fov = FieldOfView(camera_model="circle", fill_factor=0.5, circle_radius=1.0) | ||
|
||
assert ( | ||
fov.to_string() | ||
== "[FOV]\ncamera_model = circle\nfill_factor = 0.5\ncircle_radius = 1.0" | ||
) | ||
|
||
fov = FieldOfView(camera_model="footprint", footprint_path="path/to/footprint") | ||
|
||
assert ( | ||
fov.to_string() | ||
== "[FOV]\ncamera_model = footprint\nfootprint_path = path/to/footprint" | ||
) | ||
|
||
fov = FieldOfView( | ||
camera_model="footprint", | ||
footprint_path="path/to/footprint", | ||
footprint_edge_threshold=0.1, | ||
) | ||
|
||
assert ( | ||
fov.to_string() | ||
== "[FOV]\ncamera_model = footprint\nfootprint_path = path/to/footprint\nfootprint_edge_threshold = 0.1" | ||
) | ||
|
||
|
||
def test_FieldOfView_raises(): | ||
# Test that FieldOfView raises the correct exceptions when invalid argument combinations | ||
# are passed. | ||
with pytest.raises(ValueError, match="Unknown camera model: unknown"): | ||
FieldOfView(camera_model="unknown") | ||
|
||
with pytest.raises( | ||
ValueError, match="fill_factor is required for camera_model='circle'" | ||
): | ||
FieldOfView(camera_model="circle", circle_radius=1.0) | ||
|
||
with pytest.raises( | ||
ValueError, match="circle_radius is required for camera_model='circle'" | ||
): | ||
FieldOfView(camera_model="circle", fill_factor=0.5) | ||
|
||
with pytest.raises( | ||
ValueError, match="footprint_path is required for camera_model='footprint'" | ||
): | ||
FieldOfView(camera_model="footprint") | ||
|
||
with pytest.raises( | ||
ValueError, | ||
match="footprint_edge_threshold is only valid for camera_model='footprint'", | ||
): | ||
FieldOfView( | ||
camera_model="circle", | ||
fill_factor=0.5, | ||
circle_radius=1.0, | ||
footprint_edge_threshold=0.1, | ||
) | ||
|
||
|
||
def test_Simulation_to_string(): | ||
# Test that Simulation.to_string returns the correct string representation of the object. | ||
# These strings are used to create sorcha configuration files. | ||
sim = Simulation(ang_fov=1.0, fov_buffer=0.1) | ||
|
||
assert ( | ||
sim.to_string("500") | ||
== """[SIMULATION] | ||
ar_ang_fov = 1.0 | ||
ar_fov_buffer = 0.1 | ||
ar_picket = 1 | ||
ar_obs_code = 500 | ||
ar_healpix_order = 6""" | ||
) | ||
|
||
sim = Simulation(ang_fov=1.0, fov_buffer=0.1, picket=2, healpix_order=7) | ||
|
||
assert ( | ||
sim.to_string("X05") | ||
== """[SIMULATION] | ||
ar_ang_fov = 1.0 | ||
ar_fov_buffer = 0.1 | ||
ar_picket = 2 | ||
ar_obs_code = X05 | ||
ar_healpix_order = 7""" | ||
) | ||
|
||
|
||
def test_observatory_to_sorcha_config(): | ||
|
||
# Test that observatory_to_sorcha_config returns the correct string representation of the | ||
# object. These strings are used to create sorcha configuration files. | ||
obs = Observatory( | ||
code="X05", | ||
filters=["u", "g", "r", "i", "z", "y"], | ||
bright_limit=[16, 16, 16, 16, 16, 16], | ||
fov=FieldOfView(camera_model="circle", fill_factor=0.9, circle_radius=3), | ||
simulation=Simulation(ang_fov=1.0, fov_buffer=0.1), | ||
) | ||
|
||
assert ( | ||
observatory_to_sorcha_config(obs) | ||
== """ | ||
# Sorcha Configuration File - ADAM Test Data - X05 | ||
[INPUT] | ||
ephemerides_type = ar | ||
eph_format = csv | ||
size_serial_chunk = 5000 | ||
aux_format = csv | ||
pointing_sql_query = SELECT * FROM pointings WHERE observatory_code = 'X05' | ||
[SIMULATION] | ||
ar_ang_fov = 1.0 | ||
ar_fov_buffer = 0.1 | ||
ar_picket = 1 | ||
ar_obs_code = X05 | ||
ar_healpix_order = 6 | ||
[FILTERS] | ||
observing_filters = u,g,r,i,z,y | ||
[BRIGHT_LIMITS] | ||
bright_limit = 16,16,16,16,16,16 | ||
[PHASECURVES] | ||
phase_function = HG | ||
[FOV] | ||
camera_model = circle | ||
fill_factor = 0.9 | ||
circle_radius = 3 | ||
[FADINGFUNCTION] | ||
fading_function_on = False | ||
fading_function_width = 0.1 | ||
fading_function_peak_efficiency = 1 | ||
[OUTPUT] | ||
output_format = csv | ||
output_columns = all | ||
[LIGHTCURVE] | ||
lc_model = none | ||
[ACTIVITY] | ||
comet_activity = none | ||
[EXPERT] | ||
randomization_on = True | ||
vignetting_on = True | ||
trailing_losses_on = True | ||
""" | ||
) |