Skip to content

Commit

Permalink
Merge pull request #706 from dstl/sif
Browse files Browse the repository at this point in the history
Add the Sliding Innovation Filter
  • Loading branch information
sdhiscocks authored Aug 31, 2022
2 parents f828abc + 2c6d03f commit 5276c1b
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/source/stonesoup.updater.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ AlphaBeta
.. automodule:: stonesoup.updater.alphabeta
:show-inheritance:

Sliding Innovation Filter
-------------------------

.. automodule:: stonesoup.updater.slidinginnovation
:show-inheritance:

Categorical
-----------

Expand Down
62 changes: 62 additions & 0 deletions stonesoup/updater/slidinginnovation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import numpy as np

from ..base import Property
from ..types.array import CovarianceMatrix
from .kalman import KalmanUpdater, ExtendedKalmanUpdater


class SlidingInnovationUpdater(KalmanUpdater):
    r"""Sliding Innovation Filter Updater

    The Sliding Innovation Filter (SIF) is a sub-optimal filter (in comparison to the Kalman
    filter) which uses a switching gain to provide robustness to estimation problems that may
    be ill-conditioned or contain modelling uncertainties or disturbances.

    The main difference from the Kalman filter is the calculation of the gain:

    .. math::

        K_k = H_k^+ \overline{sat}(|\mathbf{z}_{k|k-1}|/\mathbf{\delta})

    where :math:`\mathbf{\delta}` is the sliding boundary layer width.

    References
    ----------
    1. S. A. Gadsden and M. Al-Shabi, "The Sliding Innovation Filter," in IEEE Access, vol. 8,
    pp. 96129-96138, 2020, doi: 10.1109/ACCESS.2020.2995345.
    """
    layer_width: np.ndarray = Property(
        doc="Sliding boundary layer width :math:`\\mathbf{\\delta}`. A tunable parameter in "
            "measurement space. An example initial value provided in original paper is "
            ":math:`10 \\times \\text{diag}(R)`")

    def _posterior_covariance(self, hypothesis):
        """Compute the SIF posterior covariance (Joseph form) and switching gain.

        Returns a (covariance, gain) pair, as expected by :class:`~.KalmanUpdater`.
        """
        model = self._check_measurement_model(hypothesis.measurement.measurement_model)
        h_matrix = self._measurement_matrix(hypothesis.prediction, model)

        # Boundary layer width as a column vector, matching the innovation's shape
        delta = self.layer_width.reshape((-1, 1))

        innovation = hypothesis.measurement.state_vector \
            - hypothesis.measurement_prediction.state_vector

        # Switching gain: pseudo-inverse of H times the saturated, scaled innovation
        saturated = np.clip(np.abs(innovation)/delta, -1, 1)
        gain = np.linalg.pinv(h_matrix) @ np.diag(saturated.ravel())

        # Joseph-form covariance update, valid for any (not just optimal) gain
        identity_minus_kh = np.identity(hypothesis.prediction.ndim) - gain@h_matrix
        posterior_covariance = (
            identity_minus_kh@hypothesis.prediction.covar@identity_minus_kh.T
            + gain@model.covar()@gain.T)

        return posterior_covariance.view(CovarianceMatrix), gain


class ExtendedSlidingInnovationUpdater(SlidingInnovationUpdater, ExtendedKalmanUpdater):
    """Extended Sliding Innovation Filter Updater

    This is the Extended version of the :class:`~.SlidingInnovationUpdater` for non-linear
    measurement models. It combines the SIF switching gain with the linearised measurement
    matrix provided by :class:`~.ExtendedKalmanUpdater`; no additional behaviour is defined.

    References
    ----------
    1. S. A. Gadsden and M. Al-Shabi, "The Sliding Innovation Filter," in IEEE Access, vol. 8,
    pp. 96129-96138, 2020, doi: 10.1109/ACCESS.2020.2995345.
    """
103 changes: 103 additions & 0 deletions stonesoup/updater/tests/test_sif.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import pytest
import numpy as np

from stonesoup.models.measurement.linear import LinearGaussian
from stonesoup.types.detection import Detection
from stonesoup.types.hypothesis import SingleHypothesis
from stonesoup.types.prediction import (
GaussianStatePrediction, GaussianMeasurementPrediction)
from stonesoup.types.state import GaussianState
from stonesoup.updater.slidinginnovation import (
SlidingInnovationUpdater, ExtendedSlidingInnovationUpdater)


