Skip to content

Commit

Permalink
update csv to parquet to use multiprocessing
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshCu authored and shorvath-noaa committed Jul 10, 2024
1 parent ae9aeae commit ed8a105
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions src/troute-network/troute/AbstractNetwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from functools import partial
import pandas as pd
import numpy as np
import multiprocessing
from datetime import datetime, timedelta

import os
Expand Down Expand Up @@ -927,14 +928,16 @@ def get_timesteps_from_nex(nexus_files):
return output_file_timestamps


def split_csv_file(nexus_file, binary_folder):
    """Split one nexus output CSV into temporary CSV files via awk.

    Each row of ``nexus_file`` is split on ``', '``. For every row, the line
    ``"<catchment_id>, <last field>"`` is appended to
    ``<binary_folder>/tempfile_<first field>.csv``, so one temp file is
    produced per distinct value of the first field.

    Parameters
    ----------
    nexus_file : str
        Path of the nexus output CSV. The catchment id written to each row
        is obtained from this path with ``get_id_from_filename``.
    binary_folder : str
        Directory that receives the ``tempfile_*.csv`` files.

    Raises
    ------
    FileNotFoundError
        If the ``awk`` executable cannot be found.
    """
    # Imported locally so this function does not rely on a module-level import.
    import subprocess

    catchment_id = get_id_from_filename(nexus_file)

    # Equivalent shell command:
    #   awk -F ', ' -v prefix="<binary_folder>/tempfile_" -v cid="<id>" \
    #       '{ filename = prefix $1 ".csv"; print cid ", " $NF >> filename; close(filename) }' <nexus_file>
    # close() after every row keeps awk from exhausting file descriptors when
    # the input has many distinct first-field values.
    awk_program = (
        '{ filename = prefix $1 ".csv"; '
        'print cid ", " $NF >> filename; '
        'close(filename) }'
    )

    # Passing an argument list (no shell) and handing the path / id to awk
    # with -v means spaces, quotes or shell metacharacters in either path
    # can no longer break or inject into the command.
    cmd = [
        'awk',
        '-F', ', ',
        '-v', f'prefix={binary_folder}/tempfile_',
        '-v', f'cid={catchment_id}',
        awk_program,
        nexus_file,
    ]

    # awk's exit status is deliberately not checked, matching the previous
    # os.system() call, which also ignored it.
    subprocess.run(cmd, check=False)


def rewrite_to_parquet(tempfile_id, output_file_id, binary_folder):
def rewrite_to_parquet(file_args, binary_folder):
tempfile_id, output_file_id = file_args
# Rewrite the csv file to parquet
df = pd.read_csv(f'{binary_folder}/tempfile_{tempfile_id}.csv', names=['feature_id', output_file_id])
df.set_index('feature_id', inplace=True) # Set feature_id as the index
Expand All @@ -946,15 +949,19 @@ def rewrite_to_parquet(tempfile_id, output_file_id, binary_folder):
def nex_files_to_binary(nexus_files, binary_folder):
# Get the output files
output_timesteps = get_timesteps_from_nex(nexus_files)

partial_split_csv_file = partial(split_csv_file, binary_folder=binary_folder)
# Split the csv file into multiple csv files
for nexus_file in nexus_files:
catchment_id = get_id_from_filename(nexus_file)
split_csv_file(nexus_file, catchment_id, binary_folder)
with multiprocessing.Pool() as pool:
pool.map(partial_split_csv_file, nexus_files)

# create a list of tuples to simplify pool.map call
temp_to_timestep_list = list(enumerate(output_timesteps))

partial_rewrite_to_parquet = partial(rewrite_to_parquet, binary_folder=binary_folder)
# Rewrite the temp csv files to parquet
for tempfile_id, nexus_file in enumerate(output_timesteps):
rewrite_to_parquet(tempfile_id, nexus_file, binary_folder)
with multiprocessing.Pool() as pool:
pool.map(partial_rewrite_to_parquet, temp_to_timestep_list)


# Clean up the temp files
os.system(f'rm -rf {binary_folder}/tempfile_*.csv')
Expand Down

0 comments on commit ed8a105

Please sign in to comment.