Skip to content

Commit

Permalink
[MRG] Adding get_montage for montage to BaseRaw objects (#7667)
Browse files Browse the repository at this point in the history
* Adding additional containsmixin for montage and ch pos.

* Removing unnecessary code.

* Adding up fixes for comments from agram.

* Adding updated.

* Adding updated test montage.

* Adding updated docstring.

* Adding updated docstring.

* Adding change log.

* Adding get_montage doc.

* Fixing pydoc.

* Update mne/channels/channels.py

Co-authored-by: Alexandre Gramfort <[email protected]>

* Wip.

* Adding unit tests for ch positions.

* Coord frame from chs.

* Adding the documentation.

* Deleting script.

* Removing diff.

* Fix flake8.

* Fixing unit test for codecov.

* Update mne/channels/channels.py

Co-authored-by: Eric Larson <[email protected]>

* Adding updated digitatization.

* Adding updated digitatization.

* Adding channels.

* FIX: Ref

* Fixing unit tests.

* Removing get_ch_positions.

* Fix latest.

* trying.

* Merging.

* Update mne/channels/tests/test_montage.py

Co-authored-by: Alexandre Gramfort <[email protected]>

* Fixing reference location.

* Fixing reference location.

* Merging.

* Show test montage breaks.

* Adding updated unit tests.

* Adding updated montage.

* Fixing order.

* Fixing unit tests for fieldtrip.

* Fixing unit tests for fieldtrip.

* Adding fix.

Co-authored-by: Alexandre Gramfort <[email protected]>
Co-authored-by: Eric Larson <[email protected]>
  • Loading branch information
3 people authored Sep 8, 2020
1 parent 9225419 commit 01750aa
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 16 deletions.
2 changes: 2 additions & 0 deletions doc/changes/latest.inc
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,5 @@ API
- The ``threshold`` argument in :meth:`mne.preprocessing.ICA.find_bads_ecg` defaults to ``None`` in version 0.21 but will change to ``'auto'`` in 0.22 by `Yu-Han Luo`_
- The default argument ``meg=True`` in :func:`mne.pick_types` will change to ``meg=False`` in version 0.22 by `Clemens Brunner`_
- Added :meth:`mne.io.Raw.get_montage`, which obtains the montage that an instance has, by `Adam Li`_
43 changes: 41 additions & 2 deletions mne/channels/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
from scipy import sparse

from ..defaults import HEAD_SIZE_DEFAULT, _handle_default
from ..utils import (verbose, logger, warn, _check_preload, _validate_type,
fill_doc, _check_option)
from ..transforms import _frame_to_str
from ..utils import (verbose, logger, warn,
_check_preload, _validate_type, fill_doc, _check_option)
from ..io.compensator import get_current_comp
from ..io.constants import FIFF
from ..io.meas_info import anonymize_info, Info, MontageMixin, create_info
Expand All @@ -28,6 +29,7 @@
channel_indices_by_type, pick_channels, _picks_to_idx,
_get_channel_types)
from ..io.write import DATE_NONE
from ..io._digitization import _get_data_as_dict_from_dig


def _get_meg_system(info):
Expand Down Expand Up @@ -235,6 +237,43 @@ def get_channel_types(self, picks=None, unique=False, only_data_chs=False):
return _get_channel_types(self.info, picks=picks, unique=unique,
only_data_chs=only_data_chs)

@fill_doc
def get_montage(self):
"""Get a DigMontage from instance.
Returns
-------
%(montage)s
"""
from ..channels.montage import make_dig_montage
if self.info['dig'] is None:
return None
# obtain coord_frame, and landmark coords
# (nasion, lpa, rpa, hsp, hpi) from DigPoints
montage_bunch = _get_data_as_dict_from_dig(self.info['dig'])
coord_frame = _frame_to_str.get(montage_bunch.coord_frame)

# get the channel names and chs data structure
ch_names, chs = self.info['ch_names'], self.info['chs']
picks = pick_types(self.info, meg=False, eeg=True,
seeg=True, ecog=True)

