diff --git a/doc/changes/latest.inc b/doc/changes/latest.inc index 96bde9ed148..230dd6821d6 100644 --- a/doc/changes/latest.inc +++ b/doc/changes/latest.inc @@ -27,6 +27,8 @@ Changelog - Add :meth:`mne.MixedSourceEstimate.surface` and :meth:`mne.MixedSourceEstimate.volume` methods to allow surface and volume extraction by `Eric Larson`_ +- Add :meth:`mne.VectorSourceEstimate.project` to project vector source estimates onto the direction of maximum source power by `Eric Larson`_ + - Add support to :func:`mne.extract_label_time_course` for vector-valued and volumetric source estimates by `Eric Larson`_ - Add method :meth:`mne.VolSourceEstimate.in_label` by `Eric Larson`_ @@ -233,6 +235,8 @@ API - The method ``stc_mixed.plot_surface`` for a :class:`mne.MixedSourceEstimate` has been deprecated in favor of :meth:`stc.surface().plot(...) ` by `Eric Larson`_ +- The method ``stc.normal`` for :class:`mne.VectorSourceEstimate` has been deprecated in favor of :meth:`stc.project('nn', src) ` by `Eric Larson`_ + - Add ``use_dev_head_trans`` parameter to :func:`mne.preprocessing.annotate_movement` to allow choosing the device to head transform is used to define the fixed cHPI coordinates by `Luke Bloy`_ - The function ``mne.channels.read_dig_captrack`` will be deprecated in version 0.22 in favor of :func:`mne.channels.read_dig_captrak` to correct the spelling error: "captraCK" -> "captraK", by `Stefan Appelhoff`_ diff --git a/examples/inverse/plot_vector_mne_solution.py b/examples/inverse/plot_vector_mne_solution.py index 8ed50bf87ae..90d5d851151 100644 --- a/examples/inverse/plot_vector_mne_solution.py +++ b/examples/inverse/plot_vector_mne_solution.py @@ -21,6 +21,7 @@ # # License: BSD (3-clause) +import numpy as np import mne from mne.datasets import sample from mne.minimum_norm import read_inverse_operator, apply_inverse @@ -52,6 +53,21 @@ brain = stc.plot( initial_time=peak_time, hemi='lh', subjects_dir=subjects_dir) +############################################################################### +# Plot the activation in the direction of maximal power for this data: + +stc_max, directions = stc.project('pca', src=inv['src']) +# These directions must by design be close to the normals because this +# inverse was computed with loose=0.2: +print('Absolute cosine similarity between source normals and directions: ' + f'{np.abs(np.sum(directions * inv["source_nn"][2::3], axis=-1)).mean()}') +brain_max = stc_max.plot( + initial_time=peak_time, hemi='lh', subjects_dir=subjects_dir, + time_label='Max power') +brain_normal = stc.project('normal', inv['src'])[0].plot( + initial_time=peak_time, hemi='lh', subjects_dir=subjects_dir, + time_label='Normal') + ############################################################################### # You can also do this with a fixed-orientation inverse. It looks a lot like # the result above because the ``loose=0.2`` orientation constraint keeps diff --git a/mne/minimum_norm/tests/test_inverse.py b/mne/minimum_norm/tests/test_inverse.py index f19d1f9892c..2c7f44f6d17 100644 --- a/mne/minimum_norm/tests/test_inverse.py +++ b/mne/minimum_norm/tests/test_inverse.py @@ -335,15 +335,28 @@ def test_localization_bias_fixed(bias_params_fixed, method, lower, upper, ('eLORETA', 99, 100, 0.8, 0.2), ('eLORETA', 99, 100, 0.8, 0.001), ]) +@pytest.mark.parametrize('pick_ori', (None, 'vector')) def test_localization_bias_loose(bias_params_fixed, method, lower, upper, - depth, loose): + depth, loose, pick_ori): """Test inverse localization bias for loose minimum-norm solvers.""" + if pick_ori == 'vector' and method == 'eLORETA': # works, but save cycles + return evoked, fwd, noise_cov, _, want = bias_params_fixed fwd = convert_forward_solution(fwd, surf_ori=False, force_fixed=False) assert not is_fixed_orient(fwd) inv_loose = make_inverse_operator(evoked.info, fwd, noise_cov, loose=loose, depth=depth) - loc = apply_inverse(evoked, inv_loose, lambda2, method).data + loc = apply_inverse( + evoked, inv_loose, lambda2, method, pick_ori=pick_ori) + if pick_ori is not None: + assert loc.data.ndim == 3 + loc, directions = loc.project('pca', src=fwd['src']) + abs_cos_sim = np.abs(np.sum( + directions * inv_loose['source_nn'][2::3], axis=1)) + assert np.percentile(abs_cos_sim, 10) > 0.9 # most very aligned + loc = abs(loc).data + else: + loc = loc.data assert (loc >= 0).all() # Compute the percentage of sources for which there is no loc bias: perc = (want == np.argmax(loc, axis=0)).mean() * 100 @@ -535,7 +548,7 @@ def test_orientation_prior(bias_params_free, method, looses, vmin, vmax, [_get_src_nn(s) for s in inv['src']])) vec_stc_surf = np.matmul(rot, vec_stc.data) if 0. in looses: - vec_stc_normal = vec_stc.normal(inv['src']) + vec_stc_normal, _ = vec_stc.project('normal', inv['src']) assert_allclose(stcs[1].data, vec_stc_normal.data) del vec_stc assert_allclose(vec_stc_normal.data, vec_stc_surf[:, 2]) @@ -1065,8 +1078,9 @@ def test_inverse_mixed(all_src_types_inv_evoked): stcs[kind].magnitude().data) assert_allclose(getattr(stcs['mixed'], kind)().magnitude().data, stcs[kind].magnitude().data) - assert_allclose(stcs['mixed'].surface().normal(surf_src).data, - stcs['surface'].normal(surf_src).data) + with pytest.deprecated_call(): + assert_allclose(stcs['mixed'].surface().normal(surf_src).data, + stcs['surface'].normal(surf_src).data) run_tests_if_main() diff --git a/mne/source_estimate.py b/mne/source_estimate.py index f4ab1f84d46..c7b0f55e0b8 100644 --- a/mne/source_estimate.py +++ b/mne/source_estimate.py @@ -17,7 +17,6 @@ from .cov import Covariance from .evoked import _get_peak from .filter import resample -from .fixes import einsum from .io.constants import FIFF from .surface import read_surface, _get_ico_surface, mesh_edges from .source_space import (_ensure_src, _get_morph_src_reordering, @@ -1786,6 +1785,8 @@ def magnitude(self): data_mag, self.vertices, self.tmin, self.tstep, self.subject, self.verbose) + @deprecated('stc.normal(src) is deprecated and will be removed in 0.22, ' + 'use stc.project("normal", src)[0] instead') @fill_doc def normal(self, src, use_cps=True): """Compute activity orthogonal to the cortex. @@ -1806,13 +1807,97 @@ def normal(self, src, use_cps=True): The source estimate only retaining the activity orthogonal to the cortex. """ - _check_src_normal('normal', src) + return self.project('normal', src, use_cps)[0] + + def _get_src_normals(self, src, use_cps): normals = np.vstack([_get_src_nn(s, use_cps, v) for s, v in - zip(src, self.vertices)]) - data_norm = einsum('ijk,ij->ik', self.data, normals) - return self._scalar_class( + zip(src, self.vertices)]) + return normals + + @fill_doc + def project(self, directions, src=None, use_cps=True): + """Project the data for each vertex in a given direction. + + Parameters + ---------- + directions : ndarray, shape (n_vertices, 3) | str + Can be: + + - ``'normal'`` + Project onto the source space normals. + - ``'pca'`` + SVD will be used to project onto the direction of maximal + power for each source. + - :class:`~numpy.ndarray`, shape (n_vertices, 3) + Projection directions for each source. + src : instance of SourceSpaces | None + The source spaces corresponding to the source estimate. + Not used when ``directions`` is an array, optional when + ``directions='pca'``. + %(use_cps)s + Should be the same value that was used when the forward model + was computed (typically True). + + Returns + ------- + stc : instance of SourceEstimate + The projected source estimate. + directions : ndarray, shape (n_vertices, 3) + The directions that were computed (or just used). + + Notes + ----- + When using SVD, there is a sign ambiguity for the direction of maximal + power. When ``src is None``, the direction is chosen that makes the + resulting time waveform sum positive (i.e., have positive amplitudes). + When ``src`` is provided, the directions are flipped in the direction + of the source normals, i.e., outward from cortex for surface source + spaces and in the +Z / superior direction for volume source spaces. + + .. versionadded:: 0.21 + """ + _validate_type(directions, (str, np.ndarray), 'directions') + _validate_type(src, (None, SourceSpaces), 'src') + if isinstance(directions, str): + _check_option('directions', directions, ('normal', 'pca'), + extra='when str') + + if directions == 'normal': + if src is None: + raise ValueError( + 'If directions="normal", src cannot be None') + _check_src_normal('normal', src) + directions = self._get_src_normals(src, use_cps) + else: + assert directions == 'pca' + x = self.data + if not np.isrealobj(self.data): + _check_option('stc.data.dtype', self.data.dtype, + (np.complex64, np.complex128)) + dtype = \ + np.float32 if x.dtype == np.complex64 else np.float64 + x = x.view(dtype) + assert x.shape[-1] == 2 * self.data.shape[-1] + u, _, v = np.linalg.svd(x, full_matrices=False) + directions = u[:, :, 0] + # The sign is arbitrary, so let's flip it in the direction that + # makes the resulting time series the most positive: + if src is None: + signs = np.sum(v[:, 0].real, axis=1, keepdims=True) + else: + normals = self._get_src_normals(src, use_cps) + signs = np.sum(directions * normals, axis=1, keepdims=True) + assert signs.shape == (self.data.shape[0], 1) + signs = np.sign(signs) + signs[signs == 0] = 1. + directions *= signs + _check_option( + 'directions.shape', directions.shape, [(self.data.shape[0], 3)]) + data_norm = np.matmul(directions[:, np.newaxis], self.data)[:, 0] + stc = self._scalar_class( data_norm, self.vertices, self.tmin, self.tstep, self.subject, self.verbose) + return stc, directions class _BaseVolSourceEstimate(_BaseSourceEstimate): diff --git a/mne/tests/test_source_estimate.py b/mne/tests/test_source_estimate.py index c44dd0f6837..0ff1ece8060 100644 --- a/mne/tests/test_source_estimate.py +++ b/mne/tests/test_source_estimate.py @@ -11,6 +11,7 @@ assert_allclose, assert_equal) import pytest from scipy import sparse +from scipy.optimize import fmin_cobyla from mne import (stats, SourceEstimate, VectorSourceEstimate, VolSourceEstimate, Label, read_source_spaces, @@ -1119,23 +1120,32 @@ def test_mixed_stc(tmpdir): (VolVectorSourceEstimate, 'discrete'), (MixedVectorSourceEstimate, 'mixed'), ]) -def test_vec_stc_basic(tmpdir, klass, kind): +@pytest.mark.parametrize('dtype', [ + np.float32, np.float64, np.complex64, np.complex128]) +def test_vec_stc_basic(tmpdir, klass, kind, dtype): """Test (vol)vector source estimate.""" nn = np.array([ [1, 0, 0], [0, 1, 0], - [0, 0, 1], + [np.sqrt(1. / 2.), 0, np.sqrt(1. / 2.)], [np.sqrt(1 / 3.)] * 3 - ]) + ], np.float32) data = np.array([ [1, 0, 0], [0, 2, 0], - [3, 0, 0], + [-3, 0, 0], [1, 1, 1], - ])[:, :, np.newaxis] - magnitudes = np.linalg.norm(data, axis=1)[:, 0] - normals = np.array([1, 2, 0, np.sqrt(3)]) + ], dtype)[:, :, np.newaxis] + amplitudes = np.array([1, 2, 3, np.sqrt(3)], dtype) + magnitudes = amplitudes.copy() + normals = np.array([1, 2, -3. / np.sqrt(2), np.sqrt(3)], dtype) + if dtype in (np.complex64, np.complex128): + data *= 1j + amplitudes *= 1j + normals *= 1j + directions = np.array( + [[1, 0, 0], [0, 1, 0], [-1, 0, 0], [1. / np.sqrt(3)] * 3]) vol_kind = kind if kind in ('discrete', 'vol') else 'vol' vol_src = SourceSpaces([dict(nn=nn, type=vol_kind)]) assert vol_src.kind == dict(vol='volume').get(vol_kind, vol_kind) @@ -1155,20 +1165,35 @@ def test_vec_stc_basic(tmpdir, klass, kind): verts = surf_verts + vol_verts assert src.kind == 'mixed' data = np.tile(data, (2, 1, 1)) + amplitudes = np.tile(amplitudes, 2) magnitudes = np.tile(magnitudes, 2) normals = np.tile(normals, 2) + directions = np.tile(directions, (2, 1)) stc = klass(data, verts, 0, 1, 'foo') + amplitudes = amplitudes[:, np.newaxis] + magnitudes = magnitudes[:, np.newaxis] # Magnitude of the vectors - assert_array_equal(stc.magnitude().data[:, 0], magnitudes) + assert_array_equal(stc.magnitude().data, magnitudes) # Vector components projected onto the vertex normals if kind in ('vol', 'mixed'): with pytest.raises(RuntimeError, match='surface or discrete'): - stc.normal(src) + stc.project('normal', src)[0] else: - normal = stc.normal(src) - assert_array_equal(normal.data[:, 0], normals) + normal = stc.project('normal', src)[0] + assert_allclose(normal.data[:, 0], normals) + + # Maximal-variance component, either to keep amps pos or to align to src-nn + projected, got_directions = stc.project('pca') + assert_allclose(got_directions, directions) + assert_allclose(projected.data, amplitudes) + projected, got_directions = stc.project('pca', src) + flips = np.array([[1], [1], [-1.], [1]]) + if klass is MixedVectorSourceEstimate: + flips = np.tile(flips, (2, 1)) + assert_allclose(got_directions, directions * flips) + assert_allclose(projected.data, amplitudes * flips) out_name = tmpdir.join('temp.h5') stc.save(out_name) @@ -1188,6 +1213,37 @@ def test_vec_stc_basic(tmpdir, klass, kind): klass(data, verts, 0, 1) +@pytest.mark.parametrize('real', (True, False)) +def test_source_estime_project(real): + """Test projecting a source estimate onto direction of max power.""" + n_src, n_times = 4, 100 + rng = np.random.RandomState(0) + data = rng.randn(n_src, 3, n_times) + if not real: + data = data + 1j * rng.randn(n_src, 3, n_times) + assert data.dtype == np.complex128 + else: + assert data.dtype == np.float64 + + # Make sure that the normal we get maximizes the power + # (i.e., minimizes the negative power) + want_nn = np.empty((n_src, 3)) + for ii in range(n_src): + x0 = np.ones(3) + + def objective(x): + x = x / np.linalg.norm(x) + return -np.linalg.norm(np.dot(x, data[ii])) + want_nn[ii] = fmin_cobyla(objective, x0, (), rhobeg=0.1, rhoend=1e-6) + want_nn /= np.linalg.norm(want_nn, axis=1, keepdims=True) + + stc = VolVectorSourceEstimate(data, [np.arange(n_src)], 0, 1) + stc_max, directions = stc.project('pca') + flips = np.sign(np.sum(directions * want_nn, axis=1, keepdims=True)) + directions *= flips + assert_allclose(directions, want_nn, atol=1e-6) + + @pytest.fixture(scope='module', params=[testing._pytest_param()]) def invs(): """Inverses of various amounts of loose.""" @@ -1251,7 +1307,8 @@ def test_vec_stc_inv_fixed(invs, pick_ori): evoked, _, _, _, fixed, fixedish = invs stc_fixed = apply_inverse(evoked, fixed) stc_fixed_vector = apply_inverse(evoked, fixed, pick_ori='vector') - assert_allclose(stc_fixed.data, stc_fixed_vector.normal(fixed['src']).data) + assert_allclose(stc_fixed.data, + stc_fixed_vector.project('normal', fixed['src'])[0].data) stc_fixedish = apply_inverse(evoked, fixedish, pick_ori=pick_ori) if pick_ori == 'vector': assert_allclose(stc_fixed_vector.data, stc_fixedish.data, atol=1e-2) @@ -1259,7 +1316,7 @@ def test_vec_stc_inv_fixed(invs, pick_ori): assert_allclose( abs(stc_fixed).data, stc_fixedish.magnitude().data, atol=1e-2) # ... and when picking the normal (signed) - stc_fixedish = stc_fixedish.normal(fixedish['src']) + stc_fixedish = stc_fixedish.project('normal', fixedish['src'])[0] elif pick_ori is None: stc_fixed = abs(stc_fixed) else: diff --git a/mne/utils/check.py b/mne/utils/check.py index 28cba60d276..352b6e03148 100644 --- a/mne/utils/check.py +++ b/mne/utils/check.py @@ -557,11 +557,11 @@ def _check_option(parameter, value, allowed_values, extra=''): '{options}, but got {value!r} instead.') allowed_values = list(allowed_values) # e.g., if a dict was given if len(allowed_values) == 1: - options = 'The only allowed value is %r' % allowed_values[0] + options = f'The only allowed value is {repr(allowed_values[0])}' else: options = 'Allowed values are ' - options += ', '.join(['%r' % v for v in allowed_values[:-1]]) - options += ' and %r' % allowed_values[-1] + options += ', '.join([f'{repr(v)}' for v in allowed_values[:-1]]) + options += f' and {repr(allowed_values[-1])}' raise ValueError(msg.format(parameter=parameter, options=options, value=value, extra=extra))