split get_gcm by variable due to this issue with dask: pydata/xarray#…
norlandrhagen committed Jun 24, 2022
1 parent 4c9c65a commit f7c9480
Showing 2 changed files with 24 additions and 21 deletions.
43 changes: 22 additions & 21 deletions cmip6_downscaling/methods/common/tasks.py
@@ -84,7 +84,7 @@ def get_obs(run_parameters: RunParameters) -> UPath:
     if use_cache and zmetadata_exists(target):
         print(f'found existing target: {target}')
         return target
-
+    print(run_parameters)
     ds = open_era5(run_parameters.features, run_parameters.train_period)
     subset = subset_dataset(
         ds,
@@ -137,29 +137,30 @@ def get_experiment(run_parameters: RunParameters, time_subset: str) -> UPath:
         print(f'found existing target: {target}')
         return target

-    ds = get_gcm(
-        scenario=run_parameters.scenario,
-        member_id=run_parameters.member,
-        table_id=run_parameters.table_id,
-        grid_label=run_parameters.grid_label,
-        source_id=run_parameters.model,
-        variable=run_parameters.features,
-        time_slice=time_period.time_slice,
-    )
-
-    subset = subset_dataset(
-        ds, run_parameters.features, time_period.time_slice, run_parameters.bbox
-    )
-
-    # Note: dataset is chunked into time:365 chunks to standardize leap-year chunking.
-    subset = subset.chunk({'time': 365})
-    for key in subset.variables:
-        subset[key].encoding = {}
-
-    subset.attrs.update({'title': title}, **get_cf_global_attrs(version=version))
-    subset = set_zarr_encoding(subset)
-    blocking_to_zarr(ds=subset, target=target, validate=True, write_empty_chunks=True)
+    # The for loop is a workaround for github issue: https://github.com/pydata/xarray/issues/6709
+    mode = 'w'
+    for feature in run_parameters.features:
+        ds = get_gcm(
+            scenario=run_parameters.scenario,
+            member_id=run_parameters.member,
+            table_id=run_parameters.table_id,
+            grid_label=run_parameters.grid_label,
+            source_id=run_parameters.model,
+            variable=feature,
+            time_slice=time_period.time_slice,
+        )
+        subset = subset_dataset(ds, feature, time_period.time_slice, run_parameters.bbox)
+        # Note: dataset is chunked into time:365 chunks to standardize leap-year chunking.
+        subset = subset.chunk({'time': 365})
+        for key in subset.variables:
+            subset[key].encoding = {}
+
+        subset.attrs.update({'title': title}, **get_cf_global_attrs(version=version))
+        subset = set_zarr_encoding(subset)
+        subset[[feature]].to_zarr(target, mode=mode)
+        mode = 'a'
+
     return target
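For context, the change follows a general pattern: when writing a multi-variable dataset to zarr in a single to_zarr call trips the xarray issue referenced above, each variable can instead be written to the same store in turn, creating the store with mode='w' and appending subsequent variables with mode='a'. A minimal standalone sketch of that pattern, using synthetic data and a hypothetical 'example.zarr' path rather than the project's own code:

    import numpy as np
    import pandas as pd
    import xarray as xr

    # Synthetic stand-in for the GCM subset: two variables on a shared time axis.
    time = pd.date_range('2020-01-01', periods=730)
    ds = xr.Dataset(
        {
            'tasmax': (('time',), np.random.rand(730)),
            'pr': (('time',), np.random.rand(730)),
        },
        coords={'time': time},
    )

    target = 'example.zarr'
    mode = 'w'  # first variable creates the store; later variables append to it
    for feature in ['tasmax', 'pr']:
        subset = ds[[feature]].chunk({'time': 365})  # uniform 365-day chunks
        subset[feature].encoding = {}  # clear stale encoding before writing
        subset.to_zarr(target, mode=mode)
        mode = 'a'

Writing one variable per to_zarr call keeps each task graph small and sidesteps the multi-variable write path that triggered the upstream bug.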


2 changes: 2 additions & 0 deletions cmip6_downscaling/methods/common/utils.py
@@ -3,6 +3,7 @@
 import pathlib
 import re

+import dask
 import fsspec
 import geopandas as gpd
 import numpy as np
@@ -80,6 +81,7 @@ def blocking_to_zarr(

     for variable in ds.data_vars:
         ds[variable].encoding['write_empty_chunks'] = True
+    ds = dask.optimize(ds)[0]
     t = ds.to_zarr(target, mode='w', compute=False)
     t.compute(retries=5)
     zarr.consolidate_metadata(target)
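The dask.optimize call collapses the task graph before the deferred write, which can shrink what to_zarr(..., compute=False) ships to the scheduler. A hedged sketch of the same pattern in isolation, with synthetic data and a hypothetical 'out.zarr' path (blocking_to_zarr's validation and other arguments omitted):

    import dask
    import dask.array as da
    import xarray as xr
    import zarr

    # A lazily computed dataset, similar to what blocking_to_zarr receives.
    ds = xr.Dataset({'x': (('time',), da.random.random(730, chunks=365))})

    ds = dask.optimize(ds)[0]  # fuse and prune the task graph up front
    t = ds.to_zarr('out.zarr', mode='w', compute=False)  # deferred write
    t.compute(retries=5)  # retries guards against transient failures on a distributed scheduler
    zarr.consolidate_metadata('out.zarr')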
