
Commit

Updated script - drop unusable basins
Jesus Perez Curbelo (ame805) committed May 28, 2024
1 parent 1912856 commit d168a4b
Showing 11 changed files with 6,511 additions and 34 deletions.
1,452 changes: 1,452 additions & 0 deletions camels_spat2nh-475357.out

Large diffs are not rendered by default.

2,870 changes: 2,870 additions & 0 deletions camels_spat2nh-475358.out

Large diffs are not rendered by default.

113 changes: 85 additions & 28 deletions camels_spat2nh.py
@@ -19,6 +19,7 @@

MULTIPROCESSING = True

FILTER_BY_CYRIL = True

def camels_spat2nh(data_dir, data_gen, unusuable_basins):

@@ -31,11 +32,23 @@ def camels_spat2nh(data_dir, data_gen, unusuable_basins):
## Basins data
basin_data_path = os.path.join(data_dir_src, 'basin_data')
list_basin_files = sorted(os.listdir(basin_data_path))

print(list_basin_files)

    aux = input("Press Enter to continue...")  # debug pause; blocks non-interactive (SLURM) runs

# Drop if file already exists
for basin_f in list_basin_files[:]:
# Check if file exists
csv_file_name = os.path.join(data_dir_out, f'CAMELS_spat_{basin_f[0:3]}', basin_f[4:] + '.csv')
if os.path.exists(csv_file_name):
print(f"File {csv_file_name} already exists")
if basin_f[4:] in unusuable_basins:
# Delete file
os.remove(csv_file_name)

# Remove from list
list_basin_files.remove(basin_f)

print('Basins to process:', len(list_basin_files))

## General data
countries = data_gen['countries']
data_sources = data_gen['data_sources']
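
The pre-filter loop above iterates over a copy (list_basin_files[:]) so that removals from the original list are safe. A minimal sketch of the same filter as a helper plus list comprehension (hypothetical names, not this commit's code; assumes the XXX_<basin_id> file naming used above):

    def needs_processing(basin_f, data_dir_out, unusable):
        # Hypothetical helper, for illustration only
        csv_file = os.path.join(data_dir_out, f'CAMELS_spat_{basin_f[0:3]}',
                                basin_f[4:] + '.csv')
        if os.path.exists(csv_file):
            if basin_f[4:] in unusable:
                os.remove(csv_file)  # drop the stale CSV of an unusable basin
            return False  # output already exists (or was just deleted): skip
        return True

    list_basin_files = [b for b in list_basin_files
                        if needs_processing(b, data_dir_out, unusuable_basins)]
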
@@ -44,69 +57,93 @@ def camels_spat2nh(data_dir, data_gen, unusuable_basins):
basin_data_path_dict = {}
for country in countries:
basin_data_path_dict[country] = [basin for basin in list_basin_files if basin[:3] == country]


# # Filtering by cyril basins
# if FILTER_BY_CYRIL:
# cyril_list = get_cyril_basins()
# else:
# cyril_list = None

# print('Cyril basins:', len(cyril_list))
# print(cyril_list[:5])
# print(cyril_list[-5:])

# return

## Process data for each basin and save to csv file
for country in countries[:]:
# Create a folder for each country
country_dir = os.path.join(data_dir_out, f'CAMELS_spat_{country}')
        if not os.path.exists(country_dir):
os.makedirs(country_dir)
# else:
# print(f"Directory {country_dir} already exists")

# for basin_f in basin_data_path_dict[country]:
# processBasinSave2CSV(basin_f, basin_data_path, country_dir, relative_path_forc,
# relative_path_targ, data_sources, data_gen)

if MULTIPROCESSING:

print(f"Processing {country}...")

with concurrent.futures.ProcessPoolExecutor() as executor:
# Define a partial function with fixed non-iterable arguments
partial_process = partial(processBasinSave2CSV, basin_data_path=basin_data_path,
-                                                  country_dir=country_dir, relative_path_forc=relative_path_forc,
-                                                  relative_path_targ=relative_path_targ, data_sources=data_sources,
-                                                  data_gen=data_gen, unusuable_basins=unusuable_basins)
+                                                  country_dir=country_dir,
+                                                  relative_path_forc=relative_path_forc,
+                                                  relative_path_targ=relative_path_targ,
+                                                  data_sources=data_sources,
+                                                  data_gen=data_gen,
+                                                  unusuable_basins=unusuable_basins)

# Process each basin concurrently
futures = [executor.submit(partial_process, basin_f) for basin_f in basin_data_path_dict[country]]

-                # Wait for all tasks to complete
+                # Wait for all tasks to complete and handle exceptions
for future in concurrent.futures.as_completed(futures):
-                    _ = future.result()  # Get the result if needed
+                    try:
+                        result = future.result()  # Get the result if needed
+                    except Exception as e:
+                        print(f"Error processing {future}: {e}")

else:
for basin_f in basin_data_path_dict[country]:
processBasinSave2CSV(basin_f, basin_data_path, country_dir, relative_path_forc,
relative_path_targ, data_sources, data_gen, unusuable_basins)

def processBasinSave2CSV(basin_f, basin_data_path, country_dir,
relative_path_forc, relative_path_targ,
-                         data_sources, data_gen, unusuable_basins):
+                         data_sources, data_gen, unusuable_basins,
+                         cyril_list=None):

basin_id = basin_f.split('_')[-1]
csv_file_name = os.path.join(country_dir, basin_f + '.csv')
if os.path.exists(csv_file_name):
# if 5>10:
print(f"File {csv_file_name} already exists")
if basin_f[4:] in unusuable_basins:
# Delete file
            os.remove(csv_file_name)
elif basin_f[4:] in unusuable_basins:
print(f"Skipping basin {basin_f} - unusable basin")
else:
-        print('\n', basin_f)
+        print('\n', basin_f[:3], '->', basin_id)
df_src_dict = {}
for src in data_sources:
folder2load = os.path.join(basin_data_path, basin_f, relative_path_forc)
# print('folder2load', folder2load)
eras_files = sorted([f for f in os.listdir(folder2load) if src in f])

print(f'{src}_files', len(eras_files), '->', folder2load)

# Check whether there are files to load
if len(eras_files) == 0:
continue

# Initialize an empty list to store the xarray datasets
datasets = []
# Iterate over the files and load each dataset
-            for file2load in eras_files[:5]:
+            for file2load in eras_files[:]:

                # Skip temporary .tmp files
if '.tmp' not in file2load:

-                    sys.stdout.write(f'\r>> {file2load}')
-                    sys.stdout.flush()
+                    # sys.stdout.write(f'\r>> {file2load}')
+                    # sys.stdout.flush()

basin_data = xr.open_dataset(os.path.join(folder2load, file2load))
datasets.append(basin_data)
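
For reference, a self-contained sketch of the concurrency pattern used above: functools.partial freezes the keyword arguments, and as_completed surfaces per-task exceptions so one failed basin does not abort the whole pool. The worker below is a toy stand-in, not the real processBasinSave2CSV:

    import concurrent.futures
    from functools import partial

    def process_one(basin_f, country_dir):
        # Toy worker standing in for processBasinSave2CSV
        if basin_f.endswith('_bad'):
            raise ValueError(f'cannot process {basin_f}')
        return f'{country_dir}/{basin_f}.csv'

    if __name__ == '__main__':
        work = partial(process_one, country_dir='CAMELS_spat_USA')
        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(work, b) for b in ['USA_01013500', 'USA_bad']]
            for future in concurrent.futures.as_completed(futures):
                try:
                    print(future.result())
                except Exception as e:
                    print(f'Error processing {future}: {e}')
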
@@ -123,8 +160,13 @@ def processBasinSave2CSV(basin_f, basin_data_path, country_dir,
# Save to dict
df_src_dict[src] = basin_data_df

-        # Merge dataframes in the dictionary
-        df_merged_inp = df_src_dict[data_sources[0]].merge(df_src_dict[data_sources[1]], on='time')
+        print('basin', basin_f, '->', df_src_dict.keys())
+        # Check if there are two data sources in df_src_dict.keys() (expected ERA5 and EM_EARTH)
+        if len(df_src_dict.keys()) == 2:
+            # Merge dataframes in the dictionary
+            df_merged_inp = df_src_dict[data_sources[0]].merge(df_src_dict[data_sources[1]], on='time')
+        elif len(df_src_dict.keys()) == 1:
+            df_merged_inp = df_src_dict[list(df_src_dict.keys())[0]]

# Rename time by date
df_merged_inp.rename(columns={'time': 'date'}, inplace=True)
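
A minimal pandas sketch of the merge-with-fallback logic above, with toy frames standing in for the ERA5 and EM_EARTH data (note that if df_src_dict were empty, df_merged_inp would stay unbound; the script above shares that gap):

    import pandas as pd

    df_src_dict = {
        'ERA5': pd.DataFrame({'time': [0, 1], 'temperature': [274.1, 275.0]}),
        'EM_EARTH': pd.DataFrame({'time': [0, 1], 'precipitation': [0.0, 2.3]}),
    }
    sources = list(df_src_dict.keys())
    if len(sources) == 2:
        # Both forcing sources present: align them on the shared time axis
        df_merged_inp = df_src_dict[sources[0]].merge(df_src_dict[sources[1]], on='time')
    elif len(sources) == 1:
        # Single source: pass it through unmerged
        df_merged_inp = df_src_dict[sources[0]]
    df_merged_inp = df_merged_inp.rename(columns={'time': 'date'})
    print(df_merged_inp)
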
@@ -146,8 +188,20 @@ def processBasinSave2CSV(basin_f, basin_data_path, country_dir,
df_merged = df_merged_inp.merge(df_target, on='date')

# Save to file
print("Saving to file...")
df_merged.to_csv(os.path.join(country_dir, basin_f + '.csv'), index=False)
print("Saving to file...", os.path.join(country_dir, basin_id + '.csv'))
df_merged.to_csv(os.path.join(country_dir, basin_f[4:] + '.csv'), index=False)

def get_cyril_basins():

# Load cyril basins from data / liste_BV_CAMELS-spat_928.txt
cyril_file = os.path.join(ROOT_DIR, 'data', 'liste_BV_CAMELS-spat_928.txt')
with open(cyril_file, 'r') as f:
cyril_list = f.readlines()
# Remove 'XXX_' from the beginning of each line
cyril_list = [line[4:].strip() for line in cyril_list]
return cyril_list
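
get_cyril_basins is not wired in yet (the FILTER_BY_CYRIL block earlier in the file is commented out); a sketch of how its output could filter the basin list, assuming the same XXX_<basin_id> naming (illustration only, not part of this commit's live code):

    if FILTER_BY_CYRIL:
        cyril_list = get_cyril_basins()
        # Keep only basin files whose id appears in the Cyril list
        list_basin_files = [b for b in list_basin_files if b[4:] in cyril_list]
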




if __name__ == '__main__':
@@ -159,6 +213,9 @@
# Load Unusable basins
unusuable_basins = get_unusable_basins(data_dir['data_dir_camels_spat_nh'], data_gen['camels_spat_unusable'])

print('Unusable basins:', len(unusuable_basins))
print(unusuable_basins)

## Let's profile the loop
start_time = time.time()
camels_spat2nh(data_dir, data_gen, unusuable_basins)
16 changes: 16 additions & 0 deletions camels_spat2nh.sh
@@ -0,0 +1,16 @@
#!/bin/bash

#SBATCH --account=hpc_c_giws_spiteri
#SBATCH --job-name=camels_spat2nh
#SBATCH --time=2-00:00:00
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=32
#SBATCH --mem=64G
#SBATCH --output=camels_spat2nh-%j.out

module load python/3.11.5

# Path to your virtual environment's activation script
source venv-camelsspat/bin/activate

python3 camels_spat2nh.py
37 changes: 37 additions & 0 deletions camels_spat_attributes.py
@@ -0,0 +1,37 @@
import os
import sys
import pandas as pd
from pathlib import Path

from utils.utils import load_util_data, get_unusable_basins

# Get the root directory of the project with Path
ROOT_DIR = Path(__file__).resolve().parents[0]

if __name__ == '__main__':

print(ROOT_DIR)
print(os.listdir(ROOT_DIR))

# camels_spat2nh()
data_dir, data_gen = load_util_data(str(ROOT_DIR))

# Load Unusable basins
unusuable_basins = get_unusable_basins(data_dir['data_dir_camels_spat_nh'], data_gen['camels_spat_unusable'])

# # Load files in data_dir['data_dir_camels_spat'] / 'attributes'
# attributes_dir = Path(data_dir['data_dir_camels_spat']) / 'attributes'

# # Load 1st file in attributes_dir
# file = sorted(os.listdir(attributes_dir))[0]
# df = pd.read_csv(attributes_dir / file)
# print(df.head())

att_file_dir = Path(data_dir['data_dir_camels_spat']) / 'camels_spat_attributes.csv'
df = pd.read_csv(att_file_dir)
print(df.head())

# Copy attribute file to ROOT_DIR / 'data'
att_file_dir_out = ROOT_DIR / 'data' / 'camels_spat_attributes.csv'
df.to_csv(att_file_dir_out, index=False)

1,113 changes: 1,113 additions & 0 deletions data/camels_spat_attributes.csv

Large diffs are not rendered by default.
