Skip to content

Commit

Permalink
Merge pull request #138 from scipp/nxmultilog
Browse files Browse the repository at this point in the history
Support NXlog with "sublogs" such as connection_status and alarm
  • Loading branch information
SimonHeybrock authored May 1, 2023
2 parents 1fe28c2 + dc29cbe commit 5c6b54f
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 113 deletions.
2 changes: 1 addition & 1 deletion src/scippnexus/v2/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def _make_child(obj: Union[H5Dataset, H5Group]) -> Union[Field, Group]:
# event_time_offset/event_id fields, instead of the
# event_time_zero/event_index fields). In the case of NXlog there may
# be some utility if we deal with extremely long time-series that
# could be leverage for label-based indexing in the future.
# could be leveraged for label-based indexing in the future.
items = {k: v for k, v in items.items() if not k.startswith('cue_')}
for suffix in ('_errors', '_error'):
field_with_errors = [name for name in items if f'{name}{suffix}' in items]
Expand Down
76 changes: 70 additions & 6 deletions src/scippnexus/v2/nxdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import numpy as np
import scipp as sc

from .._common import convert_time_to_datetime64, to_child_select
from .._common import _to_canonical_select, convert_time_to_datetime64, to_child_select
from ..typing import H5Dataset, ScippIndex
from .base import (
Group,
Expand Down Expand Up @@ -342,20 +342,84 @@ def _squeeze_trailing(dims: Tuple[str, ...], shape: Tuple[int, ...]) -> Tuple[in


class NXlog(NXdata):
    """
    NXlog, a time-series that can be loaded as a DataArray.

    In some cases the NXlog may contain additional time series, such as a connection
    status or alarm. These cannot be handled in a standard way, since the result cannot
    be represented as a single DataArray. Furthermore, they prevent positional
    time-indexing, since the time coord of each time-series is different. We can
    support label-based indexing for this in the future. If additional time-series
    are contained within the NXlog then loading will return a DataGroup of the
    individual time-series (DataArrays).
    """

    def __init__(self, attrs: Dict[str, Any], children: Dict[str, Union[Field,
                                                                        Group]]):
        children = dict(children)
        self._sublogs = []
        self._sublog_children = {}
        # A '<name>_time' field marks the presence of a sublog called '<name>'.
        # Note that the main 'time' field does not match this pattern.
        for name in children:
            if name.endswith('_time'):
                self._sublogs.append(name[:-5])
        # Extract all fields that belong to sublogs, since they will interfere with the
        # setup logic in the base class (NXdata).
        for name in self._sublogs:
            for k in list(children):
                if k.startswith(name):
                    field = children.pop(k)
                    self._init_field(field)
                    # The first dim of every sublog field is its own time axis.
                    field.sizes = {
                        'time' if i == 0 else f'dim_{i}': size
                        for i, size in enumerate(field.dataset.shape)
                    }
                    self._sublog_children[k] = field

        super().__init__(attrs=attrs,
                         children=children,
                         fallback_dims=('time', ),
                         fallback_signal_name='value')

    def read_children(self, sel: ScippIndex) -> sc.DataGroup:
        """Read the main log's children plus any sublog fields.

        Raises
        ------
        scipp.DimensionError
            If ``sel`` selects along 'time' while sublogs are present.
        """
        # Sublogs have distinct time axes (with a different length). Must disable
        # positional indexing.
        if self._sublogs and ('time' in _to_canonical_select(list(self.sizes), sel)):
            raise sc.DimensionError(
                "Cannot positionally select time since there are multiple "
                "time fields. Label-based selection is not supported yet.")
        dg = super().read_children(sel)
        for name, field in self._sublog_children.items():
            dg[name] = field[sel]
        return dg

    def _time_to_datetime(self, mapping):
        # Convert an elapsed-time 'time' entry to absolute datetime64, in place.
        # Leaves entries that are already datetime64 (or not time-like) untouched.
        if (time := mapping.get('time')) is not None:
            if time.dtype != sc.DType.datetime64 and _is_time(time):
                mapping['time'] = convert_time_to_datetime64(
                    time, start=sc.epoch(unit=time.unit))

    def _assemble_sublog(self,
                         dg: sc.DataGroup,
                         name: str,
                         value_name: Optional[str] = None) -> sc.DataArray:
        """Pop all fields of the sublog ``name`` from ``dg`` and build a DataArray.

        ``value_name`` selects which '<name>_<value_name>' field acts as the data;
        by default the field named exactly ``name`` is used. Remaining fields with
        the sublog's prefix become coords (with the prefix stripped).
        """
        value_name = name if value_name is None else f'{name}_{value_name}'
        da = sc.DataArray(dg.pop(value_name), coords={'time': dg.pop(f'{name}_time')})
        for k in list(dg):
            if k.startswith(name):
                da.coords[k[len(name) + 1:]] = dg.pop(k)
        self._time_to_datetime(da.coords)
        return da

    def assemble(self,
                 dg: sc.DataGroup) -> Union[sc.DataGroup, sc.DataArray, sc.Dataset]:
        """Assemble the loaded fields, splitting off sublogs into separate entries.

        Returns the main log alone when there are no sublogs; otherwise a DataGroup
        with the main log under 'value' and one entry per sublog.
        """
        self._time_to_datetime(dg)
        dg = sc.DataGroup(dg)
        sublogs = sc.DataGroup()
        for name in self._sublogs:
            # Somewhat arbitrary definition of which field is the "value"
            value_name = 'severity' if name == 'alarm' else None
            sublogs[name] = self._assemble_sublog(dg, name, value_name=value_name)
        out = super().assemble(dg)
        return out if not sublogs else sc.DataGroup(value=out, **sublogs)


def _find_embedded_nxevent_data(
Expand Down
106 changes: 0 additions & 106 deletions tests/nexus_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,112 +99,6 @@ def test_nx_class_can_be_bytes(h5root):
assert group.nx_class == NXlog


def test_nxobject_log(h5root):
    # Absolute timestamps; the file stores elapsed time relative to the epoch.
    time = sc.epoch(unit='ns') + sc.array(
        dims=['time'], unit='s', values=[4.4, 5.5, 6.6]).to(unit='ns', dtype='int64')
    expected = sc.DataArray(sc.array(dims=['time'], values=[1.1, 2.2, 3.3]),
                            coords={'time': time})
    log = snx.create_class(h5root, 'log', NXlog)
    snx.create_field(log, 'value', expected.data)
    snx.create_field(log, 'time', time - sc.epoch(unit='ns'))
    group = snx.Group(log, definitions=snx.base_definitions())
    assert sc.identical(group[...], expected)


def test_nxlog_with_missing_value_triggers_fallback(nxroot):
    elapsed = sc.array(dims=['time'], unit='s',
                       values=[4.4, 5.5, 6.6]).to(unit='ns', dtype='int64')
    time = sc.epoch(unit='ns') + elapsed
    log = nxroot['entry'].create_class('log', NXlog)
    log['time'] = elapsed
    # Fallback to DataGroup, but we still have partial info from NXlog: dim is time
    assert_identical(log[()], sc.DataGroup(time=time))


def test_nxlog_length_1(h5root):
    nxroot = snx.Group(h5root, definitions=snx.base_definitions())
    # Single-entry time-series; result should still be a DataArray with a time dim.
    time = sc.epoch(unit='ns') + sc.array(
        dims=['time'], unit='s', values=[4.4]).to(unit='ns', dtype='int64')
    expected = sc.DataArray(sc.array(dims=['time'], values=[1.1]),
                            coords={'time': time})
    log = nxroot.create_class('log', NXlog)
    log['value'] = expected.data
    log['time'] = time - sc.epoch(unit='ns')
    assert sc.identical(log[...], expected)


def test_nxlog_length_1_two_dims_no_time_defaults_inner_dim_name(nxroot):
    # Without a 'time' field the inner dim cannot be named; expect 'dim_1'.
    data = sc.array(dims=['time', 'ignored'], values=[[1.1]])
    log = nxroot['entry'].create_class('log', NXlog)
    log['value'] = data
    expected = sc.DataArray(data.rename(ignored='dim_1'))
    assert_identical(log[...], expected)


def test_nxlog_length_1_two_dims_with_time_defaults_inner_dim_name(nxroot):
    time = sc.epoch(unit='ns') + sc.array(
        dims=['time'], unit='s', values=[4.4]).to(unit='ns', dtype='int64')
    data = sc.array(dims=['time', 'ignored'], values=[[1.1]])
    log = nxroot['entry'].create_class('log', NXlog)
    log['value'] = data
    log['time'] = time - sc.epoch(unit='ns')
    # Inner dim has no name in the file and thus defaults to 'dim_1'.
    expected = sc.DataArray(data.rename(ignored='dim_1'), coords={'time': time})
    assert sc.identical(log[...], expected)


def test_nxlog_axes_replaces_time_dim(nxroot):
    time = sc.epoch(unit='ns') + sc.array(
        dims=['time'], unit='s', values=[4.4]).to(unit='ns', dtype='int64')
    values = sc.array(dims=['time', 'ignored'], values=[[1.1]])
    log = nxroot['entry'].create_class('log', NXlog)
    # An explicit 'axes' attribute overrides the default ('time', ...) dim names.
    log._group.attrs['axes'] = ['yy', 'xx']
    log['value'] = values
    log['time'] = time - sc.epoch(unit='ns')
    expected = sc.DataArray(sc.array(dims=['yy', 'xx'], values=[[1.1]]),
                            coords={'time': time.squeeze()})
    assert sc.identical(log[...], expected)


def test_nxlog_three_dims_with_time_of_length_1(nxroot):
    values = np.arange(9.).reshape(1, 3, 3)
    time = sc.epoch(unit='ns') + sc.array(
        dims=['time'], unit='s', values=[4.4]).to(unit='ns', dtype='int64')
    log = nxroot['entry'].create_class('log', NXlog)
    log['value'] = sc.array(dims=['time', 'a', 'b'], values=values)
    log['time'] = time - sc.epoch(unit='ns')
    loaded = log[...]
    # Only the leading dim is named via 'time'; trailing dims get default names.
    expected = sc.array(dims=['time', 'dim_1', 'dim_2'], values=values)
    assert_identical(loaded.data, expected)


def test_nxlog_with_shape_0(nxroot):
    # Empty time-series: loading should still produce a (0, 1) DataArray with a
    # datetime64 time coord and the default inner dim name.
    data = sc.ones(dims=['time', 'ignored'], shape=(0, 1))
    raw_time = sc.ones(dims=['time'], shape=(0, ), unit='s')
    log = nxroot['entry'].create_class('log', NXlog)
    log['value'] = data
    log['time'] = raw_time
    expected = sc.DataArray(
        data.rename(ignored='dim_1'),
        coords={'time': sc.datetimes(dims=['time'], values=[], unit='ns')})
    assert_identical(log[...], expected)


def test_nxobject_event_data(nxroot):
event_data = nxroot['entry'].create_class('events_0', NXevent_data)
assert event_data.nx_class == NXevent_data
Expand Down
Loading

0 comments on commit 5c6b54f

Please sign in to comment.