-
Notifications
You must be signed in to change notification settings - Fork 54
/
Copy pathwriters.py
76 lines (60 loc) · 2.42 KB
/
writers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from typing import Tuple
import numpy as np
import xarray as xr
import zarr
from .patterns import CombineOp, Index
def _region_for(var: xr.Variable, index: Index) -> Tuple[slice, ...]:
    """Build the per-dimension selection used to write ``var`` into its target array.

    One slice is produced for every entry of ``var.sizes``, in that order. A
    dimension for which ``index.find_concat_dim`` returns a truthy concat
    dimension gets a bounded slice that starts at the value of the matching
    position in ``index`` and spans the dimension's size. Every other
    dimension gets ``slice(None)``.

    :param var: The variable whose dimension names and sizes are inspected
    :param index: The index used to look up concat dimensions and positions
    :returns: A tuple with one slice per dimension of ``var``
    """
    slices = []
    for dim_name, length in var.sizes.items():
        concat_dim = index.find_concat_dim(dim_name)
        if not concat_dim:
            # not concatenated over this dimension: write its full extent
            slices.append(slice(None))
            continue
        # concatenated dimension: offset the write by this fragment's position
        pos = index[concat_dim]
        assert pos.indexed
        offset = pos.value
        slices.append(slice(offset, offset + length))
    return tuple(slices)
def _store_data(vname: str, var: xr.Variable, index: Index, zgroup: zarr.Group) -> None:
    """Encode one variable and write it into its region of the matching Zarr array.

    :param vname: Name of the array inside ``zgroup`` to write to
    :param var: The variable holding the data to store; it is copied, not modified
    :param index: The index used to compute the target region
    :param zgroup: The Zarr group that contains the target array
    """
    target = zgroup[vname]
    # work on a copy so the caller's variable is not modified in place
    # (the test suite relies on inputs staying untouched)
    staged = var.copy()
    # the encoding for the variable comes from the target array's attributes
    staged.encoding.update(target.attrs)
    staged.attrs = {}
    encoded = xr.backends.zarr.encode_zarr_variable(staged)
    values = np.asarray(encoded.data)
    target[_region_for(encoded, index)] = values
def _is_first_item(index) -> bool:
    """Report whether no position in ``index`` lies past the origin.

    :param index: A mapping whose values expose a numeric ``value`` attribute
    :returns: ``False`` if any position has ``value > 0``; ``True`` otherwise,
        including for an empty index
    """
    # only the positions matter here, so the keys are not iterated at all
    return not any(position.value > 0 for position in index.values())
def _is_first_in_merge_dim(index):
    """Report whether no merge-type dimension in ``index`` has a position above zero.

    Entries whose key operation is not ``CombineOp.MERGE`` are ignored.

    :param index: A mapping from dimension keys (with an ``operation``) to
        positions (with a ``value``)
    :returns: ``False`` if any merge dimension has ``value > 0``, else ``True``
    """
    return not any(
        dim.operation == CombineOp.MERGE and pos.value > 0
        for dim, pos in index.items()
    )
def store_dataset_fragment(
    item: Tuple[Index, xr.Dataset], target_store: zarr.storage.FSStore
) -> None:
    """Write one indexed fragment of a dataset into a Zarr store.

    Data variables are always written. Coordinates are written only when no
    merge dimension of the index has a position above zero, and then only
    those that either span a concat dimension or belong to the first item.

    :param item: The index and dataset to be stored
    :param target_store: The destination to store in
    """
    index, fragment = item
    group = zarr.open_group(target_store)
    # TODO: check that the dataset and the index are compatible

    # coordinates are skipped entirely unless this is the first item in a merge dim
    if _is_first_in_merge_dim(index):
        for name, coord in fragment.coords.items():
            concat_dims = [index.find_concat_dim(dim) for dim in coord.dims]
            # a coordinate touching a concat dim is always stored; any other
            # coordinate is stored only for the very first item
            if not any(concat_dims) and not _is_first_item(index):
                continue
            _store_data(name, coord.variable, index, group)

    for name, data_var in fragment.data_vars.items():
        _store_data(name, data_var.variable, index, group)