Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Overlap-add filtering for long signals #10

Merged
merged 6 commits into from
Nov 12, 2011
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 199 additions & 82 deletions mne/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,180 @@
from scipy.fftpack import fft, ifft


def band_pass_filter(x, Fs, Fp1, Fp2):
def _overlap_add_filter(x, h, n_fft=None, zero_phase=True):
""" Filter using overlap-add FFTs.

Filters the signal x using a filter with the impulse response h.
If zero_phase==True, the amplitude response is scaled and the filter is
applied in forward and backward direction, resulting in a zero-phase
filter.

Parameters
----------
x : 1d array
Signal to filter
h : 1d array
Filter impule response (FIR filter coefficients)
n_fft : int
Length of the FFT. If None, the best size is determined automatically.
zero_phase : bool
If True: the filter is applied in forward and backward direction,
resulting in a zero-phase filter

Returns
-------
xf : 1d array
x filtered
"""
n_h = len(h)

# Extend the signal by mirroring the edges to reduce transient filter
# response
n_edge = min(n_h, len(x))

x_ext = np.r_[2 * x[0] - x[n_edge - 1:0:-1],\
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need the \ in parenthesis

x, 2 * x[-1] - x[-2:-n_edge - 1:-1]]

n_x = len(x_ext)

# Determine FFT length to use
if n_fft == None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"n_fft is None" is more standard

if n_x > n_h:
n_tot = 2 * n_x if zero_phase else n_x

N = 2 ** np.arange(np.ceil(np.log2(n_h)),
np.floor(np.log2(n_tot)))
cost = np.ceil(n_tot / (N - n_h + 1)) * N * (np.log2(N) + 1)
n_fft = N[np.argmin(cost)]
else:
# Use only a single block
n_fft = 2 ** np.ceil(np.log2(n_x + n_h - 1))

if n_fft <= 0:
raise ValueError('N_fft is too short, has to be at least len(h)')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe raise a warning also when n_fft is not a power of 2?


# Filter in frequency domain
h_fft = fft(np.r_[h, np.zeros(n_fft - n_h)])

if zero_phase:
# We will apply the filter in forward and backward direction: Scale
# frequency response of the filter so that the shape of the amplitude
# response stays the same when it is applied twice

# be careful not to divide by too small numbers
idx = np.where(np.abs(h_fft) > 1e-6)
h_fft[idx] = h_fft[idx] / np.sqrt(np.abs(h_fft[idx]))

x_filtered = np.zeros_like(x_ext)

# Segment length for signal x
n_seg = n_fft - n_h + 1

# Number of segements (including fractional segments)
n_segments = int(np.ceil(n_x / float(n_seg)))

filter_input = x_ext

for pass_no in range(2) if zero_phase else range(1):

if pass_no == 1:
# second pass: flip signal
filter_input = np.flipud(x_filtered)
x_filtered = np.zeros_like(x_ext)

for seg_idx in range(n_segments):
seg = filter_input[seg_idx * n_seg:(seg_idx + 1) * n_seg]
seg_fft = fft(np.r_[seg, np.zeros(n_fft - len(seg))])

if seg_idx * n_seg + n_fft < n_x:
x_filtered[seg_idx * n_seg:seg_idx * n_seg + n_fft]\
+= np.real(ifft(h_fft * seg_fft))
else:
# Last segment
x_filtered[seg_idx * n_seg:] \
+= np.real(ifft(h_fft * seg_fft))[:n_x - seg_idx * n_seg]

# Remove mirrored edges that we added
x_filtered = x_filtered[n_edge - 1:-n_edge + 1]

if zero_phase:
# flip signal back
x_filtered = np.flipud(x_filtered)

return x_filtered


def _filter(x, Fs, freq, gain, filter_length=None):
""" Filter signal using gain control points in the frequency domain.

The filter impulse response is constructed from a Hamming window (window
used in "firwin2" function) to avoid ripples in the frequency reponse
(windowing is a smoothing in frequency domain). The filter is zero-phase.

Parameters
----------
x : 1d array
Signal to filter
Fs : float
Sampling rate in Hz
freq : 1d array
Frequency sampling points in Hz
gain : 1d array
Filter gain at frequency sampling points
filter_length : int (default: None)
Length of the filter to use. If None or "len(x) < filter_length", the
filter length used is len(x). Otherwise, overlap-add filtering with a
filter of the specified length is used (faster for long signals).

Returns
-------
xf : 1d array
x filtered
"""
assert x.ndim == 1

