Update to download/hycom.py. Included validation steps to ensure integrity of the downloaded HYCOM file. Included temporal subsetting to reduce the size of the download.
GRautenbach committed Dec 2, 2024
1 parent d798ab8 commit 09a983b
Showing 1 changed file with 197 additions and 20 deletions.
217 changes: 197 additions & 20 deletions download/hycom.py
@@ -8,11 +8,127 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
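# (The imports above are the tail of the file's import block; the code below
# also relies on os, subprocess, numpy as np, pandas as pd, xarray as xr and
# datetime.timedelta, which are assumed to be imported in the lines elided
# from this hunk.)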

def is_file_valid(file_path):
    """
    Function to check that the file exists and that it is not empty.
    """
    if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
        print(f"File {file_path} exists and is non-empty.")
        return True
    else:
        print(f"File {file_path} does not exist or is empty.")
        return False

def is_netcdf_valid(file_path):
    """
    Function to check that the netCDF file can be opened.
    """
    try:
        with xr.open_dataset(file_path) as ds:
            print(f"File {file_path} opened successfully. Variables available: {list(ds.data_vars.keys())}")
            return True
    except Exception as e:
        print(f"Error opening file {file_path}: {e}")
        return False

def check_variables(file_path, expected_vars):
    """
    Function to check that the netCDF file contains the expected variables.
    """
    try:
        with xr.open_dataset(file_path) as ds:
            missing_vars = [var for var in expected_vars if var not in ds.variables]
            if not missing_vars:
                print(f"All expected variables are present in {file_path}.")
                return True
            else:
                print(f"Missing variables in {file_path}: {missing_vars}")
                return False
    except Exception as e:
        print(f"Error checking variables in file {file_path}: {e}")
        return False

def check_time_range(file_path, expected_start, expected_end):
    """
    Function to check that the time range of the file fully covers the
    expected [expected_start, expected_end] window.
    """
    try:
        with xr.open_dataset(file_path) as ds:
            if 'time' in ds.coords:
                file_times = pd.DatetimeIndex(ds['time'].values)
                if (file_times.min() <= expected_start) and (file_times.max() >= expected_end):
                    print(f"Time range in {file_path} is valid: {file_times.min()} to {file_times.max()}")
                    return True
                else:
                    print(f"Time range in {file_path} is invalid: {file_times.min()} to {file_times.max()}")
                    return False
            else:
                print(f"No 'time' coordinate found in {file_path}.")
                return False
    except Exception as e:
        print(f"Error checking time range in file {file_path}: {e}")
        return False
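# Example of the coverage rule above (hypothetical dates): a file whose time
# axis spans 2024-11-27 .. 2024-12-07 passes for expected_start=2024-11-27 and
# expected_end=2024-12-07, but fails if either bound falls outside the file's
# own range.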

def check_data_quality(file_path, variables):
    """
    Function to check the quality of the downloaded data.
    """
    try:
        with xr.open_dataset(file_path) as ds:
            for var in variables:
                if var in ds:
                    data = ds[var].values
                    if data.size == 0:
                        print(f"Variable {var} has no data in {file_path}.")
                        return False

                    # Scan each timestep for all-NaN slices. Axis 0 is time for
                    # both 4D (time, depth, lat, lon) and 3D (time, lat, lon) fields.
                    if np.ndim(data) == 4:
                        for dt in range(data[:, 0, 0, 0].size):
                            if np.isnan(data[dt]).all():
                                print(f"All values for variable {var} at timestep {dt} are NaN in {file_path}.")
                                return False
                    elif np.ndim(data) == 3:
                        for dt in range(data[:, 0, 0].size):
                            if np.isnan(data[dt]).all():
                                print(f"All values for variable {var} at timestep {dt} are NaN in {file_path}.")
                                return False

                else:
                    print(f"Variable {var} not found in {file_path}.")
                    return False

            return True

    except Exception as e:
        print(f"Error checking data quality in file {file_path}: {e}")
        return False
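# A minimal illustration (hypothetical values) of the all-NaN test used above:
#
#   arr = np.full((2, 3, 4), np.nan)   # shaped (time, lat, lon)
#   np.isnan(arr[0]).all()             # -> True, so timestep 0 would be flagged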

def validate_download(file_path, expected_vars, expected_start, expected_end):
    """
    Function that calls the validation routines to check the integrity of the downloaded files.
    """
    if not is_file_valid(file_path):
        return False
    if not is_netcdf_valid(file_path):
        return False
    if not check_variables(file_path, expected_vars):
        return False
    if not check_time_range(file_path, expected_start, expected_end):
        return False
    if not check_data_quality(file_path, expected_vars):
        return False
    print(f"File {file_path} passed all validation checks.")
    return True
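# A sketch (hypothetical path and dates) of running the validation chain on an
# already-downloaded file, independently of download_hycom:
#
#   expected_vars = ['salinity', 'water_temp', 'surf_el', 'water_u', 'water_v']
#   start = pd.Timestamp('2024-11-27')
#   end = pd.Timestamp('2024-12-07')
#   validate_download('/path/to/HYCOM_20241202_12.nc', expected_vars, start, end)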


def update_var_list(var_list):
    var_metadata = {
@@ -55,11 +171,16 @@ def decode_time_units(time_var):
    except Exception as e:
        raise RuntimeError(f"Error decoding time units: {e}")

def download_var(var, metadata, domain, depths, save_dir, run_date, hdays, fdays):
    vars_to_drop = ['salinity_bottom', 'water_temp_bottom', 'water_u_bottom', 'water_v_bottom', 'tau', 'time_offset',
                    'time_run', 'time1_offset', 'sst', 'sss', 'ssu', 'ssv', 'sic', 'sih', 'siu', 'siv', 'surtx',
                    'surty', 'steric_ssh']
    lon_range, lat_range, depth_range = slice(domain[0], domain[1]), slice(domain[2], domain[3]), slice(depths[0], depths[1])

    # Calculate the time range for temporal subsetting: hdays of hindcast and
    # fdays of forecast around run_date.
    start_date = pd.Timestamp(run_date) - timedelta(days=hdays)
    end_date = pd.Timestamp(run_date) + timedelta(days=fdays)
    time_range = slice(start_date, end_date)
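    # e.g. with run_date=2024-12-02 and hdays=fdays=5, time_range selects
    # 2024-11-27 .. 2024-12-07 (xarray label-based slicing includes both ends).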

    MAX_RETRIES, RETRY_WAIT = 3, 20
    variable = None
@@ -74,7 +195,8 @@ def download_var(var, metadata, domain, depths, save_dir):
engine="netcdf4").sel(lat=lat_range,
lon=lon_range)
if 'time' in ds:
ds['time'] = decode_time_units(ds['time'])
ds['time'] = decode_time_units(ds['time'])
ds = ds.sel(time=time_range)

variable = ds[metadata["vars"][0]]
if variable.ndim == 4:
@@ -100,12 +222,12 @@
    # Explicitly delete large variables to free up memory
    del variable

def download_vars_parallel(variables, domain, depths, run_date, hdays, fdays, workers, save_dir):
    var_metadata = update_var_list(variables)

    # Submit one download task per variable and map each future back to its
    # variable name so failures can be reported per variable.
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_var = {
            executor.submit(download_var, var, metadata, domain, depths, save_dir, run_date, hdays, fdays): var
            for var, metadata in var_metadata.items()
        }

@@ -116,19 +238,74 @@ def download_vars_parallel(variables, domain, depths, workers, save_dir):
        except Exception as e:
            print(f"Download failed for {var}: {e}")

def download_hycom(variables, domain, depths, run_date, hdays, fdays, save_dir, workers=None):
    """
    Downloads the HYCOM analysis in daily outputs using xarray OPeNDAP.
    This function checks the integrity of the downloaded file. If the file is
    corrupt, the download is retried - max retries is 3.
    INPUTS:
    variables: List of variables to download (e.g. ['salinity', 'water_temp', 'surf_el', 'water_u', 'water_v'])
    domain   : List of geographical coordinates to subset the data and download (e.g. [lon_min,lon_max,lat_min,lat_max]).
    depths   : List of minimum and maximum depths to download. Values must be positive (e.g. [0,5000]).
    run_date : Today's datetime to download (e.g. datetime.datetime(YYYY,MM,DD)).
    hdays    : Days to hindcast (e.g. hdays=5).
    fdays    : Days to forecast (e.g. fdays=5).
    save_dir : Directory to save the downloaded data (e.g. save_dir='/path/and/directory/to/save/').
    workers  : Number of variables to download in parallel. Default is None, in which case all the
               variables are downloaded in parallel. Note that the number of workers cannot exceed
               the number of variables.
    OUTPUT:
    NetCDF file containing the most recent HYCOM forecast run.
    """
    if workers is None:
        workers = len(variables)

    MAX_TRIES = 3
    for attempt in range(MAX_TRIES):
        print( '--------------------------------------------')
        print(f' Attempt {attempt + 1} out of {MAX_TRIES} ')
        print( '--------------------------------------------')

        download_vars_parallel(variables, domain, depths, run_date, hdays, fdays, workers, save_dir)

        ds = xr.open_mfdataset(os.path.join(save_dir, 'hycom_*.nc'))

        outfile = os.path.abspath(os.path.join(save_dir, f"HYCOM_{run_date.strftime('%Y%m%d_%H')}.nc"))

        if os.path.exists(outfile):
            os.remove(outfile)

        ds.to_netcdf(outfile, 'w')

        subprocess.call(["chmod", "-R", "775", outfile])
        print('')
        print('Validating download...')
        print('')

        start_date = pd.Timestamp(run_date) - timedelta(days=hdays)
        end_date = pd.Timestamp(run_date) + timedelta(days=fdays)

        if validate_download(outfile, variables, start_date, end_date):
            print('')
            print("File downloaded and validated successfully!")
            print('')
            print('created: ', outfile)
            print('')
            break
        else:
            print('')
            print("File validation failed; retrying the download.")
            print('')


if __name__ == '__main__':
    run_date = pd.to_datetime('2024-12-02 12:00:00')
    hdays = 5
    fdays = 5
    variables = ['salinity', 'water_temp', 'surf_el', 'water_u', 'water_v']
    domain = [23, 25, -37, -36]
    depths = [0, 5000]
    save_dir = '/home/g.rautenbach/Projects/somisana-croco/DATASETS_CROCOTOOLS/HYCOM/'
    download_hycom(variables, domain, depths, run_date, hdays, fdays, save_dir, workers=None)
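# Note: run_date should carry an hour component, since the output filename is
# built with run_date.strftime('%Y%m%d_%H'); the call above therefore writes
# HYCOM_20241202_12.nc into save_dir.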
