From ccb9c1716d1f501a7fdcd8b5edb00419a1ae1ebd Mon Sep 17 00:00:00 2001 From: Martin Luessi Date: Thu, 13 Oct 2011 13:33:27 -0400 Subject: [PATCH 1/6] wip on overlap add --- mne/filter.py | 95 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 86 insertions(+), 9 deletions(-) diff --git a/mne/filter.py b/mne/filter.py index c2f4d73bdf9..b492e3cd754 100644 --- a/mne/filter.py +++ b/mne/filter.py @@ -3,7 +3,70 @@ from scipy.fftpack import fft, ifft -def band_pass_filter(x, Fs, Fp1, Fp2): +def overlap_add_filter(x, h, N_fft=None): + """ Calculate linear convolution using overlap-add FFTs + + Implements the linear convolution between x and h using overlap-add FFTs. + + Parameters + ---------- + x : 1d array + Signal to filter + h : 1d array + Filter impule response (FIR filter coefficients) + N_fft : int + Length of the FFT. If None, the best size is determined automatically. + """ + + # https://ccrma.stanford.edu/~jos/fp/Forward_Backward_Filtering.html + # http://www.mathworks.com/matlabcentral/fileexchange/17061-filtfilthd/content/filtfilthd.m + N_h = len(h) + N_x = len(x) + + if N_fft == None: + if N_x > N_h: + N = 2**np.arange(np.ceil(np.log2(N_h)), + np.floor(np.log2(N_x))) + cost = np.ceil(N_x / (N - N_h + 1)) * N * (np.log2(N) + 1) + N_fft = N[np.argmin(cost)] + else: + # Use only a single block + N_fft = 2**np.ceil(np.log2(N_x + N_h - 1)) + + print 'FFT length: %d' % (N_fft) + + N_seg = N_fft - N_h + 1 + + if N_fft <= 0: + raise ValueError('N_fft is too short, has to be at least len(h)') + + # Filter in frequency domain + # FIXME: abs? + h_fft = np.abs(fft(np.concatenate((h, np.zeros(N_fft - N_h))))) + + x_filtered = np.zeros_like(x) + + # Go through segments + num_segments = int(np.ceil(N_x / float(N_seg))) + + for seg_ind in range(num_segments): + seg = x[seg_ind*N_seg:(seg_ind+1)*N_seg] + seg_fft = fft(np.concatenate((seg, np.zeros(N_fft - len(seg))))) + + + if seg_ind*N_seg+N_fft < N_x: + x_filtered[seg_ind*N_seg:seg_ind*N_seg+N_fft] \ + += np.real(ifft(h_fft * seg_fft)) + else: + # Last segment + x_filtered[seg_ind*N_seg:] \ + += np.real(ifft(h_fft * seg_fft))[:N_x-seg_ind*N_seg] + + return x_filtered + + + +def band_pass_filter(x, Fs, Fp1, Fp2, ov_add): """Bandpass filter for the signal x. An acausal fft algorithm is applied (i.e. no phase shift). The filter @@ -64,14 +127,28 @@ def band_pass_filter(x, Fs, Fp1, Fp2): Np2 = Fp2 / (Fs / 2) # Construct the filter function H(f) - N = len(x) - - B = signal.firwin2(N, [0, Ns1, Np1, Np2, Ns2, 1], [0, 0, 1, 1, 0, 0]) - - # Make zero-phase filter function - H = np.abs(fft(B)) - - xf = np.real(ifft(fft(x) * H)) +# N = len(x) +# B = signal.firwin2(N, [0, Ns1, Np1, Np2, Ns2, 1], [0, 0, 1, 1, 0, 0]) +# +# # Make zero-phase filter function +# H = np.abs(fft(B)) +# xf = np.real(ifft(fft(x) * H)) +# + + if ov_add: + N = int(2 * Fs) + B = signal.firwin2(N, [0, Ns1, Np1, Np2, Ns2, 1], [0, 0, 1, 1, 0, 0]) + + xf = overlap_add_filter(x, B) + else: + N = int(2 * Fs) + B = signal.firwin2(N, [0, Ns1, Np1, Np2, Ns2, 1], [0, 0, 1, 1, 0, 0]) + + B = ifft(np.abs(fft(B))) + # Make zero-phase filter function + + xf = (signal.convolve(x, B, 'full')) + xf = xf[:Norig] x = x[:Norig] From e3a2b8a16293c902ed83d5f26765fb8bdddddac3 Mon Sep 17 00:00:00 2001 From: Martin Luessi Date: Fri, 14 Oct 2011 09:13:00 -0400 Subject: [PATCH 2/6] zero phase, not yet working correctly --- mne/filter.py | 91 ++++++++++++++++++++++++++++----------------------- 1 file changed, 50 insertions(+), 41 deletions(-) diff --git a/mne/filter.py b/mne/filter.py index b492e3cd754..90a88e00893 100644 --- a/mne/filter.py +++ b/mne/filter.py @@ -3,7 +3,7 @@ from scipy.fftpack import fft, ifft -def overlap_add_filter(x, h, N_fft=None): +def overlap_add_filter(x, h, N_fft=None, zero_phase=True): """ Calculate linear convolution using overlap-add FFTs Implements the linear convolution between x and h using overlap-add FFTs. @@ -25,9 +25,11 @@ def overlap_add_filter(x, h, N_fft=None): if N_fft == None: if N_x > N_h: + N_tot = 2*N_x if zero_phase else N_x + N = 2**np.arange(np.ceil(np.log2(N_h)), - np.floor(np.log2(N_x))) - cost = np.ceil(N_x / (N - N_h + 1)) * N * (np.log2(N) + 1) + np.floor(np.log2(N_tot))) + cost = np.ceil(N_tot / (N - N_h + 1)) * N * (np.log2(N) + 1) N_fft = N[np.argmin(cost)] else: # Use only a single block @@ -41,32 +43,53 @@ def overlap_add_filter(x, h, N_fft=None): raise ValueError('N_fft is too short, has to be at least len(h)') # Filter in frequency domain - # FIXME: abs? - h_fft = np.abs(fft(np.concatenate((h, np.zeros(N_fft - N_h))))) - + h_fft = fft(np.concatenate((h, np.zeros(N_fft - N_h)))) + + if zero_phase: + # We will apply the filter in forward and backward direction: Scale + # frequency response of the filter so that the shape of the amplitude + # response stays the same when it is applied twice + + # be careful not to divide by too small numbers + #idx = np.where(np.abs(h_fft) > 1e-6) + #h_fft[idx] = h_fft[idx] / np.sqrt(np.abs(h_fft[idx])) + pass + x_filtered = np.zeros_like(x) - # Go through segments + # Number of segements (including fractional segments) num_segments = int(np.ceil(N_x / float(N_seg))) + + filter_input = x - for seg_ind in range(num_segments): - seg = x[seg_ind*N_seg:(seg_ind+1)*N_seg] - seg_fft = fft(np.concatenate((seg, np.zeros(N_fft - len(seg))))) - - - if seg_ind*N_seg+N_fft < N_x: - x_filtered[seg_ind*N_seg:seg_ind*N_seg+N_fft] \ - += np.real(ifft(h_fft * seg_fft)) - else: - # Last segment - x_filtered[seg_ind*N_seg:] \ - += np.real(ifft(h_fft * seg_fft))[:N_x-seg_ind*N_seg] - + for pass_no in range(2) if zero_phase else range(1): + + if pass_no == 1: + # second pass: flip signal + filter_input = x_filtered[::-1] + x_filtered = np.zeros_like(x) + + for seg_ind in range(num_segments): + seg = filter_input[seg_ind*N_seg:(seg_ind+1)*N_seg] + seg_fft = fft(np.concatenate((seg, np.zeros(N_fft - len(seg))))) + + + if seg_ind*N_seg+N_fft < N_x: + x_filtered[seg_ind*N_seg:seg_ind*N_seg+N_fft] \ + += np.real(ifft(h_fft * seg_fft)) + else: + # Last segment + x_filtered[seg_ind*N_seg:] \ + += np.real(ifft(h_fft * seg_fft))[:N_x-seg_ind*N_seg] + if zero_phase: + # flip signal back + x_filtered = x_filtered[::-1] + return x_filtered -def band_pass_filter(x, Fs, Fp1, Fp2, ov_add): +def band_pass_filter(x, Fs, Fp1, Fp2): """Bandpass filter for the signal x. An acausal fft algorithm is applied (i.e. no phase shift). The filter @@ -127,28 +150,14 @@ def band_pass_filter(x, Fs, Fp1, Fp2, ov_add): Np2 = Fp2 / (Fs / 2) # Construct the filter function H(f) -# N = len(x) -# B = signal.firwin2(N, [0, Ns1, Np1, Np2, Ns2, 1], [0, 0, 1, 1, 0, 0]) -# -# # Make zero-phase filter function -# H = np.abs(fft(B)) -# xf = np.real(ifft(fft(x) * H)) -# - - if ov_add: - N = int(2 * Fs) - B = signal.firwin2(N, [0, Ns1, Np1, Np2, Ns2, 1], [0, 0, 1, 1, 0, 0]) + N = len(x) - xf = overlap_add_filter(x, B) - else: - N = int(2 * Fs) - B = signal.firwin2(N, [0, Ns1, Np1, Np2, Ns2, 1], [0, 0, 1, 1, 0, 0]) + B = signal.firwin2(N, [0, Ns1, Np1, Np2, Ns2, 1], [0, 0, 1, 1, 0, 0]) - B = ifft(np.abs(fft(B))) - # Make zero-phase filter function - - xf = (signal.convolve(x, B, 'full')) - + # Make zero-phase filter function + H = np.abs(fft(B)) + + xf = np.real(ifft(fft(x) * H)) xf = xf[:Norig] x = x[:Norig] From 5e2766fd27935ad4a10a5eef9d241245cfdbdbc9 Mon Sep 17 00:00:00 2001 From: Martin Luessi Date: Fri, 14 Oct 2011 17:06:46 -0400 Subject: [PATCH 3/6] zero-phase using overlap-add working, code needs cleanup --- mne/filter.py | 149 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 123 insertions(+), 26 deletions(-) diff --git a/mne/filter.py b/mne/filter.py index 90a88e00893..7f6f171c0e6 100644 --- a/mne/filter.py +++ b/mne/filter.py @@ -3,7 +3,7 @@ from scipy.fftpack import fft, ifft -def overlap_add_filter(x, h, N_fft=None, zero_phase=True): +def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): """ Calculate linear convolution using overlap-add FFTs Implements the linear convolution between x and h using overlap-add FFTs. @@ -17,16 +17,24 @@ def overlap_add_filter(x, h, N_fft=None, zero_phase=True): N_fft : int Length of the FFT. If None, the best size is determined automatically. """ - - # https://ccrma.stanford.edu/~jos/fp/Forward_Backward_Filtering.html - # http://www.mathworks.com/matlabcentral/fileexchange/17061-filtfilthd/content/filtfilthd.m + N_h = len(h) - N_x = len(x) + + # Extend the signal at the edges (see scipy.signal.filtfitlt) + #if N_h < 3 * len(x): + edge = 3 * N_h + + x_ext = np.r_[2*x[0]-x[edge-1:0:-1], x, 2*x[-1]-x[-2:-edge-1:-1]] + + print 'length x_ext: %d edge: %d' % (len(x_ext), edge) + + N_x = len(x_ext) + if N_fft == None: if N_x > N_h: N_tot = 2*N_x if zero_phase else N_x - + N = 2**np.arange(np.ceil(np.log2(N_h)), np.floor(np.log2(N_tot))) cost = np.ceil(N_tot / (N - N_h + 1)) * N * (np.log2(N) + 1) @@ -34,45 +42,43 @@ def overlap_add_filter(x, h, N_fft=None, zero_phase=True): else: # Use only a single block N_fft = 2**np.ceil(np.log2(N_x + N_h - 1)) - + print 'FFT length: %d' % (N_fft) - + N_seg = N_fft - N_h + 1 if N_fft <= 0: raise ValueError('N_fft is too short, has to be at least len(h)') # Filter in frequency domain - h_fft = fft(np.concatenate((h, np.zeros(N_fft - N_h)))) - + h_fft = fft(np.r_[h, np.zeros(N_fft - N_h)]) + if zero_phase: # We will apply the filter in forward and backward direction: Scale # frequency response of the filter so that the shape of the amplitude # response stays the same when it is applied twice # be careful not to divide by too small numbers - #idx = np.where(np.abs(h_fft) > 1e-6) - #h_fft[idx] = h_fft[idx] / np.sqrt(np.abs(h_fft[idx])) - pass - - x_filtered = np.zeros_like(x) + idx = np.where(np.abs(h_fft) > 1e-6) + h_fft[idx] = h_fft[idx] / np.sqrt(np.abs(h_fft[idx])) + + x_filtered = np.zeros_like(x_ext) # Number of segements (including fractional segments) num_segments = int(np.ceil(N_x / float(N_seg))) - - filter_input = x + + filter_input = x_ext for pass_no in range(2) if zero_phase else range(1): - + if pass_no == 1: # second pass: flip signal - filter_input = x_filtered[::-1] - x_filtered = np.zeros_like(x) - + filter_input = np.flipud(x_filtered) + x_filtered = np.zeros_like(x_ext) + for seg_ind in range(num_segments): seg = filter_input[seg_ind*N_seg:(seg_ind+1)*N_seg] - seg_fft = fft(np.concatenate((seg, np.zeros(N_fft - len(seg))))) - + seg_fft = fft(np.r_[seg, np.zeros(N_fft - len(seg))]) if seg_ind*N_seg+N_fft < N_x: x_filtered[seg_ind*N_seg:seg_ind*N_seg+N_fft] \ @@ -81,13 +87,104 @@ def overlap_add_filter(x, h, N_fft=None, zero_phase=True): # Last segment x_filtered[seg_ind*N_seg:] \ += np.real(ifft(h_fft * seg_fft))[:N_x-seg_ind*N_seg] + + # Remove edges that we added + x_filtered = x_filtered[edge-1:-edge+1] + if zero_phase: # flip signal back - x_filtered = x_filtered[::-1] - + x_filtered = np.flipud(x_filtered) + return x_filtered - + +def _filter(x, Fs, freq, gain): + + assert x.ndim == 1 + + # normalize frequencies + freq = [f / (Fs / 2) for f in freq] + + if len(x) < 10*Fs: + # TODO: how to decide which filter to use + + # Short signal: use direct FFT filter + + # Make x EVEN + Norig = len(x) + if Norig % 2 == 1: + x = np.r_[x, 1] + N = len(x) + + H = signal.firwin2(N, freq, gain) + + # Make zero-phase filter function + B = np.abs(fft(H)) + + xf = np.real(ifft(fft(x) * B)) + xf = xf[:Norig] + x = x[:Norig] + else: + # Use overlap-add filter + N = int(10 * Fs) + H = signal.firwin2(N, freq, gain) + xf = _overlap_add_filter(x, H, zero_phase=True) + + return xf + + +def new_band_pass_filter(x, Fs, Fp1, Fp2): + """Bandpass filter for the signal x. + + An acausal fft algorithm is applied (i.e. no phase shift). The filter + functions is constructed from a Hamming window (window used in "firwin2" + function) to avoid ripples in the frequency reponse (windowing is a + smoothing in frequency domain) + + Parameters + ---------- + x : 1d array + Signal to filter + Fs : float + sampling rate + Fp1 : float + low cut-off frequency + Fp2 : float + high cut-off frequency + + Returns + ------- + xf : array + x filtered + + Notes + ----- + The passbands (Fp1 Fp2) frequencies are defined in Hz as + ---------- + /| | \ + / | | \ + / | | \ + / | | \ + ---------- | | ----------------- + | | + Fs1 Fp1 Fp2 Fs2 + + DEFAULTS values + Fs1 = Fp1 - 0.5 in Hz + Fs2 = Fp2 + 0.5 in Hz + """ + Fs = float(Fs) + Fp1 = float(Fp1) + Fp2 = float(Fp2) + + # Default values in Hz + Fs1 = Fp1 - 0.5 + Fs2 = Fp2 + 0.5 + + xf = _filter(x, Fs, [0, Fs1, Fp1, Fp2, Fs2, Fs/2], [0, 0, 1, 1, 0, 0]) + + return xf + def band_pass_filter(x, Fs, Fp1, Fp2): """Bandpass filter for the signal x. From 02baee5e69fa838e5617b7982768e5322a310c1b Mon Sep 17 00:00:00 2001 From: Martin Luessi Date: Tue, 25 Oct 2011 17:23:08 -0400 Subject: [PATCH 4/6] All filters using overlap-add --- mne/filter.py | 222 +++++++++++++++----------------------------------- 1 file changed, 67 insertions(+), 155 deletions(-) diff --git a/mne/filter.py b/mne/filter.py index 7f6f171c0e6..c37e18d10f9 100644 --- a/mne/filter.py +++ b/mne/filter.py @@ -4,9 +4,12 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): - """ Calculate linear convolution using overlap-add FFTs + """ Filter using overlap-add FFTs. - Implements the linear convolution between x and h using overlap-add FFTs. + Filters the signal x using a filter with the impulse response h. + If zero_phase==True, the amplitude response is scaled and the filter is + applied in forward and backward direction, resulting in a zero-phase + filter. Parameters ---------- @@ -16,21 +19,27 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): Filter impule response (FIR filter coefficients) N_fft : int Length of the FFT. If None, the best size is determined automatically. + zero_phase : boolean + If True: the filter is applied in forward and backward direction, + resulting in a zero-phase filter + + Returns + ------- + xf : 1d array + x filtered """ N_h = len(h) + + # Extend the signal by mirroring the edges to reduce transient filter + # response + N_edge = min(N_h, len(x)) - # Extend the signal at the edges (see scipy.signal.filtfitlt) - #if N_h < 3 * len(x): + x_ext = np.r_[2*x[0]-x[N_edge-1:0:-1], x, 2*x[-1]-x[-2:-N_edge-1:-1]] - edge = 3 * N_h - - x_ext = np.r_[2*x[0]-x[edge-1:0:-1], x, 2*x[-1]-x[-2:-edge-1:-1]] - - print 'length x_ext: %d edge: %d' % (len(x_ext), edge) - N_x = len(x_ext) + # Determine FFT length to use if N_fft == None: if N_x > N_h: N_tot = 2*N_x if zero_phase else N_x @@ -43,10 +52,6 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): # Use only a single block N_fft = 2**np.ceil(np.log2(N_x + N_h - 1)) - print 'FFT length: %d' % (N_fft) - - N_seg = N_fft - N_h + 1 - if N_fft <= 0: raise ValueError('N_fft is too short, has to be at least len(h)') @@ -64,8 +69,11 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): x_filtered = np.zeros_like(x_ext) + # Segment length for signal x + N_seg = N_fft - N_h + 1 + # Number of segements (including fractional segments) - num_segments = int(np.ceil(N_x / float(N_seg))) + num_segments = np.ceil(N_x / float(N_seg)) filter_input = x_ext @@ -88,8 +96,8 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): x_filtered[seg_ind*N_seg:] \ += np.real(ifft(h_fft * seg_fft))[:N_x-seg_ind*N_seg] - # Remove edges that we added - x_filtered = x_filtered[edge-1:-edge+1] + # Remove mirrored edges that we added + x_filtered = x_filtered[N_edge-1:-N_edge+1] if zero_phase: # flip signal back @@ -99,16 +107,39 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): def _filter(x, Fs, freq, gain): + """ Filter signal using gain control points in the frequency domain. + + The filter impulse response is constructed from a Hamming window (window + used in "firwin2" function) to avoid ripples in the frequency reponse + (windowing is a smoothing in frequency domain). + If the signal is shorter than 10 seconds, it is filtered directly using + FFTs (zero-phase). If the signal is longer, zero-phase overlap-add + filtering is used. + + Parameters + ---------- + x : 1d array + Signal to filter + Fs : float + Sampling rate in Hz + freq : 1d array + Frequency sampling points in Hz + gain : 1d array + Filter gain at frequency sampling points + + Returns + ------- + xf : 1d array + x filtered + """ assert x.ndim == 1 # normalize frequencies freq = [f / (Fs / 2) for f in freq] if len(x) < 10*Fs: - # TODO: how to decide which filter to use - - # Short signal: use direct FFT filter + # Use direct FFT filtering for short signals # Make x EVEN Norig = len(x) @@ -133,14 +164,11 @@ def _filter(x, Fs, freq, gain): return xf -def new_band_pass_filter(x, Fs, Fp1, Fp2): +def band_pass_filter(x, Fs, Fp1, Fp2): """Bandpass filter for the signal x. - An acausal fft algorithm is applied (i.e. no phase shift). The filter - functions is constructed from a Hamming window (window used in "firwin2" - function) to avoid ripples in the frequency reponse (windowing is a - smoothing in frequency domain) - + Applies a zero-phase bandpass filter to the signal x. + Parameters ---------- x : 1d array @@ -186,88 +214,10 @@ def new_band_pass_filter(x, Fs, Fp1, Fp2): return xf -def band_pass_filter(x, Fs, Fp1, Fp2): - """Bandpass filter for the signal x. - - An acausal fft algorithm is applied (i.e. no phase shift). The filter - functions is constructed from a Hamming window (window used in "firwin2" - function) to avoid ripples in the frequency reponse (windowing is a - smoothing in frequency domain) - - Parameters - ---------- - x : 1d array - Signal to filter - Fs : float - sampling rate - Fp1 : float - low cut-off frequency - Fp2 : float - high cut-off frequency - - Returns - ------- - xf : array - x filtered - - Notes - ----- - The passbands (Fp1 Fp2) frequencies are defined in Hz as - ---------- - /| | \ - / | | \ - / | | \ - / | | \ - ---------- | | ----------------- - | | - Fs1 Fp1 Fp2 Fs2 - - DEFAULTS values - Fs1 = Fp1 - 0.5 in Hz - Fs2 = Fp2 + 0.5 in Hz - """ - Fp1 = float(Fp1) - Fp2 = float(Fp2) - - # Default values in Hz - Fs1 = Fp1 - 0.5 - Fs2 = Fp2 + 0.5 - - assert x.ndim == 1 - - # Make x EVEN - Norig = len(x) - if Norig % 2 == 1: - x = np.r_[x, 1] - - # Normalize frequencies - Ns1 = Fs1 / (Fs / 2) - Ns2 = Fs2 / (Fs / 2) - Np1 = Fp1 / (Fs / 2) - Np2 = Fp2 / (Fs / 2) - - # Construct the filter function H(f) - N = len(x) - - B = signal.firwin2(N, [0, Ns1, Np1, Np2, Ns2, 1], [0, 0, 1, 1, 0, 0]) - - # Make zero-phase filter function - H = np.abs(fft(B)) - - xf = np.real(ifft(fft(x) * H)) - xf = xf[:Norig] - x = x[:Norig] - - return xf - - def low_pass_filter(x, Fs, Fp): """Lowpass filter for the signal x. - An acausal fft algorithm is applied (i.e. no phase shift). The filter - functions is constructed from a Hamming window (window used in "firwin2" - function) to avoid ripples in the frequency reponse (windowing is a - smoothing in frequency domain) + Applies a zero-phase lowpass filter to the signal x. Parameters ---------- @@ -296,41 +246,20 @@ def low_pass_filter(x, Fs, Fp): Fp Fp+0.5 """ + Fs = float(Fs) Fp = float(Fp) - - assert x.ndim == 1 - - # Make x EVEN - Norig = len(x) - if Norig % 2 == 1: - x = np.r_[x, 1] - - # Normalize frequencies - Ns = (Fp + 0.5) / (Fs / 2) - Np = Fp / (Fs / 2) - - # Construct the filter function H(f) - N = len(x) - - B = signal.firwin2(N, [0, Np, Ns, 1], [1, 1, 0, 0]) - - # Make zero-phase filter function - H = np.abs(fft(B)) - - xf = np.real(ifft(fft(x) * H)) - xf = xf[:Norig] - x = x[:Norig] - + + Fstop = Fp + 0.5 + + xf = _filter(x, Fs, [0, Fp, Fstop, Fs/2], [1, 1, 0, 0]) + return xf def high_pass_filter(x, Fs, Fp): """Highpass filter for the signal x. - An acausal fft algorithm is applied (i.e. no phase shift). The filter - functions is constructed from a Hamming window (window used in "firwin2" - function) to avoid ripples in the frequency reponse (windowing is a - smoothing in frequency domain) + Applies a zero-phase highpass filter to the signal x. Parameters ---------- @@ -359,29 +288,12 @@ def high_pass_filter(x, Fs, Fp): Fp-0.5 Fp """ + + Fs = float(Fs) Fp = float(Fp) + + Fstop = Fp - 0.5 - assert x.ndim == 1 - - # Make x ODD - Norig = len(x) - if Norig % 2 == 0: - x = np.r_[x, 1] - - # Normalize frequencies - Ns = (Fp - 0.5) / (Fs / 2) - Np = Fp / (Fs / 2) - - # Construct the filter function H(f) - N = len(x) - - B = signal.firwin2(N, [0, Ns, Np, 1], [0, 0, 1, 1]) - - # Make zero-phase filter function - H = np.abs(fft(B)) - - xf = np.real(ifft(fft(x) * H)) - xf = xf[:Norig] - x = x[:Norig] + xf = _filter(x, Fs, [0, Fstop, Fp, Fs/2], [0, 0, 1, 1]) return xf From 4deab5f59f95ba0b0ed9984bc04ff62511004e61 Mon Sep 17 00:00:00 2001 From: Martin Luessi Date: Wed, 26 Oct 2011 10:33:59 -0400 Subject: [PATCH 5/6] ENH: overlap-add filtering for long signals --- mne/filter.py | 69 +++++++++++++++++++++++----------------- mne/tests/test_filter.py | 14 ++++---- 2 files changed, 47 insertions(+), 36 deletions(-) diff --git a/mne/filter.py b/mne/filter.py index c37e18d10f9..d91c5c8e009 100644 --- a/mne/filter.py +++ b/mne/filter.py @@ -9,7 +9,7 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): Filters the signal x using a filter with the impulse response h. If zero_phase==True, the amplitude response is scaled and the filter is applied in forward and backward direction, resulting in a zero-phase - filter. + filter. Parameters ---------- @@ -30,13 +30,13 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): """ N_h = len(h) - + # Extend the signal by mirroring the edges to reduce transient filter # response N_edge = min(N_h, len(x)) x_ext = np.r_[2*x[0]-x[N_edge-1:0:-1], x, 2*x[-1]-x[-2:-N_edge-1:-1]] - + N_x = len(x_ext) # Determine FFT length to use @@ -44,9 +44,9 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): if N_x > N_h: N_tot = 2*N_x if zero_phase else N_x - N = 2**np.arange(np.ceil(np.log2(N_h)), + N = 2**np.arange(np.ceil(np.log2(N_h)), np.floor(np.log2(N_tot))) - cost = np.ceil(N_tot / (N - N_h + 1)) * N * (np.log2(N) + 1) + cost = np.ceil(N_tot / (N - N_h + 1)) * N * (np.log2(N) + 1) N_fft = N[np.argmin(cost)] else: # Use only a single block @@ -55,14 +55,14 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): if N_fft <= 0: raise ValueError('N_fft is too short, has to be at least len(h)') - # Filter in frequency domain + # Filter in frequency domain h_fft = fft(np.r_[h, np.zeros(N_fft - N_h)]) if zero_phase: # We will apply the filter in forward and backward direction: Scale # frequency response of the filter so that the shape of the amplitude # response stays the same when it is applied twice - + # be careful not to divide by too small numbers idx = np.where(np.abs(h_fft) > 1e-6) h_fft[idx] = h_fft[idx] / np.sqrt(np.abs(h_fft[idx])) @@ -73,7 +73,7 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): N_seg = N_fft - N_h + 1 # Number of segements (including fractional segments) - num_segments = np.ceil(N_x / float(N_seg)) + num_segments = int(np.ceil(N_x / float(N_seg))) filter_input = x_ext @@ -87,10 +87,10 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): for seg_ind in range(num_segments): seg = filter_input[seg_ind*N_seg:(seg_ind+1)*N_seg] seg_fft = fft(np.r_[seg, np.zeros(N_fft - len(seg))]) - - if seg_ind*N_seg+N_fft < N_x: + + if seg_ind*N_seg+N_fft < N_x: x_filtered[seg_ind*N_seg:seg_ind*N_seg+N_fft] \ - += np.real(ifft(h_fft * seg_fft)) + += np.real(ifft(h_fft * seg_fft)) else: # Last segment x_filtered[seg_ind*N_seg:] \ @@ -103,18 +103,18 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): # flip signal back x_filtered = np.flipud(x_filtered) - return x_filtered + return x_filtered + - def _filter(x, Fs, freq, gain): """ Filter signal using gain control points in the frequency domain. - The filter impulse response is constructed from a Hamming window (window - used in "firwin2" function) to avoid ripples in the frequency reponse - (windowing is a smoothing in frequency domain). + The filter impulse response is constructed from a Hamming window (window + used in "firwin2" function) to avoid ripples in the frequency reponse + (windowing is a smoothing in frequency domain). - If the signal is shorter than 10 seconds, it is filtered directly using - FFTs (zero-phase). If the signal is longer, zero-phase overlap-add + If the signal is shorter than 1 minute, it is filtered directly using + FFTs (zero-phase). If the signal is longer, zero-phase overlap-add filtering is used. Parameters @@ -138,13 +138,16 @@ def _filter(x, Fs, freq, gain): # normalize frequencies freq = [f / (Fs / 2) for f in freq] - if len(x) < 10*Fs: + if len(x) < 60*Fs: # Use direct FFT filtering for short signals - # Make x EVEN Norig = len(x) - if Norig % 2 == 1: - x = np.r_[x, 1] + + if (gain[-1] == 0.0 and Norig % 2 == 1) \ + or (gain[-1] == 1.0 and Norig % 2 != 1): + # Gain at Nyquist freq: 1: make x EVEN, 0: make x ODD + x = np.r_[x, x[-1]] + N = len(x) H = signal.firwin2(N, freq, gain) @@ -158,6 +161,12 @@ def _filter(x, Fs, freq, gain): else: # Use overlap-add filter N = int(10 * Fs) + + if (gain[-1] == 0.0 and N % 2 == 1) \ + or (gain[-1] == 1.0 and N % 2 != 1): + # Gain at Nyquist freq: 1: make N EVEN, 0: make N ODD + N += 1 + H = signal.firwin2(N, freq, gain) xf = _overlap_add_filter(x, H, zero_phase=True) @@ -167,8 +176,8 @@ def _filter(x, Fs, freq, gain): def band_pass_filter(x, Fs, Fp1, Fp2): """Bandpass filter for the signal x. - Applies a zero-phase bandpass filter to the signal x. - + Applies a zero-phase bandpass filter to the signal x. + Parameters ---------- x : 1d array @@ -217,7 +226,7 @@ def band_pass_filter(x, Fs, Fp1, Fp2): def low_pass_filter(x, Fs, Fp): """Lowpass filter for the signal x. - Applies a zero-phase lowpass filter to the signal x. + Applies a zero-phase lowpass filter to the signal x. Parameters ---------- @@ -248,11 +257,11 @@ def low_pass_filter(x, Fs, Fp): """ Fs = float(Fs) Fp = float(Fp) - + Fstop = Fp + 0.5 - + xf = _filter(x, Fs, [0, Fp, Fstop, Fs/2], [1, 1, 0, 0]) - + return xf @@ -288,10 +297,10 @@ def high_pass_filter(x, Fs, Fp): Fp-0.5 Fp """ - + Fs = float(Fs) Fp = float(Fp) - + Fstop = Fp - 0.5 xf = _filter(x, Fs, [0, Fstop, Fp, Fs/2], [0, 0, 1, 1]) diff --git a/mne/tests/test_filter.py b/mne/tests/test_filter.py index 98f5ff1736f..c071bba83da 100644 --- a/mne/tests/test_filter.py +++ b/mne/tests/test_filter.py @@ -4,9 +4,11 @@ from ..filter import band_pass_filter, high_pass_filter, low_pass_filter def test_filters(): - a = np.random.randn(1000) - Fs = 1000 - bp = band_pass_filter(a, Fs, 4, 8) - lp = low_pass_filter(a, Fs, 8) - hp = high_pass_filter(lp, Fs, 4) - assert_array_almost_equal(hp, bp, 2) + Fs = 250 + # Test short and long signals (direct FFT and overlap-add FFT filtering) + for sig_len_secs in [10, 90]: + a = np.random.randn(sig_len_secs * Fs) + bp = band_pass_filter(a, Fs, 4, 8) + lp = low_pass_filter(a, Fs, 8) + hp = high_pass_filter(lp, Fs, 4) + assert_array_almost_equal(hp, bp, 2) From 67922cd5b1074d161ff8714a0c0645f65956f6a4 Mon Sep 17 00:00:00 2001 From: Martin Luessi Date: Thu, 10 Nov 2011 15:18:09 -0500 Subject: [PATCH 6/6] ENH: added filter_length parameter, improved test, cleaned up code --- mne/filter.py | 109 ++++++++++++++++++++++----------------- mne/tests/test_filter.py | 33 +++++++++--- 2 files changed, 86 insertions(+), 56 deletions(-) diff --git a/mne/filter.py b/mne/filter.py index d91c5c8e009..1b50fe05596 100644 --- a/mne/filter.py +++ b/mne/filter.py @@ -3,11 +3,11 @@ from scipy.fftpack import fft, ifft -def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): +def _overlap_add_filter(x, h, n_fft=None, zero_phase=True): """ Filter using overlap-add FFTs. Filters the signal x using a filter with the impulse response h. - If zero_phase==True, the amplitude response is scaled and the filter is + If zero_phase==True, the amplitude response is scaled and the filter is applied in forward and backward direction, resulting in a zero-phase filter. @@ -17,9 +17,9 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): Signal to filter h : 1d array Filter impule response (FIR filter coefficients) - N_fft : int + n_fft : int Length of the FFT. If None, the best size is determined automatically. - zero_phase : boolean + zero_phase : bool If True: the filter is applied in forward and backward direction, resulting in a zero-phase filter @@ -28,38 +28,38 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): xf : 1d array x filtered """ - - N_h = len(h) + n_h = len(h) # Extend the signal by mirroring the edges to reduce transient filter # response - N_edge = min(N_h, len(x)) + n_edge = min(n_h, len(x)) - x_ext = np.r_[2*x[0]-x[N_edge-1:0:-1], x, 2*x[-1]-x[-2:-N_edge-1:-1]] + x_ext = np.r_[2 * x[0] - x[n_edge - 1:0:-1],\ + x, 2 * x[-1] - x[-2:-n_edge - 1:-1]] - N_x = len(x_ext) + n_x = len(x_ext) # Determine FFT length to use - if N_fft == None: - if N_x > N_h: - N_tot = 2*N_x if zero_phase else N_x - - N = 2**np.arange(np.ceil(np.log2(N_h)), - np.floor(np.log2(N_tot))) - cost = np.ceil(N_tot / (N - N_h + 1)) * N * (np.log2(N) + 1) - N_fft = N[np.argmin(cost)] + if n_fft == None: + if n_x > n_h: + n_tot = 2 * n_x if zero_phase else n_x + + N = 2 ** np.arange(np.ceil(np.log2(n_h)), + np.floor(np.log2(n_tot))) + cost = np.ceil(n_tot / (N - n_h + 1)) * N * (np.log2(N) + 1) + n_fft = N[np.argmin(cost)] else: # Use only a single block - N_fft = 2**np.ceil(np.log2(N_x + N_h - 1)) + n_fft = 2 ** np.ceil(np.log2(n_x + n_h - 1)) - if N_fft <= 0: + if n_fft <= 0: raise ValueError('N_fft is too short, has to be at least len(h)') # Filter in frequency domain - h_fft = fft(np.r_[h, np.zeros(N_fft - N_h)]) + h_fft = fft(np.r_[h, np.zeros(n_fft - n_h)]) if zero_phase: - # We will apply the filter in forward and backward direction: Scale + # We will apply the filter in forward and backward direction: Scale # frequency response of the filter so that the shape of the amplitude # response stays the same when it is applied twice @@ -70,10 +70,10 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): x_filtered = np.zeros_like(x_ext) # Segment length for signal x - N_seg = N_fft - N_h + 1 + n_seg = n_fft - n_h + 1 # Number of segements (including fractional segments) - num_segments = int(np.ceil(N_x / float(N_seg))) + n_segments = int(np.ceil(n_x / float(n_seg))) filter_input = x_ext @@ -84,20 +84,20 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): filter_input = np.flipud(x_filtered) x_filtered = np.zeros_like(x_ext) - for seg_ind in range(num_segments): - seg = filter_input[seg_ind*N_seg:(seg_ind+1)*N_seg] - seg_fft = fft(np.r_[seg, np.zeros(N_fft - len(seg))]) + for seg_idx in range(n_segments): + seg = filter_input[seg_idx * n_seg:(seg_idx + 1) * n_seg] + seg_fft = fft(np.r_[seg, np.zeros(n_fft - len(seg))]) - if seg_ind*N_seg+N_fft < N_x: - x_filtered[seg_ind*N_seg:seg_ind*N_seg+N_fft] \ + if seg_idx * n_seg + n_fft < n_x: + x_filtered[seg_idx * n_seg:seg_idx * n_seg + n_fft]\ += np.real(ifft(h_fft * seg_fft)) else: # Last segment - x_filtered[seg_ind*N_seg:] \ - += np.real(ifft(h_fft * seg_fft))[:N_x-seg_ind*N_seg] + x_filtered[seg_idx * n_seg:] \ + += np.real(ifft(h_fft * seg_fft))[:n_x - seg_idx * n_seg] # Remove mirrored edges that we added - x_filtered = x_filtered[N_edge-1:-N_edge+1] + x_filtered = x_filtered[n_edge - 1:-n_edge + 1] if zero_phase: # flip signal back @@ -106,16 +106,12 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): return x_filtered -def _filter(x, Fs, freq, gain): +def _filter(x, Fs, freq, gain, filter_length=None): """ Filter signal using gain control points in the frequency domain. The filter impulse response is constructed from a Hamming window (window used in "firwin2" function) to avoid ripples in the frequency reponse - (windowing is a smoothing in frequency domain). - - If the signal is shorter than 1 minute, it is filtered directly using - FFTs (zero-phase). If the signal is longer, zero-phase overlap-add - filtering is used. + (windowing is a smoothing in frequency domain). The filter is zero-phase. Parameters ---------- @@ -127,6 +123,10 @@ def _filter(x, Fs, freq, gain): Frequency sampling points in Hz gain : 1d array Filter gain at frequency sampling points + filter_length : int (default: None) + Length of the filter to use. If None or "len(x) < filter_length", the + filter length used is len(x). Otherwise, overlap-add filtering with a + filter of the specified length is used (faster for long signals). Returns ------- @@ -138,7 +138,7 @@ def _filter(x, Fs, freq, gain): # normalize frequencies freq = [f / (Fs / 2) for f in freq] - if len(x) < 60*Fs: + if filter_length == None or len(x) <= filter_length: # Use direct FFT filtering for short signals Norig = len(x) @@ -159,8 +159,8 @@ def _filter(x, Fs, freq, gain): xf = xf[:Norig] x = x[:Norig] else: - # Use overlap-add filter - N = int(10 * Fs) + # Use overlap-add filter with a fixed length + N = filter_length if (gain[-1] == 0.0 and N % 2 == 1) \ or (gain[-1] == 1.0 and N % 2 != 1): @@ -173,7 +173,7 @@ def _filter(x, Fs, freq, gain): return xf -def band_pass_filter(x, Fs, Fp1, Fp2): +def band_pass_filter(x, Fs, Fp1, Fp2, filter_length=None): """Bandpass filter for the signal x. Applies a zero-phase bandpass filter to the signal x. @@ -188,6 +188,10 @@ def band_pass_filter(x, Fs, Fp1, Fp2): low cut-off frequency Fp2 : float high cut-off frequency + filter_length : int (default: None) + Length of the filter to use. If None or "len(x) < filter_length", the + filter length used is len(x). Otherwise, overlap-add filtering with a + filter of the specified length is used (faster for long signals). Returns ------- @@ -210,7 +214,7 @@ def band_pass_filter(x, Fs, Fp1, Fp2): Fs1 = Fp1 - 0.5 in Hz Fs2 = Fp2 + 0.5 in Hz """ - Fs = float(Fs) + Fs = float(Fs) Fp1 = float(Fp1) Fp2 = float(Fp2) @@ -218,12 +222,13 @@ def band_pass_filter(x, Fs, Fp1, Fp2): Fs1 = Fp1 - 0.5 Fs2 = Fp2 + 0.5 - xf = _filter(x, Fs, [0, Fs1, Fp1, Fp2, Fs2, Fs/2], [0, 0, 1, 1, 0, 0]) + xf = _filter(x, Fs, [0, Fs1, Fp1, Fp2, Fs2, Fs / 2], [0, 0, 1, 1, 0, 0], + filter_length) return xf -def low_pass_filter(x, Fs, Fp): +def low_pass_filter(x, Fs, Fp, filter_length=None): """Lowpass filter for the signal x. Applies a zero-phase lowpass filter to the signal x. @@ -236,6 +241,10 @@ def low_pass_filter(x, Fs, Fp): sampling rate Fp : float cut-off frequency + filter_length : int (default: None) + Length of the filter to use. If None or "len(x) < filter_length", the + filter length used is len(x). Otherwise, overlap-add filtering with a + filter of the specified length is used (faster for long signals). Returns ------- @@ -260,12 +269,12 @@ def low_pass_filter(x, Fs, Fp): Fstop = Fp + 0.5 - xf = _filter(x, Fs, [0, Fp, Fstop, Fs/2], [1, 1, 0, 0]) + xf = _filter(x, Fs, [0, Fp, Fstop, Fs / 2], [1, 1, 0, 0], filter_length) return xf -def high_pass_filter(x, Fs, Fp): +def high_pass_filter(x, Fs, Fp, filter_length=None): """Highpass filter for the signal x. Applies a zero-phase highpass filter to the signal x. @@ -278,6 +287,10 @@ def high_pass_filter(x, Fs, Fp): sampling rate Fp : float cut-off frequency + filter_length : int (default: None) + Length of the filter to use. If None or "len(x) < filter_length", the + filter length used is len(x). Otherwise, overlap-add filtering with a + filter of the specified length is used (faster for long signals). Returns ------- @@ -303,6 +316,6 @@ def high_pass_filter(x, Fs, Fp): Fstop = Fp - 0.5 - xf = _filter(x, Fs, [0, Fstop, Fp, Fs/2], [0, 0, 1, 1]) + xf = _filter(x, Fs, [0, Fstop, Fp, Fs / 2], [0, 0, 1, 1], filter_length) - return xf + return xf \ No newline at end of file diff --git a/mne/tests/test_filter.py b/mne/tests/test_filter.py index c071bba83da..83d5bab4578 100644 --- a/mne/tests/test_filter.py +++ b/mne/tests/test_filter.py @@ -3,12 +3,29 @@ from ..filter import band_pass_filter, high_pass_filter, low_pass_filter + def test_filters(): - Fs = 250 - # Test short and long signals (direct FFT and overlap-add FFT filtering) - for sig_len_secs in [10, 90]: - a = np.random.randn(sig_len_secs * Fs) - bp = band_pass_filter(a, Fs, 4, 8) - lp = low_pass_filter(a, Fs, 8) - hp = high_pass_filter(lp, Fs, 4) - assert_array_almost_equal(hp, bp, 2) + Fs = 500 + sig_len_secs = 60 + + # Filtering of short signals (filter length = len(a)) + a = np.random.randn(sig_len_secs * Fs) + bp = band_pass_filter(a, Fs, 4, 8) + lp = low_pass_filter(a, Fs, 8) + hp = high_pass_filter(lp, Fs, 4) + assert_array_almost_equal(hp, bp, 2) + + # Overlap-add filtering with a fixed filter length + filter_length = 8192 + bp_oa = band_pass_filter(a, Fs, 4, 8, filter_length) + lp_oa = low_pass_filter(a, Fs, 8, filter_length) + hp_oa = high_pass_filter(lp_oa, Fs, 4, filter_length) + assert_array_almost_equal(hp_oa, bp_oa, 2) + + # The two methods should give the same result + # As filtering for short signals uses a circular convolution (FFT) and + # the overlap-add filter implements a linear convolution, the signal + # boundary will be slightly different and we ignore it + n_edge_ignore = 1000 + assert_array_almost_equal(hp[n_edge_ignore:-n_edge_ignore], + hp_oa[n_edge_ignore:-n_edge_ignore], 2) \ No newline at end of file