# normalize frequencies
freq = [f / (Fs / 2) for f in freq]

if filter_length == None or len(x) <= filter_length:
# Use direct FFT filtering for short signals

Norig = len(x)

if (gain[-1] == 0.0 and Norig % 2 == 1) \
or (gain[-1] == 1.0 and Norig % 2 != 1):
# Gain at Nyquist freq: 1: make x EVEN, 0: make x ODD
x = np.r_[x, x[-1]]

N = len(x)

H = signal.firwin2(N, freq, gain)

# Make zero-phase filter function
B = np.abs(fft(H))

xf = np.real(ifft(fft(x) * B))
xf = xf[:Norig]
x = x[:Norig]
else:
# Use overlap-add filter with a fixed length
N = filter_length

if (gain[-1] == 0.0 and N % 2 == 1) \
or (gain[-1] == 1.0 and N % 2 != 1):
# Gain at Nyquist freq: 1: make N EVEN, 0: make N ODD
N += 1

H = signal.firwin2(N, freq, gain)
xf = _overlap_add_filter(x, H, zero_phase=True)

return xf


def band_pass_filter(x, Fs, Fp1, Fp2, filter_length=None):
"""Bandpass filter for the signal x.

An acausal fft algorithm is applied (i.e. no phase shift). The filter
functions is constructed from a Hamming window (window used in "firwin2"
function) to avoid ripples in the frequency reponse (windowing is a
smoothing in frequency domain)
Applies a zero-phase bandpass filter to the signal x.

Parameters
----------
Expand All @@ -21,6 +188,10 @@ def band_pass_filter(x, Fs, Fp1, Fp2):
low cut-off frequency
Fp2 : float
high cut-off frequency
filter_length : int (default: None)
Length of the filter to use. If None or "len(x) < filter_length", the
filter length used is len(x). Otherwise, overlap-add filtering with a
filter of the specified length is used (faster for long signals).

Returns
-------
Expand All @@ -43,48 +214,24 @@ def band_pass_filter(x, Fs, Fp1, Fp2):
Fs1 = Fp1 - 0.5 in Hz
Fs2 = Fp2 + 0.5 in Hz
"""
Fs = float(Fs)
Fp1 = float(Fp1)
Fp2 = float(Fp2)

# Default values in Hz
Fs1 = Fp1 - 0.5
Fs2 = Fp2 + 0.5

assert x.ndim == 1

# Make x EVEN
Norig = len(x)
if Norig % 2 == 1:
x = np.r_[x, 1]

# Normalize frequencies
Ns1 = Fs1 / (Fs / 2)
Ns2 = Fs2 / (Fs / 2)
Np1 = Fp1 / (Fs / 2)
Np2 = Fp2 / (Fs / 2)

# Construct the filter function H(f)
N = len(x)

B = signal.firwin2(N, [0, Ns1, Np1, Np2, Ns2, 1], [0, 0, 1, 1, 0, 0])

# Make zero-phase filter function
H = np.abs(fft(B))

xf = np.real(ifft(fft(x) * H))
xf = xf[:Norig]
x = x[:Norig]
xf = _filter(x, Fs, [0, Fs1, Fp1, Fp2, Fs2, Fs / 2], [0, 0, 1, 1, 0, 0],
filter_length)

return xf


def low_pass_filter(x, Fs, Fp):
def low_pass_filter(x, Fs, Fp, filter_length=None):
"""Lowpass filter for the signal x.

An acausal fft algorithm is applied (i.e. no phase shift). The filter
functions is constructed from a Hamming window (window used in "firwin2"
function) to avoid ripples in the frequency reponse (windowing is a
smoothing in frequency domain)
Applies a zero-phase lowpass filter to the signal x.

Parameters
----------
Expand All @@ -94,6 +241,10 @@ def low_pass_filter(x, Fs, Fp):
sampling rate
Fp : float
cut-off frequency
filter_length : int (default: None)
Length of the filter to use. If None or "len(x) < filter_length", the
filter length used is len(x). Otherwise, overlap-add filtering with a
filter of the specified length is used (faster for long signals).

Returns
-------
Expand All @@ -113,41 +264,20 @@ def low_pass_filter(x, Fs, Fp):
Fp Fp+0.5

"""
Fs = float(Fs)
Fp = float(Fp)

