Skip to content

Commit

Permalink
Ix roundtrip tests (#14)
Browse files Browse the repository at this point in the history
* First draft of lock-control module.

* compare_datasets handle NaNs in attributes.

* Generalise some utils; first xarray and iris-xarray roundtrips (with fails TBD).

* style fixes.

* Various tidying + rewrites for clarity.

* WIP: struggling with various problems.

* Fix printout of NcVariable with unset dtype.

* Small fixes to dataset comparison.

* Load xarray(ncdata) via backend : test_roundtrips_xarray.py::test_load_direct_vs_viancdata MOSTLY working with scalars fix.

* Tidy 'save-direct-vs' test : remaining fails now ~ masked-data ; string-data ; boundsvar-attrs

* Replace np.product with np.prod (numpy deprecation).

* Fix xarray via-ncdata saving equivalence. Xarray roundtrip tests now passing, with exclusions.

* fix to ixi-xix.

* Brutally restrict all ixi/xix tests to just what currently passes.
  • Loading branch information
pp-mo authored Dec 1, 2023
1 parent 87ebb7f commit c0c850c
Show file tree
Hide file tree
Showing 14 changed files with 1,198 additions and 161 deletions.
3 changes: 2 additions & 1 deletion lib/ncdata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
It is also provided with read/write conversion interfaces to Xarray, Iris and netCDF4,
thus acting as an efficient exchange channel between any of those forms.
N.B. this file excluded from isort, as we want a specific class order for the docs
"""
# N.B. this file excluded from isort, as we want a specific class order for the docs

from ._core import NcAttribute, NcData, NcDimension, NcVariable

__all__ = ["NcAttribute", "NcData", "NcDimension", "NcVariable"]
3 changes: 2 additions & 1 deletion lib/ncdata/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ def __init__(
def _print_content(self):
global _indent
dimstr = ", ".join(self.dimensions)
hdr = f"<NcVariable: {self.name}({dimstr})"
typestr = str(self.dtype) if self.dtype else "<no-dtype>"
hdr = f"<NcVariable({typestr}): {self.name}({dimstr})"
if not self.attributes:
hdr += ">"
lines = [hdr]
Expand Down
140 changes: 140 additions & 0 deletions lib/ncdata/threadlock_sharing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
r"""
Support for "unifying" the thread-safety mechanisms between ncdata and other packages.
Each of the data-format packages (ncdata, iris and xarray) uses its own locking
mechanism to prevent overlapping calls into the netcdf library when called by
multi-threaded code.
Most commonly, this occurs when netcdf file data is read to
compute a Dask array, or written in a Dask delayed write operation.
All 3 data-format packages can map variable data into Dask lazy arrays. Iris and
Xarray can also create delayed write operations (but ncdata currently does not).
However, those mechanisms cannot protect an operation of that package from
overlapping with one in *another* package.
This module can ensure that all of the enabled packages use the *same* thread lock,
so that any and all of them can safely co-operate in parallel operations.
sample code::
from ncdata.threadlock_sharing import enable_lockshare, disable_lockshare
from ncdata.xarray import from_xarray
from ncdata.iris import from_iris
from ncdata.netcdf4 import to_nc4
enable_lockshare(iris=True, xarray=True)
ds = from_xarray(xarray.open_dataset(file1))
ds2 = from_iris(iris.load(file2))
ds.variables['x'].data /= ds2.variables['acell'].data
to_nc4(ds, output_filepath)
disable_lockshare()
or::
    with lockshare_context(iris=True):
        data = NcData(source_filepath)
        data.variables['x'].attributes['units'] = 'K'
        cubes = ncdata.iris.to_iris(data)
        iris.save(cubes, output_filepath)
"""
from contextlib import contextmanager
from unittest import mock

# Currently-active lock-share patches (empty list means no sharing active).
# Managed only by enable_lockshare() / disable_lockshare().
_SHARE_PATCHES = []


# Patch targets in ncdata and iris.
# N.B. we don't ever patch xarray, we only *copy* that lock to the others.
_IRIS_TARGET = "iris.fileformats.netcdf._thread_safe_nc._GLOBAL_NETCDF4_LOCK"
_NCDATA_TARGET = "ncdata.netcdf4._GLOBAL_NETCDF4_LIBRARY_THREADLOCK"


def enable_lockshare(iris: bool = False, xarray: bool = False):
    """
    Begin lock-sharing between ncdata and the requested other package(s).

    Does nothing if an existing sharing is already in place.

    Parameters
    ----------
    iris : bool, default False
        make ncdata use the same netcdf lock as iris
    xarray : bool, default False
        make ncdata use the same netcdf lock as xarray

    Notes
    -----
    If an 'enable_lockshare' call was already established, the function does
    nothing, i.e. it is not possible to modify an existing share.  Instead,
    you must call :func:`disable_lockshare` to cancel the current sharing,
    before you can establish a new one.

    While sharing with *both* iris and xarray, iris is modified to use the
    same netcdf lock as xarray.
    """
    global _SHARE_PATCHES
    # N.B. implement sharing *only if* none already exists.
    if _SHARE_PATCHES or not (iris or xarray):
        return

    if xarray:
        # Whenever xarray is included, its lock is the one everybody adopts.
        from xarray.backends.netCDF4_ import (
            NETCDF4_PYTHON_LOCK as SHARED_LOCK,
        )

        targets = [_NCDATA_TARGET]
        if iris:
            # Sharing with both : also point iris at the xarray lock.
            targets.append(_IRIS_TARGET)
    else:
        # Iris-only sharing : ncdata adopts the iris lock.
        from iris.fileformats.netcdf._thread_safe_nc import (
            _GLOBAL_NETCDF4_LOCK as SHARED_LOCK,
        )

        targets = [_NCDATA_TARGET]

    patches = [mock.patch(target, new=SHARED_LOCK) for target in targets]
    for patch in patches:
        patch.start()
    _SHARE_PATCHES = patches


def disable_lockshare():
    """
    Remove any enabled lock-sharing.

    Does nothing if no lock share is in operation.
    """
    global _SHARE_PATCHES
    # Detach the active patch list first, then undo each patch.
    patches, _SHARE_PATCHES = _SHARE_PATCHES, []
    for patch in patches:
        patch.stop()


@contextmanager
def lockshare_context(iris: bool = False, xarray: bool = False):
    """
    Make a context with lock-sharing between the ncdata and the requested packages.

    This allows safe netcdf access when using a combination of ncdata/iris/xarray
    packages.

    Parameters
    ----------
    iris : bool, default False
        share the netcdf lock with iris
    xarray : bool, default False
        share the netcdf lock with xarray

    Notes
    -----
    Equivalent to calling :func:`enable_lockshare` on entry and
    :func:`disable_lockshare` on exit; the disable always runs, even if the
    context body (or the enable itself) raises.
    """
    try:
        enable_lockshare(iris=iris, xarray=xarray)
        yield
    finally:
        disable_lockshare()
66 changes: 40 additions & 26 deletions lib/ncdata/xarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
from typing import AnyStr, Union

import xarray as xr
from xarray.backends import NetCDF4DataStore

from . import NcAttribute, NcData, NcDimension, NcVariable


class _XarrayNcDataStore:
class _XarrayNcDataStore(NetCDF4DataStore):
"""
An adapter class presenting ncdata as an xarray datastore.
Expand All @@ -25,16 +26,29 @@ class _XarrayNcDataStore:
This requires some knowledge of Xarray, but it is very small.
This code originated from @TomekTrzeciak.
This approach originated from @TomekTrzeciak.
See https://gist.github.com/TomekTrzeciak/b00ff6c9dc301ed6f684990e400d1435
"""

# This property ensures that variables are adjusted for netCDF4 output
# (rather than netCDF3) in the call to
# "xarray.backends.netCDF4_.NetCDF4DataStore.encode_variable".
# This 'encode_variable' routine is invoked by "self.encode"
# -- which is actually "xarray.backends.common.WritableCFDataStore.encode".
format = "NETCDF4"

def __init__(self, ncdata: NcData = None):
if ncdata is None:
ncdata = NcData()
self.ncdata = ncdata

def load(self):
"""
Return Xarray variables + attributes representing the contained 'self.ncdata'.
Called, indirectly, by :meth:`to_xarray` via
:meth:`xarray.backends.store.StoreBackendEntrypoint.open_dataset`.
"""
variables = {}
for k, v in self.ncdata.variables.items():
attrs = {
Expand All @@ -44,9 +58,6 @@ def load(self):
xr_var = xr.Variable(
v.dimensions, v.data, attrs, getattr(v, "encoding", {})
)
# TODO: ?possibly? need to apply usual Xarray "encodings" to convert raw
# cf-encoded data into 'normal', interpreted xr.Variables.
xr_var = xr.conventions.decode_cf_variable(k, xr_var)
variables[k] = xr_var
attributes = {
name: attr.as_python_value()
Expand All @@ -62,30 +73,25 @@ def store(
writer=None,
unlimited_dims=None,
):
"""
Populate the stored `self.ncdata` from given Xarray variables + attributes.
Called, indirectly, by :meth:`from_xarray` via
:meth:`xr.Dataset.dump_to_store`.
"""
# Encode the xarray data as-if-for netcdf4 output, so we convert internal forms
# (such as strings and timedates) to file-relevant forms.
variables, attributes = self.encode(variables, attributes)

# Install (global) attributes into self.
for attrname, v in attributes.items():
if (
attrname in self.ncdata.attributes
): # and self.attributes[k] != v:
msg = (
f're-setting of attribute "{attrname}" : '
f"was={self.ncdata.attributes[attrname]}, now={v}"
)
raise ValueError(msg)
else:
self.ncdata.attributes[attrname] = NcAttribute(attrname, v)
self.ncdata.attributes[attrname] = NcAttribute(attrname, v)

# Install variables, creating dimensions as we go.
for varname, var in variables.items():
if varname in self.ncdata.variables:
raise ValueError(f'duplicate variable : "{varname}"')

# An xr.Variable : remove all the possible Xarray encodings
# These are all the ones potentially used by
# :func:`xr.conventions.decode_cf_variable`, in the order in which they
# would be applied.
var = xr.conventions.encode_cf_variable(
var, name=varname, needs_copy=False
)

for dim_name, size in zip(var.dims, var.shape):
if dim_name in self.ncdata.dimensions:
if self.ncdata.dimensions[dim_name].size != size:
Expand All @@ -102,15 +108,20 @@ def store(
name: NcAttribute(name, value)
for name, value in var.attrs.items()
}

data = var.data
nc_var = NcVariable(
name=varname,
dimensions=var.dims,
attributes=attrs,
data=var.data,
data=data,
group=self.ncdata,
)
self.ncdata.variables[varname] = nc_var

def get_encoding(self):
    """Return dataset-level encoding settings : none, for an ncdata store."""
    return {}

def close(self):
    """Close the store : a no-op, as nothing is held open here."""
    pass

Expand All @@ -132,8 +143,11 @@ def from_xarray(
dataset_or_file.dump_to_store(nc_data, **xr_load_kwargs)
return nc_data

def to_xarray(self, **xr_save_kwargs) -> xr.Dataset:
ds = xr.Dataset.load_store(self, **xr_save_kwargs)
def to_xarray(self, **xr_load_kwargs) -> xr.Dataset:
from xarray.backends.store import StoreBackendEntrypoint

store_entrypoint = StoreBackendEntrypoint()
ds = store_entrypoint.open_dataset(self, **xr_load_kwargs)
return ds


Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
line-length = 79
target-version = ['py310']
include = '\.pyi?$'

[tool.isort]
line_length = "79"
profile = "black"
38 changes: 33 additions & 5 deletions tests/_compare_nc_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,26 @@ def _isncdata(obj):
return hasattr(obj, "_print_content")


def _array_eq(a1, a2):
"""
Test equality of array values in attributes.
Assumes values (attributes) are presented as numpy arrays (not lazy).
Matches any NaNs.
Does *NOT* handle masked data -- which does not occur in attributes.
"""
result = True
result &= a1.shape == a2.shape
result &= a1.dtype == a2.dtype
if result:
if a1.dtype.kind in ("S", "U", "b"):
result = np.all(a1 == a2)
else:
# array_equal handles possible NaN cases
result = np.array_equal(a1, a2, equal_nan=True)
return result


def _compare_attributes(
errs,
obj1,
Expand Down Expand Up @@ -161,30 +181,38 @@ def fix_orders(attrlist):

attr, attr2 = [
(
obj.attributes[attrname].value
obj.attributes[attrname].as_python_value()
if _isncdata(obj)
else obj.getncattr(attrname)
)
for obj in (obj1, obj2)
]

# TODO: this still doesn't work well for strings : for those, we should ignore
# exact "type" (including length), and just compare the content.
# TODO: get a good testcase going to check this behaviour
dtype, dtype2 = [
# Get x.dtype, or fallback on type(x) -- basically, for strings.
getattr(attr, "dtype", type(attr))
for attr in (attr, attr2)
]

if all(
isinstance(dt, np.dtype) and dt.kind in "SUb"
for dt in (dtype, dtype2)
):
dtype = dtype2 = "string"
if dtype != dtype2:
msg = (
f'{elemname} "{attrname}" attribute datatypes differ : '
f"{dtype!r} != {dtype2!r}"
)
errs.append(msg)
else:
# If values match (only then), compare datatypes
# If datatypes match (only then), compare values
# Cast attrs, which might be strings, to arrays for comparison
arr, arr2 = [np.asarray(attr) for attr in (attr, attr2)]
if arr.shape != arr2.shape or not np.all(arr == arr2):
if not _array_eq(arr, arr2):
# N.B. special comparison to handle strings and NaNs
msg = (
f'{elemname} "{attrname}" attribute values differ : '
f"{attr!r} != {attr2!r}"
Expand Down Expand Up @@ -290,7 +318,7 @@ def _compare_nc_groups(
errs.append(msg)

# data values
is_str, is_str2 = (dt.kind in ("U", "S") for dt in (dtype, dtype2))
is_str, is_str2 = (dt.kind in "SUb" for dt in (dtype, dtype2))
# TODO: is this correct check to allow compare between different dtypes?
if data_equality and dims == dims2 and is_str == is_str2:
# N.B. don't check shapes here: we already checked dimensions.
Expand Down
Loading

0 comments on commit c0c850c

Please sign in to comment.