@pytest.mark.parametrize(
    "UpdaterClass, measurement_model, prediction, measurement, layer_width",
    [
        (  # Standard Kalman
            SlidingInnovationUpdater,
            LinearGaussian(ndim_state=2, mapping=[0],
                           noise_covar=np.array([[0.04]])),
            GaussianStatePrediction(np.array([[-6.45], [0.7]]),
                                    np.array([[4.1123, 0.0013],
                                              [0.0013, 0.0365]])),
            Detection(np.array([[-6.23]])),
            10*np.array([0.04])  # 10 x diag(R)
        ),
        (  # Extended Kalman
            ExtendedSlidingInnovationUpdater,
            LinearGaussian(ndim_state=2, mapping=[0],
                           noise_covar=np.array([[0.04]])),
            GaussianStatePrediction(np.array([[-6.45], [0.7]]),
                                    np.array([[4.1123, 0.0013],
                                              [0.0013, 0.0365]])),
            Detection(np.array([[-6.23]])),
            10 * np.array([0.04])  # 10 x diag(R)
        ),
    ],
    ids=["standard", "extended"]
)
def test_sif(UpdaterClass, measurement_model, prediction, measurement, layer_width):
    """Check SIF measurement prediction and update against hand-computed values."""
    h_matrix = measurement_model.matrix()
    noise_covar = measurement_model.covar()

    # Hand-computed measurement prediction
    expected_meas_pred = GaussianMeasurementPrediction(
        h_matrix @ prediction.mean,
        h_matrix @ prediction.covar @ h_matrix.T + noise_covar,
        cross_covar=prediction.covar @ h_matrix.T)

    # Hand-computed SIF gain and (Joseph form) posterior
    innovation = measurement.state_vector - expected_meas_pred.state_vector
    saturated = np.clip(np.abs(innovation) / layer_width, -1, 1)
    gain = np.linalg.pinv(h_matrix) @ np.diag(saturated.ravel())
    i_kh = np.identity(prediction.ndim) - gain @ h_matrix
    expected_covar = i_kh @ prediction.covar @ i_kh.T + gain @ noise_covar @ gain.T
    expected_posterior = GaussianState(
        prediction.mean + gain @ (measurement.state_vector - expected_meas_pred.mean),
        expected_covar)

    # Initialise an updater
    updater = UpdaterClass(measurement_model=measurement_model, layer_width=layer_width)

    # Get and assert measurement prediction
    measurement_prediction = updater.predict_measurement(prediction)
    assert np.allclose(measurement_prediction.mean,
                       expected_meas_pred.mean,
                       0, atol=1.e-14)
    assert np.allclose(measurement_prediction.covar,
                       expected_meas_pred.covar,
                       0, atol=1.e-14)
    assert np.allclose(measurement_prediction.cross_covar,
                       expected_meas_pred.cross_covar,
                       0, atol=1.e-14)

    def check_posterior(posterior):
        # Shared assertions for both update paths
        assert np.allclose(posterior.mean, expected_posterior.mean, 0, atol=1.e-14)
        assert np.allclose(posterior.covar, expected_posterior.covar, 0, atol=1.e-14)
        assert np.array_equal(posterior.hypothesis.prediction, prediction)
        assert np.allclose(
            posterior.hypothesis.measurement_prediction.state_vector,
            measurement_prediction.state_vector, 0, atol=1.e-14)
        assert np.allclose(posterior.hypothesis.measurement_prediction.covar,
                           measurement_prediction.covar, 0, atol=1.e-14)
        assert np.array_equal(posterior.hypothesis.measurement, measurement)
        assert posterior.timestamp == prediction.timestamp

    # Perform and assert state update (without measurement prediction)
    check_posterior(updater.update(SingleHypothesis(
        prediction=prediction,
        measurement=measurement)))

    # Perform and assert state update (with measurement prediction)
    check_posterior(updater.update(SingleHypothesis(
        prediction=prediction,
        measurement=measurement,
        measurement_prediction=measurement_prediction)))

0 comments on commit 5276c1b

Please sign in to comment.