# channel positions from dig do not match ch_names one to one,
# so use loc[:3] instead
ch_pos = {ch_names[ii]: chs[ii]['loc'][:3] for ii in picks}

# create montage
montage = make_dig_montage(
ch_pos=ch_pos,
coord_frame=coord_frame,
nasion=montage_bunch.nasion,
lpa=montage_bunch.lpa,
rpa=montage_bunch.rpa,
hsp=montage_bunch.hsp,
hpi=montage_bunch.hpi,
)
return montage


# XXX Eventually de-duplicate with _kind_dict of mne/io/meas_info.py
_human2fiff = {'ecg': FIFF.FIFFV_ECG_CH,
Expand Down
53 changes: 47 additions & 6 deletions mne/channels/montage.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,15 +689,35 @@ def _backcompat_value(pos, ref_pos):
else:
return np.concatenate((pos, ref_pos))

# get the channels in the montage in head
ch_pos = mnt_head._get_ch_pos()
refs = set(ch_pos) & {'EEG000', 'REF'}
assert len(refs) <= 1
eeg_ref_pos = np.zeros(3) if not(refs) else ch_pos.pop(refs.pop())

# This raises based on info being subset/superset of montage
# only get the eeg, seeg, ecog channels
_pick_chs = partial(
pick_types, exclude=[], eeg=True, seeg=True, ecog=True, meg=False,
)

# get the reference position from the loc[3:6]
chs = info['chs']
ref_pos = [chs[ii]['loc'][3:6] for ii in _pick_chs(info)]

# keep reference location from EEG/ECoG/SEEG channels if they
# already exist and are all the same.
custom_eeg_ref_dig = False
# Note: ref position is an empty list for fieldtrip data
if ref_pos:
if all([np.equal(ref_pos[0], pos).all() for pos in ref_pos]) \
and not np.equal(ref_pos[0], [0, 0, 0]).all():
eeg_ref_pos = ref_pos[0]
# since we have an EEG reference position, we have
# to add it into the info['dig'] as EEG000
custom_eeg_ref_dig = True
if not custom_eeg_ref_dig:
refs = set(ch_pos) & {'EEG000', 'REF'}
assert len(refs) <= 1
eeg_ref_pos = np.zeros(3) if not(refs) else ch_pos.pop(refs.pop())

# This raises based on info being subset/superset of montage
info_names = [info['ch_names'][ii] for ii in _pick_chs(info)]
dig_names = mnt_head._get_dig_names()
ref_names = [None, 'EEG000', 'REF']
Expand Down Expand Up @@ -746,13 +766,34 @@ def _backcompat_value(pos, ref_pos):
for name, use in zip(info_names, info_names_use):
_loc_view = info['chs'][info['ch_names'].index(name)]['loc']
_loc_view[:6] = _backcompat_value(ch_pos_use[use], eeg_ref_pos)

del ch_pos_use

# XXX this is probably wrong as it uses the order from the montage
# rather than the order of our info['ch_names'] ...
info['dig'] = _format_dig_points([
digpoints = [
mnt_head.dig[ii] for ii, name in enumerate(dig_names_use)
if name in (info_names_use + ref_names)])
if name in (info_names_use + ref_names)]

# get a copy of the old dig
if info['dig'] is not None:
old_dig = info['dig'].copy()
else:
old_dig = []

# determine if needed to add an extra EEG REF DigPoint
if custom_eeg_ref_dig:
# ref_name = 'EEG000' if match_case else 'eeg000'
ref_dig_dict = {'kind': FIFF.FIFFV_POINT_EEG,
'r': eeg_ref_pos,
'ident': 0,
'coord_frame': info['dig'].pop()['coord_frame']}
ref_dig_point = _format_dig_points([ref_dig_dict])[0]
# only append the reference dig point if it was already
# in the old dig
if ref_dig_point in old_dig:
digpoints.append(ref_dig_point)
info['dig'] = _format_dig_points(digpoints, enforce_order=True)

if mnt_head.dev_head_t is not None:
info['dev_head_t'] = Transform('meg', 'head', mnt_head.dev_head_t)
Expand Down
86 changes: 85 additions & 1 deletion mne/channels/tests/test_montage.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from mne import __file__ as _mne_file, create_info, read_evokeds, pick_types
from mne.fixes import nullcontext
from mne.utils._testing import _dig_sort_key
from mne.utils._testing import _dig_sort_key, assert_object_equal
from mne.channels import (get_builtin_montages, DigMontage, read_dig_dat,
read_dig_egi, read_dig_captrak, read_dig_fif,
read_dig_captrack, # XXX: remove with 0.22
Expand Down Expand Up @@ -1304,6 +1304,90 @@ def test_set_montage_with_missing_coordinates():
)


def test_get_montage():
"""Test get montage from Instance.
Test with standard montage and then loaded in montage.
"""
# 1. read in testing data and assert montage roundtrip
# for testing dataset: 'test_raw.fif'
raw = read_raw_fif(fif_fname)
raw = raw.rename_channels(lambda name: name.replace('EEG ', 'EEG'))
raw2 = raw.copy()
# get montage and then set montage and
# it should be the same
montage = raw.get_montage()
raw.set_montage(montage, on_missing='raise')
test_montage = raw.get_montage()
assert_object_equal(raw.info['chs'], raw2.info['chs'])
assert_dig_allclose(raw2.info, raw.info)
assert_object_equal(raw2.info['dig'], raw.info['dig'])

# the montage does not change
assert_object_equal(montage.dig, test_montage.dig)

# 2. now do a standard montage
montage = make_standard_montage('mgh60')
# set the montage; note renaming to make standard montage map
raw.set_montage(montage)

# get montage back and set it
# the channel locations should be the same
raw2 = raw.copy()
test_montage = raw.get_montage()
raw.set_montage(test_montage, on_missing='ignore')

# chs should not change
assert_object_equal(raw2.info['chs'], raw.info['chs'])
# dig order might be different after set_montage
assert montage.ch_names == test_montage.ch_names
# note that test_montage will have different coordinate frame
# compared to standard montage
assert_dig_allclose(raw2.info, raw.info)
assert_object_equal(raw2.info['dig'], raw.info['dig'])

# 3. if montage gets set to None
raw.set_montage(None)
assert raw.get_montage() is None

# 4. read in BV test dataset and make sure montage
# fulfills roundtrip on non-standard montage
dig_montage = read_dig_fif(fif_dig_montage_fname)

# Make a BrainVision file like the one the user would have had
# with testing dataset 'test.vhdr'
raw_bv = read_raw_brainvision(bv_fname, preload=True)
raw_bv_2 = raw_bv.copy()

# rename channels to make it have the full set
# of channels
mapping = dict()
for ii, ch_name in enumerate(raw_bv.ch_names):
mapping[ch_name] = 'EEG%03d' % (ii + 1,)
raw_bv.rename_channels(mapping)
for ii, ch_name in enumerate(raw_bv_2.ch_names):
mapping[ch_name] = 'EEG%03d' % (ii + 33,)
raw_bv_2.rename_channels(mapping)
raw_bv.add_channels([raw_bv_2])
for ch in raw_bv.info['chs']:
ch['kind'] = FIFF.FIFFV_EEG_CH

# Set the montage and roundtrip
raw_bv.set_montage(dig_montage)
raw_bv2 = raw_bv.copy()

# reset the montage
test_montage = raw_bv.get_montage()
raw_bv.set_montage(test_montage, on_missing='ignore')
# dig order might be different after set_montage
assert_object_equal(raw_bv2.info['dig'], raw_bv.info['dig'])
assert_dig_allclose(raw_bv2.info, raw_bv.info)

# if dig is not set in the info, then montage returns None
raw.info['dig'] = None
assert raw.get_montage() is None


def test_read_dig_hpts():
"""Test reading .hpts file (from MNE legacy)."""
fname = op.join(
Expand Down
68 changes: 61 additions & 7 deletions mne/io/_digitization.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#
# License: BSD (3-clause)

import heapq
from collections import Counter, OrderedDict

import datetime
Expand Down Expand Up @@ -47,8 +48,49 @@
_cardinal_kind_rev = {1: 'LPA', 2: 'Nasion', 3: 'RPA', 4: 'Inion'}


def _format_dig_points(dig):
def _format_dig_points(dig, enforce_order=False):
"""Format the dig points nicely."""
if enforce_order and dig is not None:
# reorder points based on type:
# Fiducials/HPI, EEG, extra (headshape)
fids_digpoints = []
hpi_digpoints = []
eeg_digpoints = []
extra_digpoints = []
head_digpoints = []

# use a heap to enforce order on FIDS, EEG, Extra
for idx, digpoint in enumerate(dig):
ident = digpoint['ident']
kind = digpoint['kind']

# push onto heap based on 'ident' (for the order) for
# each of the possible DigPoint 'kind's
# keep track of 'idx' in case of any clashes in
# the 'ident' variable, which can occur when
# user passes in DigMontage + DigMontage
if kind == FIFF.FIFFV_POINT_CARDINAL:
heapq.heappush(fids_digpoints, (ident, idx, digpoint))
elif kind == FIFF.FIFFV_POINT_HPI:
heapq.heappush(hpi_digpoints, (ident, idx, digpoint))
elif kind == FIFF.FIFFV_POINT_EEG:
heapq.heappush(eeg_digpoints, (ident, idx, digpoint))
elif kind == FIFF.FIFFV_POINT_EXTRA:
heapq.heappush(extra_digpoints, (ident, idx, digpoint))
elif kind == FIFF.FIFFV_POINT_HEAD:
heapq.heappush(head_digpoints, (ident, idx, digpoint))

# now recreate dig based on sorted order
fids_digpoints.sort(), hpi_digpoints.sort()
eeg_digpoints.sort()
extra_digpoints.sort(), head_digpoints.sort()
new_dig = []
for idx, d in enumerate(fids_digpoints + hpi_digpoints +
extra_digpoints + eeg_digpoints +
head_digpoints):
new_dig.append(d[-1])
dig = new_dig

return [DigPoint(d) for d in dig] if dig is not None else dig


Expand Down Expand Up @@ -182,12 +224,23 @@ def write_dig(fname, pts, coord_frame=None):
}


def _foo_get_data_from_dig(dig):
# XXXX:
# This does something really similar to _read_dig_montage_fif but:
# - does not check coord_frame
# - does not do any operation that implies assumptions with the names
# XXXX:
# This does something really similar to _read_dig_montage_fif but:
# - does not check coord_frame
# - does not do any operation that implies assumptions with the names
def _get_data_as_dict_from_dig(dig):
"""Obtain coordinate data from a Dig.
Parameters
----------
dig : list of dicts
A container of DigPoints to be added to the info['dig'].
Returns
-------
ch_pos : dict
The container of all relevant channel positions inside dig.
"""
# Split up the dig points by category
hsp, hpi, elp = list(), list(), list()
fids, dig_ch_pos_location = dict(), list()
Expand All @@ -203,7 +256,8 @@ def _foo_get_data_from_dig(dig):
hsp.append(d['r'])
elif d['kind'] == FIFF.FIFFV_POINT_EEG:
# XXX: dig_ch_pos['EEG%03d' % d['ident']] = d['r']
dig_ch_pos_location.append(d['r'])
if d['ident'] != 0: # ref channel
dig_ch_pos_location.append(d['r'])

dig_coord_frames = set([d['coord_frame'] for d in dig])
assert len(dig_coord_frames) == 1, \
Expand Down

0 comments on commit 01750aa

Please sign in to comment.