Avoid df/str copies and unneeded iterables #55

Open · wants to merge 9 commits into main
2 changes: 1 addition & 1 deletion reView/pages/rev/controller/element_builders.py
@@ -570,7 +570,7 @@ def _distributions(self, data, y_var, nbins=100):
pdf = count / sum(count)
cdf = np.cumsum(pdf)
df = pd.DataFrame({y_var: cbins, "cdf": cdf, "pdf": pdf})
df = df.reset_index()
df.reset_index(inplace=True)
return df

def _histogram(self, main_df, y_var, bins):
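Note on the `reset_index` swap above: `inplace=True` skips binding a new object back to the same name, though pandas does not promise the operation is copy-free internally. A minimal, self-contained sketch of the two spellings (the frame here is illustrative, not the one built in `_distributions`):

```python
import numpy as np
import pandas as pd

# Illustrative frame standing in for the cdf/pdf result built in _distributions().
df = pd.DataFrame({"cdf": np.linspace(0.01, 1.0, 100), "pdf": np.full(100, 0.01)})

# Before: the result is a new object that is bound back to the same name.
copied = df.copy()
copied = copied.reset_index()

# After (PR style): mutate the existing object; the call returns None.
df.reset_index(inplace=True)
```

Either form leaves the frame with a fresh RangeIndex; the in-place call just skips the intermediate rebinding.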
2 changes: 1 addition & 1 deletion reView/pages/rev/model.py
@@ -545,7 +545,7 @@ def composite(dfs, composite_variable="total_lcoe",
"""Return a single least cost df from a list dfs."""
# Make one big data frame
bdf = pd.concat(dfs)
bdf = bdf.reset_index(drop=True)
bdf.reset_index(drop=True, inplace=True)

# Group, find minimum, and subset
if composite_function == "min":
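A possible further simplification for `composite()`, sketched with made-up frames rather than real supply-curve data: `pd.concat` can discard the old index itself via `ignore_index=True`, removing the need for the follow-up `reset_index` call. The groupby line is only a rough stand-in for what the `composite_function == "min"` branch does.

```python
import pandas as pd

# Two stand-in supply-curve frames; in composite() these come from the dfs list.
dfs = [
    pd.DataFrame({"sc_point_gid": [1, 2], "total_lcoe": [30.0, 28.5]}),
    pd.DataFrame({"sc_point_gid": [1, 2], "total_lcoe": [29.0, 31.0]}),
]

# Equivalent to pd.concat(dfs) followed by reset_index(drop=True, inplace=True).
bdf = pd.concat(dfs, ignore_index=True)

# Group and keep the minimum-cost row per point (rough stand-in for the "min" branch).
composite = bdf.loc[bdf.groupby("sc_point_gid")["total_lcoe"].idxmin()]
```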
87 changes: 45 additions & 42 deletions reView/utils/bespoke.py
@@ -88,10 +88,9 @@ def to_wgs(self, rdf):
always_xy=True
)
lons, lats = transformer.transform(xs, ys)
rdf["longitude"] = lons
rdf["latitude"] = lats
del rdf["x"]
del rdf["y"]
rdf["x"] = lons
rdf["y"] = lats
rdf.rename(columns={'x': 'longitude', 'y': 'latitude'}, inplace=True)
rdf = rdf[self.df.columns]
return rdf

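The `to_wgs` rewrite reuses the projected `x`/`y` columns and renames them in place instead of adding `longitude`/`latitude` columns and deleting the originals. A small sketch of that pattern, with made-up values standing in for the pyproj transform output:

```python
import pandas as pd

rdf = pd.DataFrame({"x": [-2.0e6, -1.9e6], "y": [1.5e6, 1.6e6], "capacity": [120.0, 96.0]})
lons, lats = [-105.1, -104.8], [39.7, 39.9]  # stand-ins for transformer.transform(xs, ys)

# Overwrite the projected coordinates with WGS84 values, then rename in place.
rdf["x"] = lons
rdf["y"] = lats
rdf.rename(columns={"x": "longitude", "y": "latitude"}, inplace=True)
```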
@@ -110,8 +109,7 @@ def unpack_turbines(self, drop_sc_points=False):

# Get coordinates from equal area projection
x, y = self.get_xy(row)
del row["longitude"]
del row["latitude"]
row.drop(['longitude', 'latitude'], inplace=True)

# Get bottom left coordinates
blx = x - (self.spacing / 2)
@@ -123,28 +121,21 @@ def unpack_turbines(self, drop_sc_points=False):
xs = [x + blx for x in xs]
ys = [y + bly for y in ys]

# Build new data frame entries for each turbine
nrows = []

# use len(xs) to determine number of turbines because
# nturbines does not appear to be a standard column
turbine_capacity_mw = row['capacity'] / len(xs)

for i, x in enumerate(xs):
nrow = row.copy()
# overwrite existing capacity column (which is typically system
# capacity in mw) with turbine capacity in kw for this turbine row.
# This maintains compatibility with how capacity is summed and
# displayed in the dashboard
nrow["capacity"] = turbine_capacity_mw
nrow["x"] = x
nrow["y"] = ys[i]
nrows.append(nrow)

# Build new data frame
rdf = pd.DataFrame(nrows)
rdf = rdf.reset_index(drop=True)
rdf.index = df.index[-1] + rdf.index + 1
# Build new data frame with a row for each turbine
new_index = range(df.index[-1] + 1, df.index[-1] + 1 + len(xs))
rdf = pd.DataFrame([row]*len(xs), index=new_index)

# overwrite existing capacity column (which is typically system
# capacity in mw) with turbine capacity in kw for this turbine row.
# This maintains compatibility with how capacity is summed and
# displayed in the dashboard
rdf['capacity'] = turbine_capacity_mw
rdf['x'] = xs
rdf['y'] = ys

# Convert back to WGS84
rdf = self.to_wgs(rdf)
@@ -154,7 +145,7 @@ def unpack_turbines(self, drop_sc_points=False):

# Replace the original row with one of the new rows.
df.iloc[self.index] = rdf.iloc[-1]
rdf = rdf.iloc[:-1]
rdf.drop(rdf.index[-1], inplace=True)
df = pd.concat([df, rdf])

return df
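The per-turbine loop is replaced above by a single `pd.DataFrame([row] * len(xs), index=new_index)` construction followed by vectorized column overwrites. A stand-alone sketch with hypothetical values in place of the real supply-curve row and turbine offsets:

```python
import pandas as pd

row = pd.Series({"sc_point_gid": 7, "capacity": 12.0, "x": 0.0, "y": 0.0})
xs = [10.0, 20.0, 30.0]   # per-turbine x coordinates
ys = [5.0, 15.0, 25.0]    # per-turbine y coordinates
last_index = 99           # stand-in for df.index[-1]

# One identical row per turbine, indexed to follow the existing frame ...
new_index = range(last_index + 1, last_index + 1 + len(xs))
rdf = pd.DataFrame([row] * len(xs), index=new_index)

# ... then the per-turbine columns are overwritten in one shot.
rdf["capacity"] = row["capacity"] / len(xs)
rdf["x"] = xs
rdf["y"] = ys
```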
@@ -179,7 +170,8 @@ def batch_unpack_from_supply_curve(sc_df, n_workers=1):
Parameters
----------
sc_df : pd.core.frame.DataFrame
A reV supply curve pandas data frame.
A reV supply curve pandas data frame. This will get modified in
place.
n_workers : int
Number of workers to use for parallel processing.
Default is 1 which will run in serial (and will be slow).
Expand All @@ -193,52 +185,63 @@ def batch_unpack_from_supply_curve(sc_df, n_workers=1):
"""

# cap nb_workers to the total CPUs on the machine/node
if n_workers > cpu_count():
n_workers = cpu_count()
n_workers = min(cpu_count(), n_workers)

if n_workers > 1:
# initialize functionality for parallel dataframe.apply
pandarallel.initialize(
progress_bar=True, nb_workers=n_workers, use_memory_fs=False)

# filter out supply curve points with no capacity (i.e., no turbines)
sc_developable_df = sc_df[sc_df['capacity'] > 0].copy()
sc_df = sc_df[sc_df['capacity'] > 0]
# reset the index because otherwise the unpacker will get messed up
sc_developable_df.reset_index(drop=True, inplace=True)
sc_df.reset_index(drop=True, inplace=True)

# unpack the turbine coordinates
if n_workers > 1:
# run in parallel
all_turbines = sc_developable_df.parallel_apply(
all_turbines = sc_df.parallel_apply(
lambda row:
BespokeUnpacker(
sc_developable_df,
sc_df,
sc_point_gid=row['sc_point_gid']
).unpack_turbines(drop_sc_points=True),
axis=1
)
else:
# run in serial
all_turbines = sc_developable_df.apply(
all_turbines = sc_df.apply(
lambda row:
BespokeUnpacker(
sc_developable_df,
sc_df,
sc_point_gid=row['sc_point_gid']
).unpack_turbines(drop_sc_points=True),
axis=1
)

# stack the results back into a single df
all_turbines_df = pd.concat(all_turbines.tolist())
all_turbines_df = pd.concat(all_turbines.values)

# extract the geometries
all_turbines_df['geometry'] = all_turbines_df.apply(
lambda row: geometry.Point(
row['longitude'],
row['latitude']
),
axis=1
)
if n_workers > 1:
# run in parallel
all_turbines_df['geometry'] = all_turbines_df.parallel_apply(
lambda row: geometry.Point(
row['longitude'],
row['latitude']
),
axis=1
)
else:
# run in serial
all_turbines_df['geometry'] = all_turbines_df.apply(
lambda row: geometry.Point(
row['longitude'],
row['latitude']
),
axis=1
)

# turn into a geodataframe
all_turbines_gdf = gpd.GeoDataFrame(all_turbines_df, crs='EPSG:4326')

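The duplicated parallel/serial branches for building `geometry` could probably be dropped altogether: geopandas can build the point column vectorized, which is usually faster than any row-wise `apply`. A sketch assuming `all_turbines_df` ends up with plain float `longitude`/`latitude` columns:

```python
import geopandas as gpd
import pandas as pd

all_turbines_df = pd.DataFrame({
    "longitude": [-105.1, -104.8],
    "latitude": [39.7, 39.9],
})

# Vectorized point construction; no per-row apply (parallel or serial) needed.
all_turbines_gdf = gpd.GeoDataFrame(
    all_turbines_df,
    geometry=gpd.points_from_xy(all_turbines_df["longitude"], all_turbines_df["latitude"]),
    crs="EPSG:4326",
)
```

If this holds, the pandarallel initialization would only be needed for the unpacking step itself.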
19 changes: 7 additions & 12 deletions reView/utils/characterizations.py
@@ -72,16 +72,13 @@ def recast_categories(df, col, lkup, cell_size_sq_km):
col_df = pd.DataFrame(col_data)
col_df.fillna(0, inplace=True)
col_df.drop(
columns=[c for c in col_df.columns if c not in lkup.keys()],
columns=[c for c in col_df.columns if c not in lkup],
inplace=True
)
col_df.rename(columns=lkup, inplace=True)
if cell_size_sq_km is not None:
col_df *= cell_size_sq_km
col_df.rename(
columns={c: f"{c}_area_sq_km" for c in col_df.columns},
inplace=True
)
col_df.columns += "_area_sq_km"

col_df.index = df.index

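On the suffix rename above: `col_df.columns += "_area_sq_km"` relies on elementwise string concatenation being defined for a string column `Index`; `DataFrame.add_suffix` is an equivalent spelling. A tiny illustration with made-up columns:

```python
import pandas as pd

col_df = pd.DataFrame({"developable": [1.2, 0.8], "excluded": [0.3, 0.7]})

# Elementwise concatenation on the column Index, as in the diff ...
col_df.columns += "_area_sq_km"

# ... or, equivalently, on a fresh frame:
other = pd.DataFrame({"developable": [1.2], "excluded": [0.3]}).add_suffix("_area_sq_km")
```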
@@ -185,8 +182,6 @@ def unpack_characterizations( # noqa: C901
elif method is None:
warnings.warn(f"Skipping {char_col}: No method provided")

in_df = in_df.copy()

return in_df


@@ -216,14 +211,14 @@ def validate_characterization_remapper( # noqa: C901
parameters are encountered in characterization_remapper.
"""

characterization_cols = list(characterization_remapper.keys())
df_cols = supply_curve_df.columns.tolist()
cols_not_in_df = list(set(characterization_cols).difference(set(df_cols)))
if len(cols_not_in_df) > 0:
if any(key not in supply_curve_df.columns
for key in characterization_remapper):
keys = [key for key in characterization_remapper
        if key not in supply_curve_df.columns]
raise KeyError(
"Invalid column name(s) in characterization_remapper. "
"The following column name(s) were not found in the input "
f"dataframe: {cols_not_in_df}."
f"dataframe: {keys}."
)

for col_name, col_remapper in characterization_remapper.items():
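For reference, a compact version of the missing-column check in `validate_characterization_remapper`, sketched with hypothetical inputs: collect the missing keys once, branch on that list, and report the names (not booleans) in the error.

```python
import pandas as pd

supply_curve_df = pd.DataFrame({"land_use": [1], "slope": [2]})
characterization_remapper = {"land_use": {}, "habitat": {}}

# Single pass over the remapper keys; "habitat" is missing here, so this raises.
missing = [key for key in characterization_remapper
           if key not in supply_curve_df.columns]
if missing:
    raise KeyError(
        "Invalid column name(s) in characterization_remapper. "
        "The following column name(s) were not found in the input "
        f"dataframe: {missing}."
    )
```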
36 changes: 17 additions & 19 deletions reView/utils/functions.py
@@ -34,7 +34,6 @@

logger = logging.getLogger(__name__)


TIME_PATTERN = "%Y-%m-%d %H:%M:%S+00:00"


@@ -486,12 +485,6 @@ def read_timeseries(file, gids=None, nsteps=None):
if "bespoke" not in str(file):
# Break down time entries
time = [t.decode() for t in ds["time_index"][:nsteps]]
dtime = [dt.datetime.strptime(t, TIME_PATTERN) for t in time]
minutes = [t.minute for t in dtime]
hours = [t.hour for t in dtime]
days = [t.timetuple().tm_yday for t in dtime]
weeks = [t.isocalendar().week for t in dtime]
months = [t.month for t in dtime]

# Process generation data
cf = ds["rep_profiles_0"][:nsteps, idx]
@@ -525,15 +518,16 @@

# This will only take the average across the year
time = [t.decode() for t in time]
dtime = [dt.datetime.strptime(t, TIME_PATTERN) for t in time]
days = [t.timetuple().tm_yday for t in dtime]
weeks = [t.isocalendar().week for t in dtime]
months = [t.month for t in dtime]
hours = [t.hour for t in dtime]
minutes = [t.minute for t in dtime]

ds.close()

dtime = [dt.datetime.strptime(t, TIME_PATTERN) for t in time]
minutes = [t.minute for t in dtime]
hours = [t.hour for t in dtime]
days = [t.timetuple().tm_yday for t in dtime]
weeks = [t.isocalendar().week for t in dtime]
months = [t.month for t in dtime]

data = pd.DataFrame({
"time": time,
"minute": minutes,
@@ -696,17 +690,19 @@ def to_geo(df, dst, layer):
"%": "pct",
"&": "and"
}
replace_columns = False
new_columns = []
for col in df.columns:
# Remove columns that start with numbers
if is_int(col[0]):
del df[col]
print(col)

# This happens when you save the index
if "Unnamed:" in col:
elif "Unnamed:" in col:
del df[col]
else:
# Remove unnacceptable characters
# Remove unacceptable characters
ncol = col
for char, repl in replacements.items():
ncol = ncol.replace(char, repl)
@@ -722,9 +718,12 @@
# npart2 = ncol.split("_")[0]
# ncol = "_".join([npart1, npart2])

# Rename column
new_columns.append(ncol)
if col != ncol:
df = df.rename({col: ncol}, axis=1)
replace_columns = True

if replace_columns:
df.columns = new_columns

# Create fields and set types
fields = []
@@ -761,8 +760,7 @@
lat = row["latitude"]
lon = row["longitude"]
wkb = point_to_gpkg_point(header, lon, lat)
values = list(row.values)
values.insert(0, wkb)
values = [wkb, *row.values]
rows.append(values)

# Finally insert rows
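The `to_geo` changes collect the cleaned names and assign `df.columns` once rather than calling `df.rename` per modified column, and build each feature's payload as `[wkb, *row.values]` instead of `list(...)` plus `insert(0, ...)`. A stand-alone sketch of the rename-once pattern; the replacement table here is abbreviated to the two entries visible in the diff:

```python
import pandas as pd

# Abbreviated replacement table; the function's full table has more entries.
replacements = {"%": "pct", "&": "and"}
df = pd.DataFrame({"capacity_%": [0.42], "wind_&_solar": [1.0], "capacity": [120.0]})

new_columns, replace_columns = [], False
for col in df.columns:
    ncol = col
    for char, repl in replacements.items():
        ncol = ncol.replace(char, repl)
    new_columns.append(ncol)
    if ncol != col:
        replace_columns = True

# One assignment instead of one df.rename(...) call per modified column.
if replace_columns:
    df.columns = new_columns
```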