From 98c90b727037538f59ebaadf2a1ffb8e30951c3b Mon Sep 17 00:00:00 2001 From: Seung Hyun Kim Date: Sun, 15 May 2022 09:35:03 -0500 Subject: [PATCH 1/7] wip: data management page --- docs/guide/data_management.md | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/docs/guide/data_management.md b/docs/guide/data_management.md index 0735c012..9d103e8e 100644 --- a/docs/guide/data_management.md +++ b/docs/guide/data_management.md @@ -13,6 +13,10 @@ kernelspec: # Data Management +- Data +- DataManager +- load_continuous_data (raw) + ```{code-cell} ipython3 :tags: [hide-cell] @@ -20,18 +24,22 @@ import os import numpy as np import quantities as pq import matplotlib.pyplot as plt - +from glob import glob +from miv.io import * ``` -## 1. Data Load - ```{code-cell} ipython3 -:tags: [hide-cell] +datapath = './2022-03-10_16-19-09' +os.path.exists(datapath) +``` -from miv.io import load_data -from miv.io.data import Data, Dataset +```{code-cell} ipython3 +filepath = './2022-03-10_16-19-09/Record Node 104/experiment1/recording1/continuous/Rhythm_FPGA-100.0/continuous.dat' +os.path.exists(filepath) ``` +## 1. Data Load + ```{code-cell} ipython3 # Load dataset from OpenEphys recording folder_path: str = "~/Open Ephys/2022-03-10-16-19-09" # Data Path From b83a6f9c1cc263bf43ce4b23e7739e2b7cb9679d Mon Sep 17 00:00:00 2001 From: Seung Hyun Kim Date: Sun, 15 May 2022 09:43:11 -0500 Subject: [PATCH 2/7] initial: io module test --- tests/io/__init__.py | 0 tests/io/mock_continuous_signal.py | 0 tests/io/test_binary_io.py | 0 tests/io/test_data_single_module.py | 0 4 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/io/__init__.py create mode 100644 tests/io/mock_continuous_signal.py create mode 100644 tests/io/test_binary_io.py create mode 100644 tests/io/test_data_single_module.py diff --git a/tests/io/__init__.py b/tests/io/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/io/mock_continuous_signal.py b/tests/io/mock_continuous_signal.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/io/test_binary_io.py b/tests/io/test_binary_io.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/io/test_data_single_module.py b/tests/io/test_data_single_module.py new file mode 100644 index 00000000..e69de29b From 82a176b2ed608cfce87f915c779c16eaa5f2ee1b Mon Sep 17 00:00:00 2001 From: Seung Hyun Kim Date: Sun, 15 May 2022 10:04:37 -0500 Subject: [PATCH 3/7] initial: visualization module test --- tests/visualization/__init__.py | 0 tests/visualization/test_fft.py | 5 +++++ tests/visualization/test_waveform.py | 5 +++++ 3 files changed, 10 insertions(+) create mode 100644 tests/visualization/__init__.py create mode 100644 tests/visualization/test_fft.py create mode 100644 tests/visualization/test_waveform.py diff --git a/tests/visualization/__init__.py b/tests/visualization/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/visualization/test_fft.py b/tests/visualization/test_fft.py new file mode 100644 index 00000000..b371874a --- /dev/null +++ b/tests/visualization/test_fft.py @@ -0,0 +1,5 @@ +import pytest + +import numpy as np + +from miv.visualization import plot_frequency_domain diff --git a/tests/visualization/test_waveform.py b/tests/visualization/test_waveform.py new file mode 100644 index 00000000..da6dce3f --- /dev/null +++ b/tests/visualization/test_waveform.py @@ -0,0 +1,5 @@ +import pytest + +import numpy as np + +from miv.visualization import extract_waveforms, 
plot_waveforms From 8ace5210fddf6fa5f127b76a325c3999c01e94b2 Mon Sep 17 00:00:00 2001 From: Seung Hyun Kim Date: Sun, 15 May 2022 11:54:09 -0500 Subject: [PATCH 4/7] update: unittest for io --- miv/io/binary.py | 167 ++++--------------------------------- tests/io/test_binary_io.py | 156 ++++++++++++++++++++++++++++++++++ 2 files changed, 171 insertions(+), 152 deletions(-) diff --git a/miv/io/binary.py b/miv/io/binary.py index 8e66ade8..2fa14a83 100644 --- a/miv/io/binary.py +++ b/miv/io/binary.py @@ -33,6 +33,13 @@ def apply_channel_mask(signal: np.ndarray, channel_mask: Set[int]): ------- output signal : SignalType + Raises + ------ + IndexError + Typically raise index error when the dimension of the signal is less than 2. + AttributeError + If signal is non numpy array type. + """ num_channels = signal.shape[1] @@ -88,12 +95,17 @@ def load_recording( signal : SignalType, neo.core.AnalogSignal sampling_rate : float + Raises + ------ + AssertionError + If more than one "continuous.dat" file exist in the directory. + """ - file_path: str = glob(os.path.join(folder, "**", "*.dat", recursive=True)) + file_path: str = glob(os.path.join(folder, "**", "*.dat"), recursive=True) assert ( len(file_path) == 1 - ), f"There should be only one 'continuous.dat' file. (There exists {file_path}" + ), f"There should be only one 'continuous.dat' file. (There exists {file_path})" # load structure information dictionary info_file: str = os.path.join(folder, "structure.oebin") @@ -113,155 +125,6 @@ def load_recording( return signal, timestamps, sampling_rate -def _bitsToVolts(Data, ChInfo, Unit): # TODO: need refactor - print("Converting to uV... ", end="") - Data = {R: Rec.astype("float32") for R, Rec in Data.items()} - - if Unit.lower() == "uv": - U = 1 - elif Unit.lower() == "mv": - U = 10 ** -3 - - for R in Data.keys(): - for C in range(len(ChInfo)): - Data[R][:, C] = Data[R][:, C] * ChInfo[C]["bit_volts"] * U - if "ADC" in ChInfo[C]["channel_name"]: - Data[R][:, C] *= 10 ** 6 - - return Data - - -def _load( # TODO: Need refactor - folder, processor=None, experiment=None, recording=None, unit="uV", channel_map=[] -): - """ - Loads data recorded by Open Ephys in Binary format as numpy memmap. - - Here is example usage:: - - from miv.io.Binary import load - - folder = '/home/user//2019-07-27_00-00-00' - Data, Rate = load(folder) - - channel_map = [0,15,1,14] - recording = 3 - Data2, Rate2 = load(folder, recording=recording, channel_map=channel_map, unit='Bits') - - Original Author: - - - open-ephys/analysis-tools/Python3/Binary.py (commit: 871e003) - - original author: malfatti - - date: 2019-07-27 - - last modified by: skim449 - - date: 2022-04-11 - - Parameters - ---------- - folder: str - folder containing at least the subfolder 'experiment1'. - - processor: str or None, optional - Processor number to load, according to subsubsubfolders under - folder>experimentX/recordingY/continuous . The number used is the one - after the processor name. For example, to load data from the folder - 'Channel_Map-109_100.0' the value used should be '109'. - If not set, load all processors. - - experiment: int or None, optional - Experiment number to load, according to subfolders under folder. - If not set, load all experiments. - - recording: int or None, optional - Recording number to load, according to subsubfolders under folder>experimentX . - If not set, load all recordings. - - unit: str or None, optional - Unit to return the data, either 'uV' or 'mV' (case insensitive). In - both cases, return data in float32. 
Defaults to 'uV'. - If anything else, return data in int16. - - channel_map: list, optional - If empty (default), load all channels. - If not empty, return only channels in channel_map, in the provided order. - CHANNELS ARE COUNTED STARTING AT 0. - - Returns - ------- - Data: dict - Dictionary with data in the structure Data[processor][experiment][recording]. - Rate: dict - Dictionary with sampling rates in the structure Rate[processor][experiment]. - - - """ - - files = sorted(glob(folder + "/**/*.dat", recursive=True)) - info_file = sorted(glob(folder + "/*/*/structure.oebin")) - - Data, Rate = {}, {} - for F, File in enumerate(files): - File = File.replace("\\", "/") # Replace windows file delims - Exp, Rec, _, Proc = File.split("/")[-5:-1] - Exp = str(int(Exp[10:]) - 1) - Rec = str(int(Rec[9:]) - 1) - Proc = Proc.split(".")[0].split("-")[-1] - if "_" in Proc: - Proc = Proc.split("_")[0] - - if Proc not in Data.keys(): - Data[Proc], Rate[Proc] = {}, {} - - if experiment: - if int(Exp) != experiment - 1: - continue - - if recording: - if int(Rec) != recording - 1: - continue - - if processor: - if Proc != processor: - continue - - print("Loading recording", int(Rec) + 1, "...") - if Exp not in Data[Proc]: - Data[Proc][Exp] = {} - Data[Proc][Exp][Rec] = np.memmap(File, dtype="int16", mode="c") - - Info = literal_eval(open(info_file[F]).read()) - ProcIndex = [ - Info["continuous"].index(_) - for _ in Info["continuous"] - if str(_["source_processor_id"]) == Proc - ][ - 0 - ] # Changed to source_processor_id from recorded_processor_id - - ChNo = Info["continuous"][ProcIndex]["num_channels"] - if Data[Proc][Exp][Rec].shape[0] % ChNo: - print("Rec", Rec, "is broken") - del Data[Proc][Exp][Rec] - continue - - SamplesPerCh = Data[Proc][Exp][Rec].shape[0] // ChNo - Data[Proc][Exp][Rec] = Data[Proc][Exp][Rec].reshape((SamplesPerCh, ChNo)) - Rate[Proc][Exp] = Info["continuous"][ProcIndex]["sample_rate"] - - for Proc in Data.keys(): - for Exp in Data[Proc].keys(): - if unit.lower() in ["uv", "mv"]: - ChInfo = Info["continuous"][ProcIndex]["channels"] - Data[Proc][Exp] = _bitsToVolts(Data[Proc][Exp], ChInfo, unit) - - if channel_map: - Data[Proc][Exp] = apply_channel_mask(Data[Proc][Exp], channel_map) - - print("Done.") - - return Data, Rate - - def load_continuous_data( data_path: str, num_channels: int, @@ -326,4 +189,4 @@ def load_continuous_data( if start_at_zero and not np.isclose(timestamps[0], 0.0): timestamps -= timestamps[0] - return timestamps, raw_data + return timestamps, np.array(raw_data) diff --git a/tests/io/test_binary_io.py b/tests/io/test_binary_io.py index e69de29b..44a2f8b9 100644 --- a/tests/io/test_binary_io.py +++ b/tests/io/test_binary_io.py @@ -0,0 +1,156 @@ +import pytest + +import tempfile +import os +import numpy as np + +from miv.io.binary import ( + load_continuous_data, + load_recording, + oebin_read, + apply_channel_mask, +) + + +@pytest.mark.parametrize("signal", [np.arange(10), np.array([])]) +def test_apply_channel_mask_shape_failure(signal): + with pytest.raises(IndexError): + apply_channel_mask(signal, {0}) + + +@pytest.mark.parametrize("signal", [5, [1, 2, 3], (1, 5, 9)]) +def test_apply_channel_mask_non_numpy_ndtype(signal): + with pytest.raises(AttributeError): + apply_channel_mask(signal, {0}) + + +@pytest.mark.parametrize( + "signal, mask, solution", + [ + (np.arange(9).reshape([3, 3]), {0}, np.arange(9).reshape([3, 3])[:, [1, 2]]), + (np.arange(9).reshape([3, 3]), {1}, np.arange(9).reshape([3, 3])[:, [0, 2]]), + (np.arange(9).reshape([3, 3]), {0, 1}, 
np.arange(9).reshape([3, 3])[:, [2]]), + ( + np.arange(9).reshape([3, 3]), + {1, -1, 5}, + np.arange(9).reshape([3, 3])[:, [0, 2]], + ), + ], +) +def test_apply_channel_mask_functionality(signal, mask, solution): + output = apply_channel_mask(signal, mask) + np.testing.assert_allclose(output, solution) + + +def test_oebin_read_functionality(): + a = {"a": 1, "b": "string data", "c": True} + with tempfile.NamedTemporaryFile("w+t") as fp: + fp.writelines(str(a)) + fp.seek(0) + b = oebin_read(fp.name) + assert a == b + + +@pytest.mark.parametrize("num_channels, signal_length", [(4, 100), (1, 50), (10, 5)]) +def test_load_continuous_data_temp_file_without_timestamps(num_channels, signal_length): + signal = np.arange(signal_length * num_channels).reshape( + [signal_length, num_channels] + ) + filename = os.path.join(tempfile.mkdtemp(), "continuous.dat") + fp = np.memmap(filename, dtype="int16", mode="w+", shape=signal.shape) + fp[:] = signal[:] + fp.flush() + + timestamps, raw_data = load_continuous_data(fp.filename, num_channels, 1) + np.testing.assert_allclose(timestamps, np.arange(signal_length)) + np.testing.assert_allclose(raw_data, signal) + + +@pytest.mark.parametrize( + "num_channels, signal_length, freq", [(4, 100, 1), (1, 50, 5), (10, 5, 2)] +) +def test_load_continuous_data_temp_file_with_timestamps_shift( + num_channels, signal_length, freq +): + signal = np.arange(signal_length * num_channels).reshape( + [signal_length, num_channels] + ) + dirname = tempfile.mkdtemp() + filename = os.path.join(dirname, "continuous.dat") + timestamps_filename = os.path.join(dirname, "timestamps.npy") + # Prepare continuous.dat + fp = np.memmap(filename, dtype="int16", mode="w+", shape=signal.shape) + fp[:] = signal[:] + fp.flush() + # Prepare timestamps.npy + timestamps = np.arange(signal_length) + np.pi + np.save(timestamps_filename, timestamps) + + # With shift + out_timestamps, raw_data = load_continuous_data( + fp.filename, num_channels, freq, start_at_zero=False + ) + np.testing.assert_allclose(out_timestamps, timestamps / freq) + np.testing.assert_allclose(raw_data, signal) + + # Without shift + out_timestamps, raw_data = load_continuous_data( + fp.filename, num_channels, freq, start_at_zero=True + ) + np.testing.assert_allclose(out_timestamps, (timestamps - np.pi) / freq) + np.testing.assert_allclose(raw_data, signal) + + +@pytest.mark.parametrize( + "num_channels, signal_length, freq", [(4, 100, 1), (1, 50, 5), (10, 5, 2)] +) +def test_load_continuous_data_temp_file_timestamps_path_test( + num_channels, signal_length, freq +): + signal = np.arange(signal_length * num_channels).reshape( + [signal_length, num_channels] + ) + dirname = tempfile.mkdtemp() + filename = os.path.join(dirname, "continuous.dat") + timestamps_filename = os.path.join(dirname, "a.npy") + # Prepare continuous.dat + fp = np.memmap(filename, dtype="int16", mode="w+", shape=signal.shape) + fp[:] = signal[:] + fp.flush() + # Prepare timestamps.npy + timestamps = np.arange(signal_length) + np.save(timestamps_filename, timestamps) + + # With shift + out_timestamps, raw_data = load_continuous_data( + fp.filename, num_channels, freq, "a.npy" + ) + np.testing.assert_allclose(out_timestamps, timestamps / freq) + np.testing.assert_allclose(raw_data, signal) + + +@pytest.mark.parametrize( + "num_channels, signal_length, freq", [(4, 100, 1), (1, 50, 5), (10, 5, 2)] +) +def test_load_recording_assertion_single_data_file(num_channels, signal_length, freq): + signal = np.arange(signal_length * num_channels).reshape( + [signal_length, 
num_channels] + ) + + dirname = tempfile.mkdtemp() + os.makedirs(os.path.join(dirname, "continuous", "temp1")) + os.makedirs(os.path.join(dirname, "continuous", "temp2")) + filename1 = os.path.join(dirname, "continuous", "temp1", "continuous.dat") + filename2 = os.path.join(dirname, "continuous", "temp2", "continuous.dat") + # Prepare continuous.dat + fp1 = np.memmap(filename1, dtype="int16", mode="w+", shape=signal.shape) + fp2 = np.memmap(filename2, dtype="int16", mode="w+", shape=signal.shape) + fp1[:] = 1.0 + fp2[:] = 2.0 + fp1.flush() + fp2.flush() + + with pytest.raises( + AssertionError, match=r"(?=.*temp1.*)(?=.*temp2.*)(?=There should be only one)" + ): + load_recording(dirname) From 6e7f6971cecba5670b61e53f44dfe9a631aae636 Mon Sep 17 00:00:00 2001 From: Seung Hyun Kim Date: Sun, 15 May 2022 21:01:22 -0500 Subject: [PATCH 5/7] add pywavelet dependency --- docs/overview/references.rst | 7 +++++++ requirements.txt | 1 + 2 files changed, 8 insertions(+) diff --git a/docs/overview/references.rst b/docs/overview/references.rst index 680151c6..1d26e3af 100644 --- a/docs/overview/references.rst +++ b/docs/overview/references.rst @@ -8,8 +8,15 @@ Neural Ensemble - Python-Neo [1]_ - Elephant/Viziphant [2]_ +Algorithm +######### + +- PyWavelets [3]_ + --------------- .. [1] Garcia S., Guarino D., Jaillet F., Jennings T.R., Pröpper R., Rautenberg P.L., Rodgers C., Sobolev A.,Wachtler T., Yger P. and Davison A.P. (2014) Neo: an object model for handling electrophysiology data in multiple formats. Frontiers in Neuroinformatics 8:10: doi:10.3389/fninf.2014.00010 .. [2] Denker M, Yegenoglu A, Grün S (2018) Collaborative HPC-enabled workflows on the HBP Collaboratory using the Elephant framework. Neuroinformatics 2018, P19. doi:10.12751/incf.ni2018.0019 + +.. [3] Gregory R. Lee, Ralf Gommers, Filip Wasilewski, Kai Wohlfahrt, Aaron O’Leary (2019). PyWavelets: A Python package for wavelet analysis. Journal of Open Source Software, 4(36), 1237, https://doi.org/10.21105/joss.01237. diff --git a/requirements.txt b/requirements.txt index 85004cdd..b4745d26 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,7 @@ numpy>=1.19.2 omegaconf pandas Pillow +PyWavelets quantities scikit-learn scipy>=1.5.2 From 3b543161bf65e4b7a0e983ed327d7beb707bc3ac Mon Sep 17 00:00:00 2001 From: Seung Hyun Kim Date: Sun, 15 May 2022 21:40:19 -0500 Subject: [PATCH 6/7] update: resolve mypy --- miv/io/binary.py | 9 +++++---- miv/io/data.py | 10 +++++----- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/miv/io/binary.py b/miv/io/binary.py index 2fa14a83..e3edfb34 100644 --- a/miv/io/binary.py +++ b/miv/io/binary.py @@ -43,8 +43,8 @@ def apply_channel_mask(signal: np.ndarray, channel_mask: Set[int]): """ num_channels = signal.shape[1] - channel_index = set(range(num_channels)) - channel_mask - channel_index = np.array(np.sort(list(channel_index))) + channel_index_set = set(range(num_channels)) - channel_mask + channel_index = np.array(np.sort(list(channel_index_set))) signal = signal[:, channel_index] return signal @@ -102,7 +102,7 @@ def load_recording( """ - file_path: str = glob(os.path.join(folder, "**", "*.dat"), recursive=True) + file_path: List[str] = glob(os.path.join(folder, "**", "*.dat"), recursive=True) assert ( len(file_path) == 1 ), f"There should be only one 'continuous.dat' file. 
(There exists {file_path})" @@ -114,7 +114,8 @@ def load_recording( sampling_rate: float = info["continuous"][0]["sample_rate"] # channel_info: Dict[str, Any] = info["continuous"][0]["channels"] - signal, timestamps = load_continuous_data(file_path, num_channels, sampling_rate) + # TODO: maybe need to support multiple continuous.dat files + signal, timestamps = load_continuous_data(file_path[0], num_channels, sampling_rate) if channel_mask is not None: signal = apply_channel_mask(signal, channel_mask) diff --git a/miv/io/data.py b/miv/io/data.py index 8a1ee9b8..c6b56e3d 100644 --- a/miv/io/data.py +++ b/miv/io/data.py @@ -24,7 +24,7 @@ """ __all__ = ["Data", "DataManager"] -from typing import Any, Optional, Iterable, Callable, List +from typing import Any, Optional, Iterable, Callable, List, Set from collections.abc import MutableSequence import logging @@ -85,8 +85,8 @@ def __init__( self, data_path: str, ): - self.data_path = data_path - self.masking_channel_set = set() + self.data_path: str = data_path + self.masking_channel_set: Set[int] = set() @contextmanager def load(self): @@ -248,7 +248,7 @@ def get_experiments_recordings(data_paths: str) -> Iterable[str]: # fmt: off list_of_experiments_to_process = [] for path in data_paths: - path_list = [path for path in glob.glob(os.path.join(path, "*", "*", "*")) if "Record Node" in path and "recording" in path and os.path.isdir(path)] + path_list = [path for path in glob(os.path.join(path, "*", "*", "*")) if "Record Node" in path and "recording" in path and os.path.isdir(path)] list_of_experiments_to_process.extend(path_list) # fmt: on return list_of_experiments_to_process @@ -258,7 +258,7 @@ def get_analysis_paths(data_paths: str, output_folder_name: str) -> Iterable[str # fmt: off list_of_analysis_paths = [] for path in data_paths: - path_list = [path for path in glob.glob(os.path.join(path, "*", "*", "*", "*")) if ("Record Node" in path) and ("recording" in path) and (output_folder_name in path) and os.path.isdir(path)] + path_list = [path for path in glob(os.path.join(path, "*", "*", "*", "*")) if ("Record Node" in path) and ("recording" in path) and (output_folder_name in path) and os.path.isdir(path)] list_of_analysis_paths.extend(path_list) # fmt: on return list_of_analysis_paths From 5f03f9a635274a2178d16e746f2af48dc513ba3b Mon Sep 17 00:00:00 2001 From: Seung Hyun Kim Date: Sun, 15 May 2022 22:14:59 -0500 Subject: [PATCH 7/7] update: permission issue for window test --- tests/io/test_binary_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/io/test_binary_io.py b/tests/io/test_binary_io.py index 44a2f8b9..de5f1a62 100644 --- a/tests/io/test_binary_io.py +++ b/tests/io/test_binary_io.py @@ -44,7 +44,7 @@ def test_apply_channel_mask_functionality(signal, mask, solution): def test_oebin_read_functionality(): a = {"a": 1, "b": "string data", "c": True} - with tempfile.NamedTemporaryFile("w+t") as fp: + with tempfile.NamedTemporaryFile("w+t", delete=False) as fp: fp.writelines(str(a)) fp.seek(0) b = oebin_read(fp.name)
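For quick reference, below is a minimal usage sketch of the `miv.io.binary` functions touched and tested in this series (`oebin_read`, `load_recording`, `load_continuous_data`, `apply_channel_mask`). It follows the signatures and return values visible in the diffs above; the recording paths, the `channel_mask=` keyword name, and the mask values are placeholder assumptions for illustration, not part of the patches.

```python
import os

from miv.io.binary import (
    apply_channel_mask,
    load_continuous_data,
    load_recording,
    oebin_read,
)

# Placeholder recording folder: it should contain exactly one "continuous.dat"
# somewhere below it plus a "structure.oebin", as load_recording asserts.
folder = "2022-03-10_16-19-09/Record Node 104/experiment1/recording1"

# structure.oebin is stored as a Python-literal dict; oebin_read parses it the
# same way load_recording does internally.
info = oebin_read(os.path.join(folder, "structure.oebin"))
num_channels = info["continuous"][0]["num_channels"]
sampling_rate = info["continuous"][0]["sample_rate"]

# High-level load: channels listed in the mask are dropped from the signal
# (keyword name assumed from the function body shown above).
signal, timestamps, rate = load_recording(folder, channel_mask={0, 7})

# Low-level load: read a raw continuous.dat directly, passing the channel
# count and sampling rate positionally as the new tests do. Returns
# (timestamps, raw_data).
timestamps, raw = load_continuous_data(
    os.path.join(folder, "continuous", "Rhythm_FPGA-100.0", "continuous.dat"),
    num_channels,
    sampling_rate,
)

# apply_channel_mask on its own: masked channel indices (counted from 0) are
# removed and the remaining columns keep their original order.
masked = apply_channel_mask(signal, {1})
```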