-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ENH: Overlap-add filtering for long signals #10
Changes from all commits
ccb9c17
e3a2b8a
5e2766f
02baee5
4deab5f
67922cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,13 +3,180 @@ | |
from scipy.fftpack import fft, ifft | ||
|
||
|
||
def band_pass_filter(x, Fs, Fp1, Fp2): | ||
def _overlap_add_filter(x, h, n_fft=None, zero_phase=True): | ||
""" Filter using overlap-add FFTs. | ||
|
||
Filters the signal x using a filter with the impulse response h. | ||
If zero_phase==True, the amplitude response is scaled and the filter is | ||
applied in forward and backward direction, resulting in a zero-phase | ||
filter. | ||
|
||
Parameters | ||
---------- | ||
x : 1d array | ||
Signal to filter | ||
h : 1d array | ||
Filter impule response (FIR filter coefficients) | ||
n_fft : int | ||
Length of the FFT. If None, the best size is determined automatically. | ||
zero_phase : bool | ||
If True: the filter is applied in forward and backward direction, | ||
resulting in a zero-phase filter | ||
|
||
Returns | ||
------- | ||
xf : 1d array | ||
x filtered | ||
""" | ||
n_h = len(h) | ||
|
||
# Extend the signal by mirroring the edges to reduce transient filter | ||
# response | ||
n_edge = min(n_h, len(x)) | ||
|
||
x_ext = np.r_[2 * x[0] - x[n_edge - 1:0:-1],\ | ||
x, 2 * x[-1] - x[-2:-n_edge - 1:-1]] | ||
|
||
n_x = len(x_ext) | ||
|
||
# Determine FFT length to use | ||
if n_fft == None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "n_fft is None" is more standard |
||
if n_x > n_h: | ||
n_tot = 2 * n_x if zero_phase else n_x | ||
|
||
N = 2 ** np.arange(np.ceil(np.log2(n_h)), | ||
np.floor(np.log2(n_tot))) | ||
cost = np.ceil(n_tot / (N - n_h + 1)) * N * (np.log2(N) + 1) | ||
n_fft = N[np.argmin(cost)] | ||
else: | ||
# Use only a single block | ||
n_fft = 2 ** np.ceil(np.log2(n_x + n_h - 1)) | ||
|
||
if n_fft <= 0: | ||
raise ValueError('N_fft is too short, has to be at least len(h)') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe raise a warning also when n_fft is not a power of 2? |
||
|
||
# Filter in frequency domain | ||
h_fft = fft(np.r_[h, np.zeros(n_fft - n_h)]) | ||
|
||
if zero_phase: | ||
# We will apply the filter in forward and backward direction: Scale | ||
# frequency response of the filter so that the shape of the amplitude | ||
# response stays the same when it is applied twice | ||
|
||
# be careful not to divide by too small numbers | ||
idx = np.where(np.abs(h_fft) > 1e-6) | ||
h_fft[idx] = h_fft[idx] / np.sqrt(np.abs(h_fft[idx])) | ||
|
||
x_filtered = np.zeros_like(x_ext) | ||
|
||
# Segment length for signal x | ||
n_seg = n_fft - n_h + 1 | ||
|
||
# Number of segements (including fractional segments) | ||
n_segments = int(np.ceil(n_x / float(n_seg))) | ||
|
||
filter_input = x_ext | ||
|
||
for pass_no in range(2) if zero_phase else range(1): | ||
|
||
if pass_no == 1: | ||
# second pass: flip signal | ||
filter_input = np.flipud(x_filtered) | ||
x_filtered = np.zeros_like(x_ext) | ||
|
||
for seg_idx in range(n_segments): | ||
seg = filter_input[seg_idx * n_seg:(seg_idx + 1) * n_seg] | ||
seg_fft = fft(np.r_[seg, np.zeros(n_fft - len(seg))]) | ||
|
||
if seg_idx * n_seg + n_fft < n_x: | ||
x_filtered[seg_idx * n_seg:seg_idx * n_seg + n_fft]\ | ||
+= np.real(ifft(h_fft * seg_fft)) | ||
else: | ||
# Last segment | ||
x_filtered[seg_idx * n_seg:] \ | ||
+= np.real(ifft(h_fft * seg_fft))[:n_x - seg_idx * n_seg] | ||
|
||
# Remove mirrored edges that we added | ||
x_filtered = x_filtered[n_edge - 1:-n_edge + 1] | ||
|
||
if zero_phase: | ||
# flip signal back | ||
x_filtered = np.flipud(x_filtered) | ||
|
||
return x_filtered | ||
|
||
|
||
def _filter(x, Fs, freq, gain, filter_length=None): | ||
""" Filter signal using gain control points in the frequency domain. | ||
|
||
The filter impulse response is constructed from a Hamming window (window | ||
used in "firwin2" function) to avoid ripples in the frequency reponse | ||
(windowing is a smoothing in frequency domain). The filter is zero-phase. | ||
|
||
Parameters | ||
---------- | ||
x : 1d array | ||
Signal to filter | ||
Fs : float | ||
Sampling rate in Hz | ||
freq : 1d array | ||
Frequency sampling points in Hz | ||
gain : 1d array | ||
Filter gain at frequency sampling points | ||
filter_length : int (default: None) | ||
Length of the filter to use. If None or "len(x) < filter_length", the | ||
filter length used is len(x). Otherwise, overlap-add filtering with a | ||
filter of the specified length is used (faster for long signals). | ||
|
||
Returns | ||
------- | ||
xf : 1d array | ||
x filtered | ||
""" | ||
assert x.ndim == 1 | ||
|
||
# normalize frequencies | ||
freq = [f / (Fs / 2) for f in freq] | ||
|
||
if filter_length == None or len(x) <= filter_length: | ||
# Use direct FFT filtering for short signals | ||
|
||
Norig = len(x) | ||
|
||
if (gain[-1] == 0.0 and Norig % 2 == 1) \ | ||
or (gain[-1] == 1.0 and Norig % 2 != 1): | ||
# Gain at Nyquist freq: 1: make x EVEN, 0: make x ODD | ||
x = np.r_[x, x[-1]] | ||
|
||
N = len(x) | ||
|
||
H = signal.firwin2(N, freq, gain) | ||
|
||
# Make zero-phase filter function | ||
B = np.abs(fft(H)) | ||
|
||
xf = np.real(ifft(fft(x) * B)) | ||
xf = xf[:Norig] | ||
x = x[:Norig] | ||
else: | ||
# Use overlap-add filter with a fixed length | ||
N = filter_length | ||
|
||
if (gain[-1] == 0.0 and N % 2 == 1) \ | ||
or (gain[-1] == 1.0 and N % 2 != 1): | ||
# Gain at Nyquist freq: 1: make N EVEN, 0: make N ODD | ||
N += 1 | ||
|
||
H = signal.firwin2(N, freq, gain) | ||
xf = _overlap_add_filter(x, H, zero_phase=True) | ||
|
||
return xf | ||
|
||
|
||
def band_pass_filter(x, Fs, Fp1, Fp2, filter_length=None): | ||
"""Bandpass filter for the signal x. | ||
|
||
An acausal fft algorithm is applied (i.e. no phase shift). The filter | ||
functions is constructed from a Hamming window (window used in "firwin2" | ||
function) to avoid ripples in the frequency reponse (windowing is a | ||
smoothing in frequency domain) | ||
Applies a zero-phase bandpass filter to the signal x. | ||
|
||
Parameters | ||
---------- | ||
|
@@ -21,6 +188,10 @@ def band_pass_filter(x, Fs, Fp1, Fp2): | |
low cut-off frequency | ||
Fp2 : float | ||
high cut-off frequency | ||
filter_length : int (default: None) | ||
Length of the filter to use. If None or "len(x) < filter_length", the | ||
filter length used is len(x). Otherwise, overlap-add filtering with a | ||
filter of the specified length is used (faster for long signals). | ||
|
||
Returns | ||
------- | ||
|
@@ -43,48 +214,24 @@ def band_pass_filter(x, Fs, Fp1, Fp2): | |
Fs1 = Fp1 - 0.5 in Hz | ||
Fs2 = Fp2 + 0.5 in Hz | ||
""" | ||
Fs = float(Fs) | ||
Fp1 = float(Fp1) | ||
Fp2 = float(Fp2) | ||
|
||
# Default values in Hz | ||
Fs1 = Fp1 - 0.5 | ||
Fs2 = Fp2 + 0.5 | ||
|
||
assert x.ndim == 1 | ||
|
||
# Make x EVEN | ||
Norig = len(x) | ||
if Norig % 2 == 1: | ||
x = np.r_[x, 1] | ||
|
||
# Normalize frequencies | ||
Ns1 = Fs1 / (Fs / 2) | ||
Ns2 = Fs2 / (Fs / 2) | ||
Np1 = Fp1 / (Fs / 2) | ||
Np2 = Fp2 / (Fs / 2) | ||
|
||
# Construct the filter function H(f) | ||
N = len(x) | ||
|
||
B = signal.firwin2(N, [0, Ns1, Np1, Np2, Ns2, 1], [0, 0, 1, 1, 0, 0]) | ||
|
||
# Make zero-phase filter function | ||
H = np.abs(fft(B)) | ||
|
||
xf = np.real(ifft(fft(x) * H)) | ||
xf = xf[:Norig] | ||
x = x[:Norig] | ||
xf = _filter(x, Fs, [0, Fs1, Fp1, Fp2, Fs2, Fs / 2], [0, 0, 1, 1, 0, 0], | ||
filter_length) | ||
|
||
return xf | ||
|
||
|
||
def low_pass_filter(x, Fs, Fp): | ||
def low_pass_filter(x, Fs, Fp, filter_length=None): | ||
"""Lowpass filter for the signal x. | ||
|
||
An acausal fft algorithm is applied (i.e. no phase shift). The filter | ||
functions is constructed from a Hamming window (window used in "firwin2" | ||
function) to avoid ripples in the frequency reponse (windowing is a | ||
smoothing in frequency domain) | ||
Applies a zero-phase lowpass filter to the signal x. | ||
|
||
Parameters | ||
---------- | ||
|
@@ -94,6 +241,10 @@ def low_pass_filter(x, Fs, Fp): | |
sampling rate | ||
Fp : float | ||
cut-off frequency | ||
filter_length : int (default: None) | ||
Length of the filter to use. If None or "len(x) < filter_length", the | ||
filter length used is len(x). Otherwise, overlap-add filtering with a | ||
filter of the specified length is used (faster for long signals). | ||
|
||
Returns | ||
------- | ||
|
@@ -113,41 +264,20 @@ def low_pass_filter(x, Fs, Fp): | |
Fp Fp+0.5 | ||
|
||
""" | ||
Fs = float(Fs) | ||
Fp = float(Fp) | ||
|
||
assert x.ndim == 1 | ||
|
||
# Make x EVEN | ||
Norig = len(x) | ||
if Norig % 2 == 1: | ||
x = np.r_[x, 1] | ||
|
||
# Normalize frequencies | ||
Ns = (Fp + 0.5) / (Fs / 2) | ||
Np = Fp / (Fs / 2) | ||
|
||
# Construct the filter function H(f) | ||
N = len(x) | ||
Fstop = Fp + 0.5 | ||
|
||
B = signal.firwin2(N, [0, Np, Ns, 1], [1, 1, 0, 0]) | ||
|
||
# Make zero-phase filter function | ||
H = np.abs(fft(B)) | ||
|
||
xf = np.real(ifft(fft(x) * H)) | ||
xf = xf[:Norig] | ||
x = x[:Norig] | ||
xf = _filter(x, Fs, [0, Fp, Fstop, Fs / 2], [1, 1, 0, 0], filter_length) | ||
|
||
return xf | ||
|
||
|
||
def high_pass_filter(x, Fs, Fp): | ||
def high_pass_filter(x, Fs, Fp, filter_length=None): | ||
"""Highpass filter for the signal x. | ||
|
||
An acausal fft algorithm is applied (i.e. no phase shift). The filter | ||
functions is constructed from a Hamming window (window used in "firwin2" | ||
function) to avoid ripples in the frequency reponse (windowing is a | ||
smoothing in frequency domain) | ||
Applies a zero-phase highpass filter to the signal x. | ||
|
||
Parameters | ||
---------- | ||
|
@@ -157,6 +287,10 @@ def high_pass_filter(x, Fs, Fp): | |
sampling rate | ||
Fp : float | ||
cut-off frequency | ||
filter_length : int (default: None) | ||
Length of the filter to use. If None or "len(x) < filter_length", the | ||
filter length used is len(x). Otherwise, overlap-add filtering with a | ||
filter of the specified length is used (faster for long signals). | ||
|
||
Returns | ||
------- | ||
|
@@ -176,29 +310,12 @@ def high_pass_filter(x, Fs, Fp): | |
Fp-0.5 Fp | ||
|
||
""" | ||
Fp = float(Fp) | ||
|
||
assert x.ndim == 1 | ||
|
||
# Make x ODD | ||
Norig = len(x) | ||
if Norig % 2 == 0: | ||
x = np.r_[x, 1] | ||
|
||
# Normalize frequencies | ||
Ns = (Fp - 0.5) / (Fs / 2) | ||
Np = Fp / (Fs / 2) | ||
|
||
# Construct the filter function H(f) | ||
N = len(x) | ||
|
||
B = signal.firwin2(N, [0, Ns, Np, 1], [0, 0, 1, 1]) | ||
Fs = float(Fs) | ||
Fp = float(Fp) | ||
|
||
# Make zero-phase filter function | ||
H = np.abs(fft(B)) | ||
Fstop = Fp - 0.5 | ||
|
||
xf = np.real(ifft(fft(x) * H)) | ||
xf = xf[:Norig] | ||
x = x[:Norig] | ||
xf = _filter(x, Fs, [0, Fstop, Fp, Fs / 2], [0, 0, 1, 1], filter_length) | ||
|
||
return xf | ||
return xf | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. newline |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,10 +3,29 @@ | |
|
||
from ..filter import band_pass_filter, high_pass_filter, low_pass_filter | ||
|
||
|
||
def test_filters(): | ||
a = np.random.randn(1000) | ||
Fs = 1000 | ||
Fs = 500 | ||
sig_len_secs = 60 | ||
|
||
# Filtering of short signals (filter length = len(a)) | ||
a = np.random.randn(sig_len_secs * Fs) | ||
bp = band_pass_filter(a, Fs, 4, 8) | ||
lp = low_pass_filter(a, Fs, 8) | ||
hp = high_pass_filter(lp, Fs, 4) | ||
assert_array_almost_equal(hp, bp, 2) | ||
|
||
# Overlap-add filtering with a fixed filter length | ||
filter_length = 8192 | ||
bp_oa = band_pass_filter(a, Fs, 4, 8, filter_length) | ||
lp_oa = low_pass_filter(a, Fs, 8, filter_length) | ||
hp_oa = high_pass_filter(lp_oa, Fs, 4, filter_length) | ||
assert_array_almost_equal(hp_oa, bp_oa, 2) | ||
|
||
# The two methods should give the same result | ||
# As filtering for short signals uses a circular convolution (FFT) and | ||
# the overlap-add filter implements a linear convolution, the signal | ||
# boundary will be slightly different and we ignore it | ||
n_edge_ignore = 1000 | ||
assert_array_almost_equal(hp[n_edge_ignore:-n_edge_ignore], | ||
hp_oa[n_edge_ignore:-n_edge_ignore], 2) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. new line missing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you don't need the \ in parenthesis