assert x.ndim == 1

# Make x EVEN
Norig = len(x)
if Norig % 2 == 1:
x = np.r_[x, 1]

# Normalize frequencies
Ns = (Fp + 0.5) / (Fs / 2)
Np = Fp / (Fs / 2)

# Construct the filter function H(f)
N = len(x)
Fstop = Fp + 0.5

B = signal.firwin2(N, [0, Np, Ns, 1], [1, 1, 0, 0])

# Make zero-phase filter function
H = np.abs(fft(B))

xf = np.real(ifft(fft(x) * H))
xf = xf[:Norig]
x = x[:Norig]
xf = _filter(x, Fs, [0, Fp, Fstop, Fs / 2], [1, 1, 0, 0], filter_length)

return xf


def high_pass_filter(x, Fs, Fp):
def high_pass_filter(x, Fs, Fp, filter_length=None):
"""Highpass filter for the signal x.

An acausal fft algorithm is applied (i.e. no phase shift). The filter
functions is constructed from a Hamming window (window used in "firwin2"
function) to avoid ripples in the frequency reponse (windowing is a
smoothing in frequency domain)
Applies a zero-phase highpass filter to the signal x.

Parameters
----------
Expand All @@ -157,6 +287,10 @@ def high_pass_filter(x, Fs, Fp):
sampling rate
Fp : float
cut-off frequency
filter_length : int (default: None)
Length of the filter to use. If None or "len(x) < filter_length", the
filter length used is len(x). Otherwise, overlap-add filtering with a
filter of the specified length is used (faster for long signals).

Returns
-------
Expand All @@ -176,29 +310,12 @@ def high_pass_filter(x, Fs, Fp):
Fp-0.5 Fp

"""
Fp = float(Fp)

assert x.ndim == 1

# Make x ODD
Norig = len(x)
if Norig % 2 == 0:
x = np.r_[x, 1]

# Normalize frequencies
Ns = (Fp - 0.5) / (Fs / 2)
Np = Fp / (Fs / 2)

# Construct the filter function H(f)
N = len(x)

B = signal.firwin2(N, [0, Ns, Np, 1], [0, 0, 1, 1])
Fs = float(Fs)
Fp = float(Fp)

# Make zero-phase filter function
H = np.abs(fft(B))
Fstop = Fp - 0.5

xf = np.real(ifft(fft(x) * H))
xf = xf[:Norig]
x = x[:Norig]
xf = _filter(x, Fs, [0, Fstop, Fp, Fs / 2], [0, 0, 1, 1], filter_length)

return xf
return xf
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newline

23 changes: 21 additions & 2 deletions mne/tests/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,29 @@

from ..filter import band_pass_filter, high_pass_filter, low_pass_filter


def test_filters():
a = np.random.randn(1000)
Fs = 1000
Fs = 500
sig_len_secs = 60

# Filtering of short signals (filter length = len(a))
a = np.random.randn(sig_len_secs * Fs)
bp = band_pass_filter(a, Fs, 4, 8)
lp = low_pass_filter(a, Fs, 8)
hp = high_pass_filter(lp, Fs, 4)
assert_array_almost_equal(hp, bp, 2)

# Overlap-add filtering with a fixed filter length
filter_length = 8192
bp_oa = band_pass_filter(a, Fs, 4, 8, filter_length)
lp_oa = low_pass_filter(a, Fs, 8, filter_length)
hp_oa = high_pass_filter(lp_oa, Fs, 4, filter_length)
assert_array_almost_equal(hp_oa, bp_oa, 2)

# The two methods should give the same result
# As filtering for short signals uses a circular convolution (FFT) and
# the overlap-add filter implements a linear convolution, the signal
# boundary will be slightly different and we ignore it
n_edge_ignore = 1000
assert_array_almost_equal(hp[n_edge_ignore:-n_edge_ignore],
hp_oa[n_edge_ignore:-n_edge_ignore], 2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new line missing