Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support NXlog with "sublogs" such as connection_status and alarm #138

Merged
merged 6 commits into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/scippnexus/v2/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def _make_child(obj: Union[H5Dataset, H5Group]) -> Union[Field, Group]:
# event_time_offset/event_id fields, instead of the
# event_time_zero/event_index fields). In the case of NXlog there may
# be some utility if we deal with extremely long time-series that
# could be leverage for label-based indexing in the future.
# could be leveraged for label-based indexing in the future.
items = {k: v for k, v in items.items() if not k.startswith('cue_')}
for suffix in ('_errors', '_error'):
field_with_errors = [name for name in items if f'{name}{suffix}' in items]
Expand Down
74 changes: 68 additions & 6 deletions src/scippnexus/v2/nxdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import numpy as np
import scipp as sc

from .._common import convert_time_to_datetime64, to_child_select
from .._common import _to_canonical_select, convert_time_to_datetime64, to_child_select
from ..typing import H5Dataset, ScippIndex
from .base import (
Group,
Expand Down Expand Up @@ -342,20 +342,82 @@ def _squeeze_trailing(dims: Tuple[str, ...], shape: Tuple[int, ...]) -> Tuple[in


class NXlog(NXdata):
"""
NXlog, a time-series that can be loaded as a DataArray.

In some cases the NXlog may contain additional time series, such as a connection
status or alarm. These cannot be handled in a standard way, since the result cannot
be represented as a single DataArray. Furthermore, they prevent positional
time-indexing, since the time coord of each time-series is different. We can
support label-based indexing for this in the future. If additional time-series
are contained within the NXlog then loading will return a DataGroup of the
individual time-series (DataArrays).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we had some form of flexible structured dtypes, could this data be merged into a single data array by binning? (e.g. with the main log's time coord)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. What do you have in mind?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

da['time', i] contains the value of the main log in this bin as well as all values of sublogs that fall into this bin.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically the main log has many more values than the sublogs (often by orders of magnitude), so I don't see how that would be useful/feasible?

"""

def __init__(self, attrs: Dict[str, Any], children: Dict[str, Union[Field, Group]]):
    """Set up an NXlog, splitting off any sublogs (such as alarm or
    connection_status) from the main time-series.

    Parameters
    ----------
    attrs:
        Attributes of the NXlog group.
    children:
        Mapping of child names to fields/groups. The caller's mapping is not
        modified; a copy is made before sublog fields are removed.
    """
    # Copy so we do not mutate the caller's mapping when popping sublog fields.
    children = dict(children)
    self._sublogs = []
    self._sublog_children = {}
    # A sublog is identified by the presence of a '<name>_time' field.
    for name in children:
        if name.endswith('_time'):
            self._sublogs.append(name[:-5])
    # Extract all fields that belong to sublogs, since they will interfere with the
    # setup logic in the base class (NXdata).
    for name in self._sublogs:
        for k in list(children):
            if k.startswith(name):
                field = children.pop(k)
                self._init_field(field)
                # First dim is the sublog's time axis; trailing dims get
                # generic names.
                field.sizes = {
                    'time' if i == 0 else f'dim_{i}': size
                    for i, size in enumerate(field.dataset.shape)
                }
                self._sublog_children[k] = field

    super().__init__(attrs=attrs,
                     children=children,
                     fallback_dims=('time', ),
                     fallback_signal_name='value')

def read_children(self, sel: ScippIndex) -> sc.DataGroup:
    """Read the main log and all sublogs for the given selection.

    Raises
    ------
    scipp.DimensionError
        If *sel* selects positionally along 'time' while sublogs are present,
        since each sublog has its own (differently sized) time axis.
    """
    if self._sublogs:
        canonical = _to_canonical_select(list(self.sizes), sel)
        if 'time' in canonical:
            raise sc.DimensionError(
                "Cannot positionally select time since there are multiple "
                "time fields. Label-based selection is not supported yet.")
    dg = super().read_children(sel)
    for sublog_name, sublog_field in self._sublog_children.items():
        dg[sublog_name] = sublog_field[sel]
    return dg

def _time_to_datetime(self, mapping):
    """Convert a raw 'time' entry in *mapping* to datetime64, in place.

    Entries that are absent, already datetime64, or not recognized as a time
    (see _is_time) are left untouched.
    """
    time = mapping.get('time')
    if time is None:
        return
    if time.dtype != sc.DType.datetime64 and _is_time(time):
        mapping['time'] = convert_time_to_datetime64(time,
                                                     start=sc.epoch(unit=time.unit))

def _assemble_sublog(self,
                     dg: sc.DataGroup,
                     name: str,
                     value_name: Optional[str] = None) -> sc.DataArray:
    """Assemble one sublog into a DataArray, popping its fields from *dg*.

    Note that *dg* is modified: every entry prefixed with *name* is removed.
    The '<name>_time' field becomes the 'time' coord; the remaining prefixed
    fields become coords named by their un-prefixed suffix.
    """
    value_key = name if value_name is None else f'{name}_{value_name}'
    out = sc.DataArray(dg.pop(value_key), coords={'time': dg.pop(f'{name}_time')})
    prefix_len = len(name) + 1
    for key in list(dg):
        if key.startswith(name):
            out.coords[key[prefix_len:]] = dg.pop(key)
    self._time_to_datetime(out.coords)
    return out

def assemble(self,
             dg: sc.DataGroup) -> Union[sc.DataGroup, sc.DataArray, sc.Dataset]:
    """Assemble the read children into the final output.

    Returns a single DataArray (via the NXdata base class) when there are no
    sublogs; otherwise a DataGroup holding the main log under 'value' and one
    DataArray per sublog.
    """
    sublogs = sc.DataGroup()
    for name in self._sublogs:
        # Somewhat arbitrary definition of which field is the "value"
        value_name = 'severity' if name == 'alarm' else None
        # NOTE(review): _assemble_sublog pops the consumed entries from dg,
        # so only the main log's fields remain afterwards.
        sublogs[name] = self._assemble_sublog(dg, name, value_name=value_name)
    self._time_to_datetime(dg)
    out = super().assemble(dg)
    return out if not sublogs else sc.DataGroup(value=out, **sublogs)


def _find_embedded_nxevent_data(
Expand Down
106 changes: 0 additions & 106 deletions tests/nexus_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,112 +99,6 @@ def test_nx_class_can_be_bytes(h5root):
assert group.nx_class == NXlog


def test_nxobject_log(h5root):
    """Loading an NXlog with value and time yields the expected DataArray."""
    time = sc.epoch(unit='ns') + sc.array(
        dims=['time'], unit='s', values=[4.4, 5.5, 6.6]).to(unit='ns', dtype='int64')
    expected = sc.DataArray(sc.array(dims=['time'], values=[1.1, 2.2, 3.3]),
                            coords={'time': time})
    log = snx.create_class(h5root, 'log', NXlog)
    snx.create_field(log, 'value', expected.data)
    snx.create_field(log, 'time', expected.coords['time'] - sc.epoch(unit='ns'))
    log = snx.Group(log, definitions=snx.base_definitions())
    assert sc.identical(log[...], expected)


def test_nxlog_with_missing_value_triggers_fallback(nxroot):
    """An NXlog without a 'value' field falls back to loading a DataGroup."""
    seconds = sc.array(dims=['time'], unit='s', values=[4.4, 5.5, 6.6])
    time = sc.epoch(unit='ns') + seconds.to(unit='ns', dtype='int64')
    log = nxroot['entry'].create_class('log', NXlog)
    log['time'] = time - sc.epoch(unit='ns')
    # Fallback to DataGroup, but we still have partial info from NXlog: dim is time
    assert_identical(log[()], sc.DataGroup(time=time))


def test_nxlog_length_1(h5root):
    """A length-1 time-series loads as a DataArray with a single time point."""
    nxroot = snx.Group(h5root, definitions=snx.base_definitions())
    time = sc.epoch(unit='ns') + sc.array(
        dims=['time'], unit='s', values=[4.4]).to(unit='ns', dtype='int64')
    expected = sc.DataArray(sc.array(dims=['time'], values=[1.1]),
                            coords={'time': time})
    log = nxroot.create_class('log', NXlog)
    log['value'] = expected.data
    log['time'] = expected.coords['time'] - sc.epoch(unit='ns')
    assert sc.identical(log[...], expected)


def test_nxlog_length_1_two_dims_no_time_defaults_inner_dim_name(nxroot):
    """Without a time coord, the inner dim of a 2-D value gets a default name."""
    values = sc.array(dims=['time', 'ignored'], values=[[1.1]])
    log = nxroot['entry'].create_class('log', NXlog)
    log['value'] = values
    assert_identical(log[...], sc.DataArray(values.rename(ignored='dim_1')))


def test_nxlog_length_1_two_dims_with_time_defaults_inner_dim_name(nxroot):
    """With a time coord, the inner dim of a 2-D value still gets a default name."""
    time = sc.epoch(unit='ns') + sc.array(
        dims=['time'], unit='s', values=[4.4]).to(unit='ns', dtype='int64')
    da = sc.DataArray(sc.array(dims=['time', 'ignored'], values=[[1.1]]),
                      coords={'time': time})
    log = nxroot['entry'].create_class('log', NXlog)
    log['value'] = da.data
    log['time'] = da.coords['time'] - sc.epoch(unit='ns')
    assert sc.identical(log[...], da.rename(ignored='dim_1'))


def test_nxlog_axes_replaces_time_dim(nxroot):
    """An explicit 'axes' attribute overrides the default dim names."""
    time = sc.epoch(unit='ns') + sc.array(
        dims=['time'], unit='s', values=[4.4]).to(unit='ns', dtype='int64')
    da = sc.DataArray(sc.array(dims=['time', 'ignored'], values=[[1.1]]),
                      coords={'time': time})
    log = nxroot['entry'].create_class('log', NXlog)
    log._group.attrs['axes'] = ['yy', 'xx']
    log['value'] = da.data
    log['time'] = da.coords['time'] - sc.epoch(unit='ns')
    expected = sc.DataArray(sc.array(dims=['yy', 'xx'], values=[[1.1]]),
                            coords={'time': da.coords['time'].squeeze()})
    assert sc.identical(log[...], expected)


def test_nxlog_three_dims_with_time_of_length_1(nxroot):
    """Trailing dims of a 3-D value get default names; 'time' is preserved."""
    time = sc.epoch(unit='ns') + sc.array(
        dims=['time'], unit='s', values=[4.4]).to(unit='ns', dtype='int64')
    values = np.arange(9.).reshape(1, 3, 3)
    da = sc.DataArray(sc.array(dims=['time', 'a', 'b'], values=values),
                      coords={'time': time})
    log = nxroot['entry'].create_class('log', NXlog)
    log['value'] = da.data
    log['time'] = da.coords['time'] - sc.epoch(unit='ns')
    loaded = log[...]
    assert_identical(loaded.data,
                     sc.array(dims=['time', 'dim_1', 'dim_2'], values=values))


def test_nxlog_with_shape_0(nxroot):
    """A zero-length time-series loads with an empty datetime64 time coord."""
    da = sc.DataArray(sc.ones(dims=['time', 'ignored'], shape=(0, 1)),
                      coords={'time': sc.ones(dims=['time'], shape=(0, ), unit='s')})
    log = nxroot['entry'].create_class('log', NXlog)
    log['value'] = da.data
    log['time'] = da.coords['time']
    # The loader converts the raw seconds to datetime64; mirror that here.
    da.coords['time'] = sc.datetimes(dims=['time'], values=[], unit='ns')
    assert_identical(log[...], da.rename(ignored='dim_1'))


def test_nxobject_event_data(nxroot):
event_data = nxroot['entry'].create_class('events_0', NXevent_data)
assert event_data.nx_class == NXevent_data
Expand Down
Loading