Commit

Adapted to handle multiple sources: daily data, merge, and rename vars
Jesus Perez Curbelo (ame805) committed Aug 20, 2024
1 parent d168a4b commit f1b0f15
Showing 9 changed files with 2,278 additions and 1,526 deletions.
4 changes: 4 additions & 0 deletions camels_spat2nh-1058854.out
@@ -0,0 +1,4 @@
Unusable basins: 13
{'05RE002', '07BJ006', '08KH011', '08AA007', '07QD002', '09AA004', '08MG020', '09AE002', '08LD003', '07SB017', '08MG022', '06DA001', '08LF023'}
Basins to process: 1698
Processing USA...
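
A minimal sketch of how the counts in this log could be produced, assuming the unusable IDs come from the camels_spat_unusable.csv named in the project config (the loading code itself is not part of this diff; the names below are illustrative):

all_basins = ['CAN_05RE002', 'USA_01013500']  # toy list of basin folder names
unusable = {'05RE002', '07BJ006'}  # in practice read from camels_spat_unusable.csv
usable = [b for b in all_basins if b.split('_')[-1] not in unusable]
print('Unusable basins:', len(unusable))
print('Basins to process:', len(usable))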
1,452 changes: 0 additions & 1,452 deletions camels_spat2nh-475357.out

This file was deleted.

102 changes: 74 additions & 28 deletions camels_spat2nh.py
@@ -2,8 +2,8 @@
import sys
import xarray as xr
import pandas as pd
import yaml
import time
from functools import reduce

from functools import partial
import concurrent.futures
@@ -17,7 +17,8 @@
ROOT_DIR = os.path.abspath(current_dir)
sys.path.append(ROOT_DIR)

MULTIPROCESSING = True
MULTIPROCESSING = 1
ONLY_TESTING = 0

FILTER_BY_CYRIL = True

@@ -33,6 +34,10 @@ def camels_spat2nh(data_dir, data_gen, unusuable_basins):
basin_data_path = os.path.join(data_dir_src, 'basin_data')
list_basin_files = sorted(os.listdir(basin_data_path))

# Input data
input_vars = data_gen['input_vars']
# Get the input variables that appear repeatedly
input_vars_repeated = set([var for var in input_vars if input_vars.count(var) > 1])
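
# Toy check of the detection above, plus the kind of per-source rename it
# enables downstream (the suffix scheme here is an illustrative assumption):
# with input_vars = ['prcp', 'tmean', 'prcp'], input_vars_repeated == {'prcp'}.
def rename_repeated_sketch(df, repeated, src):
    # e.g. 'prcp' -> 'prcp_daymet', so columns from different sources cannot collide
    return df.rename(columns={v: f'{v}_{src}' for v in repeated if v in df.columns})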

# Drop if file already exists
for basin_f in list_basin_files[:]:
@@ -58,38 +63,41 @@ def camels_spat2nh(data_dir, data_gen, unusuable_basins):
for country in countries:
basin_data_path_dict[country] = [basin for basin in list_basin_files if basin[:3] == country]

# # Filtering by cyril basins
# if FILTER_BY_CYRIL:
# cyril_list = get_cyril_basins()
# else:
# cyril_list = None
# ## Do not delete unless you know what you are doing
# # # Filtering by cyril basins
# # if FILTER_BY_CYRIL:
# # cyril_list = get_cyril_basins()
# # else:
# # cyril_list = None

# print('Cyril basins:', len(cyril_list))
# print(cyril_list[:5])
# print(cyril_list[-5:])
# # print('Cyril basins:', len(cyril_list))
# # print(cyril_list[:5])
# # print(cyril_list[-5:])

# return
# # return

## Process data for each basin and save to csv file
for country in countries[:]:
# Create a folder for each country
country_dir = os.path.join(data_dir_out, f'CAMELS_spat_{country}')
country_dir = os.path.join(data_dir_out, f'CAMELS_spat_{country}_{len(data_sources)}sources')
if not os.path.exists(country_dir ):
os.makedirs(country_dir)

if MULTIPROCESSING:

print(f"Processing {country}...")

with concurrent.futures.ProcessPoolExecutor() as executor:
max_workers = int(os.environ.get('SLURM_CPUS_PER_TASK', 32))
with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
# Define a partial function with fixed non-iterable arguments
partial_process = partial(processBasinSave2CSV, basin_data_path=basin_data_path,
country_dir=country_dir,
relative_path_forc=relative_path_forc,
relative_path_targ=relative_path_targ,
data_sources=data_sources,
data_gen=data_gen,
unusuable_basins=unusuable_basins)
unusuable_basins=unusuable_basins,
input_vars_repeated=input_vars_repeated)

# Process each basin concurrently
futures = [executor.submit(partial_process, basin_f) for basin_f in basin_data_path_dict[country]]
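
# Self-contained sketch of the pool sizing used above: take the worker count
# from the SLURM allocation when present, otherwise fall back to 32 (matching
# --cpus-per-task in camels_spat2nh.sh).
def pool_sizing_sketch():
    import os
    import concurrent.futures
    max_workers = int(os.environ.get('SLURM_CPUS_PER_TASK', 32))
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(pow, 2, 10)]
        return [f.result() for f in concurrent.futures.as_completed(futures)]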
@@ -104,12 +112,15 @@ def camels_spat2nh(data_dir, data_gen, unusuable_basins):
else:
for basin_f in basin_data_path_dict[country]:
processBasinSave2CSV(basin_f, basin_data_path, country_dir, relative_path_forc,
relative_path_targ, data_sources, data_gen, unusuable_basins)
relative_path_targ, data_sources, data_gen, unusuable_basins, input_vars_repeated)

def processBasinSave2CSV(basin_f, basin_data_path, country_dir,
relative_path_forc, relative_path_targ,
data_sources, data_gen, unusuable_basins,
input_vars_repeated,
cyril_list=None):

print(f"Let's try {basin_f}...")

basin_id = basin_f.split('_')[-1]
csv_file_name = os.path.join(country_dir, basin_f + '.csv')
@@ -124,20 +135,25 @@ def processBasinSave2CSV(basin_f, basin_data_path, country_dir,
print('\n', basin_f[:3], '->', basin_id)
df_src_dict = {}
for src in data_sources:

folder2load = os.path.join(basin_data_path, basin_f, relative_path_forc)
# print('folder2load', folder2load)
eras_files = sorted([f for f in os.listdir(folder2load) if src in f])

print(f'{src}_files', len(eras_files), '->', folder2load)

# Check if only testing
if ONLY_TESTING:
continue

# Check whether there are files to load
if len(eras_files) == 0:
continue

# Initialize an empty list to store the xarray datasets
datasets = []
# Iterate over the files and load each dataset
for file2load in eras_files[:]:
for file2load in eras_files: ### [:5] for testing

# Skip temporary (.tmp) files
if '.tmp' not in file2load:
@@ -153,23 +169,48 @@ def processBasinSave2CSV(basin_f, basin_data_path, country_dir,

# Reduce basin_data to daily values
basin_data_reduced = reduceDataByDay(concatenated_dataset, data_gen['input_vars'],
data_gen['sum_vars'])
data_gen['sum_vars'], input_vars_repeated, src.lower())
# Convert the reduced basin_data to a DataFrame, dropping the 'hru' dimension
basin_data_df = basin_data_reduced.to_dataframe().droplevel('hru').reset_index()
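
# Illustrative sketch only (reduceDataByDay lives elsewhere in the repo): one
# plausible shape for the daily reduction, assuming sub-daily forcing is
# resampled to daily steps, sum_vars (e.g. precipitation) are accumulated,
# other inputs are averaged, and repeated variables get a per-source suffix.
def reduce_by_day_sketch(ds, input_vars, sum_vars, repeated, src):
    import xarray as xr
    daily = xr.Dataset()
    for var in input_vars:
        if var not in ds:
            continue
        agg = ds[var].resample(time='1D')
        data = agg.sum() if var in sum_vars else agg.mean()
        daily[f'{var}_{src}' if var in repeated else var] = data
    return daily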


print('basin_data_df', basin_data_df.head())

# Save to dict
df_src_dict[src] = basin_data_df


# Check if only testing
if ONLY_TESTING:
return None

print('basin', basin_f, '->', df_src_dict.keys())
# Check if there are two data sources in df_src_dict.keys() (expected ERA5 and EM_EARTH)
if len(df_src_dict.keys()) == 2:
# Merge dataframes in the dictionary
df_merged_inp = df_src_dict[data_sources[0]].merge(df_src_dict[data_sources[1]], on='time')
# Check if there are len(data_sources) data sources in df_src_dict.keys() (expected ERA5, EM_EARTH, daymet, and RDRS)

# if len(df_src_dict.keys()) == len(data_sources):
# # Merge dataframes in the dictionary
# df_merged_inp = df_src_dict[data_sources[0]].merge(df_src_dict[data_sources[1]], on='time')
# elif len(df_src_dict.keys()) == 1:
# df_merged_inp = df_src_dict[list(df_src_dict.keys())[0]]

if len(df_src_dict.keys()) == len(data_sources):
# Dynamically merge all dataframes in the dictionary based on the 'time' column using an outer join
df_merged_inp = reduce(lambda left, right: pd.merge(left, right, on='time', how='outer'),
[df_src_dict[src] for src in data_sources])
elif len(df_src_dict.keys()) == 1:
df_merged_inp = df_src_dict[list(df_src_dict.keys())[0]]

else:
raise ValueError("The number of data sources does not match the keys in the dictionary.")

# Rename time by date
df_merged_inp.rename(columns={'time': 'date'}, inplace=True)
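
# Toy, self-contained illustration of the reduce-based outer merge above:
# frames from each source join on 'time', and the outer join keeps dates
# that are missing from any single source.
def merge_sources_sketch():
    import pandas as pd
    from functools import reduce
    df_a = pd.DataFrame({'time': ['2000-01-01', '2000-01-02'], 'prcp_era5': [1.0, 0.0]})
    df_b = pd.DataFrame({'time': ['2000-01-02', '2000-01-03'], 'prcp_daymet': [0.5, 2.0]})
    merged = reduce(lambda l, r: pd.merge(l, r, on='time', how='outer'), [df_a, df_b])
    return merged.rename(columns={'time': 'date'})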

# print('df_merged_inp', df_merged_inp.head())
# # Print data_vars
# for var in df_merged_inp.columns:
# print(var)
# if var not in data_gen['input_vars']:
# print('Variable not in inputs:', var)
# aux = input('Enter to continue')

## Load target data
target_data = xr.open_dataset(os.path.join(basin_data_path, basin_f, relative_path_targ,
@@ -186,6 +227,15 @@ def processBasinSave2CSV(basin_f, basin_data_path, country_dir,

# Merge input and target dataframes
df_merged = df_merged_inp.merge(df_target, on='date')


# print('df_merged', df_merged_inp.head())
# # Print data_vars
# for var in df_merged.columns:
# print(var)
# if var not in data_gen['input_vars']:
# print('Variable not in inputs:', var)
# aux = input('Enter to continue')

# Save to file
print("Saving to file...", os.path.join(country_dir, basin_id + '.csv'))
@@ -201,13 +251,9 @@ def get_cyril_basins():
cyril_list = [line[4:].strip() for line in cyril_list]
return cyril_list




if __name__ == '__main__':


# camels_spat2nh()
# Load data
data_dir, data_gen = load_util_data(ROOT_DIR)

# Load Unusable basins
2 changes: 1 addition & 1 deletion camels_spat2nh.sh
@@ -4,7 +4,7 @@
#SBATCH --job-name=camels_spat2nh
#SBATCH --time=2-00:00:00
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=32
#SBATCH --cpus-per-task=32
#SBATCH --mem=64G
#SBATCH --output=camels_spat2nh-%j.out

88 changes: 77 additions & 11 deletions camels_spat_exploring.ipynb
@@ -9,9 +9,18 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": 3,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The autoreload extension is already loaded. To reload it, use:\n",
" %reload_ext autoreload\n"
]
}
],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
@@ -46,7 +55,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 4,
"metadata": {},
"outputs": [
{
Expand All @@ -55,7 +64,7 @@
"text": [
"xarray version: 2023.1.0\n",
"pandas version: 2.0.3\n",
"numpy version: 1.24.4\n",
"numpy version: 1.25.2\n",
"yaml version: 6.0.1\n"
]
}
@@ -84,11 +93,54 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": 6,
"metadata": {},
"outputs": [],
"outputs": [
{
"data": {
"text/plain": [
"({'data_dir_camels_spat': '/project/gwf/gwf_cmt/wknoben/camels_spat/camels-spat-data',\n",
" 'relative_path_forcing': 'forcing/lumped',\n",
" 'relative_path_target': 'observations',\n",
" 'countries': ['CAN', 'USA'],\n",
" 'camels_spat_metadata': 'camels_spat_metadata.csv',\n",
" 'camels_spat_unusable': 'camels_spat_unusable.csv',\n",
" 'summa_ml_models_dir': '/gladwell/hydrology/SUMMA/summa-ml-models',\n",
" 'data_dir_camels_spat_nh': '../CAMELS_spat_NH'},\n",
" {'countries': ['CAN'],\n",
" 'camels_spat_metadata': 'camels_spat_metadata.csv',\n",
" 'camels_spat_unusable': 'camels_spat_unusable.csv',\n",
" 'camels_spat_dates_stats': 'camels_spat_1426_dates_stats.csv',\n",
" 'data_sources': ['daymet'],\n",
" 'input_vars': ['prcp',\n",
" 'tmean',\n",
" 'mtpr',\n",
" 'msdwswrf',\n",
" 'msdwlwrf',\n",
" 'msnswrf',\n",
" 'msnlwrf',\n",
" 'mper',\n",
" 't',\n",
" 'u',\n",
" 'v',\n",
" 'q',\n",
" 'sp',\n",
" 'e',\n",
" 'rh',\n",
" 'w',\n",
" 'phi'],\n",
" 'target_vars': ['q_obs'],\n",
" 'sum_vars': ['prcp', 'mtpr', 'q_obs']})"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"data_dir, data_gen = load_util_data(CURRENT_DIR)"
"data_dir, data_gen = load_util_data(CURRENT_DIR)\n",
"data_dir, data_gen"
]
},
{
Expand All @@ -100,7 +152,7 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
@@ -110,9 +162,23 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": 8,
"metadata": {},
"outputs": [],
"outputs": [
{
"ename": "FileNotFoundError",
"evalue": "[Errno 2] No such file or directory: '../CAMELS_spat_NH/CAMELS_spat_CAN'",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mFileNotFoundError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[8], line 2\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[38;5;66;03m# Call the function to calculate statistics and plot histograms\u001b[39;00m\n\u001b[0;32m----> 2\u001b[0m time_stats \u001b[38;5;241m=\u001b[39m \u001b[43mcalculate_and_plot_time_statistics\u001b[49m\u001b[43m(\u001b[49m\u001b[43mbasins_dir\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcountries\u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m/gpfs/mdiops/gwf/gwf_cmt/jcurbelo/camels-spat-to-nh/utils/utils.py:147\u001b[0m, in \u001b[0;36mcalculate_and_plot_time_statistics\u001b[0;34m(basins_dir, countries)\u001b[0m\n\u001b[1;32m 145\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m country \u001b[38;5;129;01min\u001b[39;00m countries:\n\u001b[1;32m 146\u001b[0m country_dir \u001b[38;5;241m=\u001b[39m os\u001b[38;5;241m.\u001b[39mpath\u001b[38;5;241m.\u001b[39mjoin(basins_dir, \u001b[38;5;124mf\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mCAMELS_spat_\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mcountry\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m'\u001b[39m)\n\u001b[0;32m--> 147\u001b[0m basin_ids, start_dates, end_dates, total_time_lengths \u001b[38;5;241m=\u001b[39m \u001b[43mcalculate_time_stats\u001b[49m\u001b[43m(\u001b[49m\u001b[43mcountry_dir\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 148\u001b[0m time_stats[country] \u001b[38;5;241m=\u001b[39m {\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mStation_id\u001b[39m\u001b[38;5;124m'\u001b[39m: basin_ids, \u001b[38;5;124m'\u001b[39m\u001b[38;5;124mStart_date\u001b[39m\u001b[38;5;124m'\u001b[39m: start_dates, \u001b[38;5;124m'\u001b[39m\u001b[38;5;124mEnd_date\u001b[39m\u001b[38;5;124m'\u001b[39m: end_dates, \u001b[38;5;124m'\u001b[39m\u001b[38;5;124mTotal_years\u001b[39m\u001b[38;5;124m'\u001b[39m: total_time_lengths}\n\u001b[1;32m 150\u001b[0m \u001b[38;5;66;03m# Extract data for plotting\u001b[39;00m\n",
"File \u001b[0;32m/gpfs/mdiops/gwf/gwf_cmt/jcurbelo/camels-spat-to-nh/utils/utils.py:94\u001b[0m, in \u001b[0;36mcalculate_time_stats\u001b[0;34m(country_dir)\u001b[0m\n\u001b[1;32m 84\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mcalculate_time_stats\u001b[39m(country_dir):\n\u001b[1;32m 85\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m'''\u001b[39;00m\n\u001b[1;32m 86\u001b[0m \u001b[38;5;124;03m Calculate start dates, end dates, and total time lengths for each basin\u001b[39;00m\n\u001b[1;32m 87\u001b[0m \u001b[38;5;124;03m Args:\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 92\u001b[0m \u001b[38;5;124;03m total_time_lengths: list, total time lengths for each basin\u001b[39;00m\n\u001b[1;32m 93\u001b[0m \u001b[38;5;124;03m '''\u001b[39;00m\n\u001b[0;32m---> 94\u001b[0m files \u001b[38;5;241m=\u001b[39m \u001b[43mos\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mlistdir\u001b[49m\u001b[43m(\u001b[49m\u001b[43mcountry_dir\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 95\u001b[0m basin_ids \u001b[38;5;241m=\u001b[39m []\n\u001b[1;32m 96\u001b[0m start_dates \u001b[38;5;241m=\u001b[39m []\n",
"\u001b[0;31mFileNotFoundError\u001b[0m: [Errno 2] No such file or directory: '../CAMELS_spat_NH/CAMELS_spat_CAN'"
]
}
],
"source": [
"# Call the function to calculate statistics and plot histograms\n",
"time_stats = calculate_and_plot_time_statistics(basins_dir, countries)\n"
@@ -1214,7 +1280,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
"version": "3.11.5"
}
},
"nbformat": 4,
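Aside on the FileNotFoundError in the notebook traceback above: with output folders now named CAMELS_spat_{country}_{n}sources, the notebook's stats helper still looks for the old CAMELS_spat_CAN path. A hedged sketch of one possible guard follows (not the repo's actual fix; calculate_time_stats_safe is a hypothetical name):

import os

def calculate_time_stats_safe(country_dir):
    # Skip countries whose output folder has not been generated yet,
    # instead of letting os.listdir raise FileNotFoundError.
    if not os.path.isdir(country_dir):
        print(f'Skipping missing directory: {country_dir}')
        return [], [], [], []
    files = os.listdir(country_dir)
    # ...original statistics logic would continue here...
    return files, [], [], []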
(4 more changed files not shown)
