diff --git a/docs/examples/EnsembleFilterExample.py b/docs/examples/EnsembleFilterExample.py new file mode 100644 index 000000000..9ee49a8e3 --- /dev/null +++ b/docs/examples/EnsembleFilterExample.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python +# coding: utf-8 + +""" +======================= +Ensemble Filter Example +======================= +""" + +# %% +# The Ensemble Kalman Filter (EnKF) is a hybrid of the Kalman updating scheme and the Monte Carlo +# approach of the particle filter. The EnKF provides an alternative to the Kalman Filter (and +# extensions of) which is specifically designed for high-dimensional states. +# +# EnKF can be applied to both non-linear and non-Gaussian state-spaces due to being completely +# based on sampling. +# +# Multiple versions of EnKF are implemented in Stone Soup - all make use of the same prediction +# step, but implement different versions of the update step. Namely, the updaters are: +# +# - :class:`~.EnsembleUpdater` +# - :class:`~.LinearisedEnsembleUpdater` +# - :class:`~.RecursiveLinearisedEnsembleUpdater` +# - :class:`~.RecursiveUpdater` +# +# The :class:`~.EnsembleUpdater` is deliberately structured to resemble the Vanilla Kalman Filter, +# :meth:`update` first calls :meth:`predict_measurement` function which +# proceeds by calculating the predicted measurement, innovation covariance +# and measurement cross-covariance. Note however, these are not propagated +# explicitly, they are derived from the sample covariance of the ensemble itself. +# +# The :class:`~.LinearisedEnsembleUpdater` is an implementation of 'The Linearized EnKF Update' +# algorithm from "Ensemble Kalman Filter with Bayesian Recursive Update" by Kristen Michaelson, +# Andrey A. Popov and Renato Zanetti. Similar to the EnsembleUpdater, but uses a different form +# of Kalman gain. This alternative form of the EnKF calculates a separate kalman gain for each +# ensemble member. This alternative Kalman gain calculation involves linearization of the +# measurement. An additional step is introduced to perform inflation. +# +# The :class:`~.RecursiveLinearisedEnsembleUpdater` is an implementation of 'The Bayesian +# Recursive Update Linearized EnKF' algorithm from "Ensemble Kalman Filter with Bayesian +# Recursive Update" by Kristen Michaelson, Andrey A. Popov and Renato Zanetti. It is an +# extended version of the LinearisedEnsembleUpdater that recursively iterates over the +# update step for a given number of iterations. This recursive version is designed to +# minimise the error caused by linearisation. +# +# The :class:`~.RecursiveEnsembleUpdater` is an extended version of the +# :class:`~.EnsembleUpdater` which recursively iterates over the update step. + +# %% +# Example using stonesoup +# ----------------------- +# Package imports +# ^^^^^^^^^^^^^^^ + + +import numpy as np +from datetime import datetime, timedelta + + +# %% +# Start time of simulation +# ^^^^^^^^^^^^^^^^^^^^^^^^ + + +start_time = datetime.now() +np.random.seed(1991) + + +# %% +# Generate and plot ground truth +# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + +from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \ + ConstantVelocity +from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState +from stonesoup.plotter import Plotterly + +transition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.05), + ConstantVelocity(0.05)]) +truth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)]) + +for k in range(1, 21): + truth.append(GroundTruthState( + transition_model.function(truth[k - 1], noise=True, time_interval=timedelta(seconds=1)), + timestamp=start_time + timedelta(seconds=k))) + +# %% + +plotter = Plotterly() +plotter.plot_ground_truths(truth, [0, 2]) +plotter.fig + +# %% +# Generate and plot detections +# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + +from stonesoup.models.measurement.nonlinear import CartesianToBearingRange + +sensor_x = 50 # Placing the sensor off-centre +sensor_y = 0 + +measurement_model = CartesianToBearingRange( + ndim_state=4, + mapping=(0, 2), + noise_covar=np.diag([np.radians(0.2), 1]), # Covariance matrix. 0.2 degree variance in + # bearing and 1 metre in range + translation_offset=np.array([[sensor_x], [sensor_y]]) # Offset measurements to location of + # sensor in cartesian. +) + +# %% + +from stonesoup.types.detection import Detection + +measurements = [] +for state in truth: + measurement = measurement_model.function(state, noise=True) + measurements.append(Detection(measurement, timestamp=state.timestamp, + measurement_model=measurement_model)) + +# %% + +plotter.plot_measurements(measurements, [0, 2]) +plotter.fig + +# %% +# Set up predictor and updater +# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +# In this EnKF example we must use the :class:`~.EnsemblePredictor`, and choose to use the +# standard :class:`~.EnsembleUpdater`. Note that we could instanciate any of the other (ensemble) +# updaters mentioned in this example, in place of the :class:`~.EnsembleUpdater`. + + +from stonesoup.predictor.ensemble import EnsemblePredictor +from stonesoup.updater.ensemble import EnsembleUpdater + +predictor = EnsemblePredictor(transition_model) +updater = EnsembleUpdater(measurement_model) + + +# %% +# Prior state +# ^^^^^^^^^^^ +# For the simulation we must provide a prior state. The Ensemble Filter in stonesoup requires +# this to be an :class:`~.EnsembleState`. We generate an :class:`~.EnsembleState` by calling +# :meth:`~.generate_ensemble`. The prior state stores the prior state vector - see below that +# for the :class:`~.EnsembleState`, the `state_vector` must be a :class:`~.StateVectors` object. + + +from stonesoup.types.state import EnsembleState + +ensemble = EnsembleState.generate_ensemble( + mean=np.array([[0], [1], [0], [1]]), + covar=np.diag([[1.5, 0.5, 1.5, 0.5]]), num_vectors=100) +prior = EnsembleState(state_vector=ensemble, timestamp=start_time) + +# %% + +type(prior.state_vector) + + +# %% +# Run the EnKF +# ^^^^^^^^^^^^ +# Here we run the Ensemble Kalman Filter and plot the results. By marking flag `particle=True`, +# we plot each member of the ensembles. As usual, the ellipses represent the uncertainty in the +# tracks. + +from stonesoup.types.hypothesis import SingleHypothesis +from stonesoup.types.track import Track + +track = Track() +for measurement in measurements: + prediction = predictor.predict(prior, timestamp=measurement.timestamp) + hypothesis = SingleHypothesis(prediction, measurement) # Group a prediction and measurement + post = updater.update(hypothesis) + track.append(post) + prior = track[-1] + +plotter.plot_tracks(track, [0, 2], uncertainty=True, particle=True) +plotter.fig diff --git a/docs/source/stonesoup.updater.rst b/docs/source/stonesoup.updater.rst index 2f7e59911..742000a9e 100644 --- a/docs/source/stonesoup.updater.rst +++ b/docs/source/stonesoup.updater.rst @@ -25,6 +25,12 @@ Ensemble .. automodule:: stonesoup.updater.ensemble :show-inheritance: +Recursive +--------- + +.. automodule:: stonesoup.updater.recursive + :show-inheritance: + Information ----------- diff --git a/stonesoup/updater/ensemble.py b/stonesoup/updater/ensemble.py index 8a359671a..9e018baf1 100644 --- a/stonesoup/updater/ensemble.py +++ b/stonesoup/updater/ensemble.py @@ -6,6 +6,7 @@ from .kalman import KalmanUpdater from ..base import Property from ..types.state import State, EnsembleState +from ..types.array import StateVectors from ..types.prediction import MeasurementPrediction from ..types.update import Update from ..models.measurement import MeasurementModel @@ -14,7 +15,7 @@ class EnsembleUpdater(KalmanUpdater): r"""Ensemble Kalman Filter Updater class The EnKF is a hybrid of the Kalman updating scheme and the - Monte Carlo aproach of the the particle filter. + Monte Carlo approach of the particle filter. Deliberately structured to resemble the Vanilla Kalman Filter, :meth:`update` first calls :meth:`predict_measurement` function which @@ -24,7 +25,7 @@ class EnsembleUpdater(KalmanUpdater): itself. Note that the EnKF equations are simpler when written in the following - formalism. Note that h is not neccisarily a matrix, but could be a + formalism. Note that h is not necessarily a matrix, but could be a nonlinear measurement function. .. math:: @@ -113,7 +114,7 @@ def predict_measurement(self, predicted_state, measurement_model=None, Parameters ---------- - pred_state : :class:`~.State` + predicted_state : :class:`~.State` The predicted state :math:`\mathbf{x}_{k|k-1}` measurement_model : :class:`~.MeasurementModel` The measurement model. If omitted, the model in the updater object @@ -293,3 +294,94 @@ def update(self, hypothesis, **kwargs): return Update.from_state(hypothesis.prediction, posterior_ensemble, timestamp=hypothesis.measurement.timestamp, hypothesis=hypothesis) + + +class LinearisedEnsembleUpdater(EnsembleUpdater): + """ + Implementation of 'The Linearized EnKF Update' algorithm from "Ensemble Kalman Filter with + Bayesian Recursive Update" by Kristen Michaelson, Andrey A. Popov and Renato Zanetti. + Similar to the EnsembleUpdater, but uses a different form of Kalman gain. This alternative form + of the EnKF calculates a separate kalman gain for each ensemble member. This alternative + Kalman gain calculation involves linearization of the measurement. An additional step is + introduced to perform inflation. + + References + ---------- + + 1. K. Michaelson, A. A. Popov and R. Zanetti, + "Ensemble Kalman Filter with Bayesian Recursive Update" + """ + inflation_factor: float = Property( + default=1., + doc="Parameter to control inflation") + + def update(self, hypothesis, **kwargs): + r"""The LinearisedEnsembleUpdater update method. This method includes an additional step + over the EnsembleUpdater update step to perform inflation. + + Parameters + ---------- + hypothesis : :class:`~.SingleHypothesis` + the prediction-measurement association hypothesis. This hypothesis + may carry a predicted measurement, or a predicted state. In the + latter case a predicted measurement will be calculated. + + + Returns + ------- + : :class:`~.EnsembleStateUpdate` + The posterior state which contains an ensemble of state vectors + and a timestamp. + """ + # Extract the number of vectors from the prediction + num_vectors = hypothesis.prediction.num_vectors + + # Assign measurement prediction if prediction is missing + hypothesis = self._check_measurement_prediction(hypothesis) + + # Prior state vector + X0 = hypothesis.prediction.state_vector + + # Measurement covariance + R = self.measurement_model.covar() + + # Line 1: Compute mean + m = hypothesis.prediction.mean + + # Line 2: Compute inflation + X = StateVectors(m + self.inflation_factor * (X0 - m)) + + # Line 3: Recompute prior covariance + P = 1/(num_vectors-1) * (X0 - m) @ (X0 - m).T + + states = list() + + # Line 5: Y_hat + Y_hat = hypothesis.measurement_prediction.state_vector + + # Line 4 + for x, y_hat in zip(X, Y_hat): + + # Line 6: Compute Jacobian + H = self.measurement_model.jacobian(State(state_vector=x, + timestamp=hypothesis.prediction.timestamp)) + + # Line 7: Calculate Innovation + S = H @ P @ H.T + R + + # Line 8: Calculate Kalman gain + K = P @ H.T @ scipy.linalg.inv(S) + + # Line 9: Recalculate X + x = x + K @ (hypothesis.measurement.state_vector - y_hat) + + # Collect state vectors + states.append(x) + + # Convert list of state vectors into a StateVectors class + X = StateVectors(np.hstack(states)) + + return Update.from_state(hypothesis.prediction, + X, + timestamp=hypothesis.measurement.timestamp, + hypothesis=hypothesis) diff --git a/stonesoup/updater/recursive.py b/stonesoup/updater/recursive.py new file mode 100644 index 000000000..423639cb3 --- /dev/null +++ b/stonesoup/updater/recursive.py @@ -0,0 +1,351 @@ +import copy +import numpy as np +import scipy + +from .ensemble import EnsembleUpdater +from .kalman import ExtendedKalmanUpdater +from ..base import Property +from ..types.prediction import Prediction, EnsembleStatePrediction +from ..types.state import State +from ..types.update import Update +from ..types.array import CovarianceMatrix, StateVectors + + +class BayesianRecursiveUpdater(ExtendedKalmanUpdater): + """ + Recursive extension of the ExtendedKalmanUpdater. + """ + number_steps: int = Property(doc="Number of recursive steps", + default=1) + use_joseph_cov: bool = Property(doc="Bool dictating the method of covariance calculation. If " + "use_joseph_cov is True then the Joseph form of the " + "covariance equation is used.", + default=False) + + def _innovation_covariance(self, m_cross_cov, meas_mat, meas_mod): + """Compute the innovation covariance + + Parameters + ---------- + m_cross_cov : numpy.ndarray + The measurement cross covariance matrix + meas_mat : numpy.ndarray + Measurement matrix + meas_mod : :class:~.MeasurementModel` + Measurement model + + Returns + ------- + : numpy.ndarray + The innovation covariance + + """ + return meas_mat@m_cross_cov + self.number_steps*meas_mod.covar() + + def _posterior_covariance(self, hypothesis): + """ + Return the posterior covariance for a given hypothesis + + Parameters + ---------- + hypothesis: :class:`~.Hypothesis` + A hypothesised association between state prediction and measurement. It returns the + measurement prediction which in turn contains the measurement cross covariance, + :math:`P_{k|k-1} H_k^T and the innovation covariance, + :math:`S = H_k P_{k|k-1} H_k^T + R` + + Returns + ------- + : :class:`~.CovarianceMatrix` + The posterior covariance matrix rendered via the Kalman update process. + : numpy.ndarray + The Kalman gain, :math:`K = P_{k|k-1} H_k^T S^{-1}` + + """ + if self.use_joseph_cov: + # Identity matrix + id_matrix = np.identity(hypothesis.prediction.ndim) + + # Calculate Kalman gain + kalman_gain = hypothesis.measurement_prediction.cross_covar @ \ + np.linalg.inv(hypothesis.measurement_prediction.covar) + + # Calculate measurement matrix/jacobian matrix + meas_matrix = self._measurement_matrix(hypothesis.prediction) + + # Calculate Prior covariance + prior_covar = hypothesis.prediction.covar + + # Calculate measurement covariance + meas_covar = hypothesis.measurement.measurement_model.covar() + + # Compute posterior covariance matrix + I_KH = id_matrix - kalman_gain @ meas_matrix + post_cov = I_KH @ prior_covar @ I_KH.T \ + + kalman_gain @ (self.number_steps * meas_covar) @ kalman_gain.T + + return post_cov.view(CovarianceMatrix), kalman_gain + + else: + kalman_gain = hypothesis.measurement_prediction.cross_covar @ \ + np.linalg.inv(hypothesis.measurement_prediction.covar) + + post_cov = hypothesis.prediction.covar - kalman_gain @ \ + hypothesis.measurement_prediction.covar @ kalman_gain.T + + return post_cov.view(CovarianceMatrix), kalman_gain + + def update(self, hypothesis, **kwargs): + r"""The Kalman update method. Given a hypothesised association between + a predicted state or predicted measurement and an actual measurement, + calculate the posterior state. + + Parameters + ---------- + hypothesis : :class:`~.SingleHypothesis` + the prediction-measurement association hypothesis. This hypothesis + may carry a predicted measurement, or a predicted state. In the + latter case a predicted measurement will be calculated. + **kwargs : various + These are passed to :meth:`predict_measurement` + + Returns + ------- + : :class:`~.GaussianStateUpdate` + The posterior state Gaussian with mean :math:`\mathbf{x}_{k|k}` and + covariance :math:`P_{x|x}` + + """ + # Get the predicted state out of the hypothesis + predicted_state = hypothesis.prediction + + # Get the measurement model out of the measurement if it's there. + # If not, use the one native to the updater (which might still be + # none) + measurement_model = hypothesis.measurement.measurement_model + measurement_model = self._check_measurement_model(measurement_model) + + # If there is no measurement prediction in the hypothesis then do the + # measurement prediction (and attach it back to the hypothesis). + if hypothesis.measurement_prediction is None: + # Attach the measurement prediction to the hypothesis + hypothesis.measurement_prediction = self.predict_measurement( + predicted_state, measurement_model=measurement_model, **kwargs) + + if not self.number_steps > 0: + raise ValueError("Updater cannot run 0 times (or less). This would not provide an " + "updated state") + + nhypothesis = copy.copy(hypothesis) + for _ in range(self.number_steps): + + nhypothesis.measurement_prediction = self.predict_measurement( + nhypothesis.prediction, measurement_model=measurement_model) + # Kalman gain and posterior covariance + posterior_covariance, kalman_gain = \ + self._posterior_covariance(nhypothesis) + + # Posterior mean + posterior_mean = self._posterior_mean(nhypothesis.prediction, kalman_gain, + nhypothesis.measurement, + nhypothesis.measurement_prediction) + nhypothesis.prediction = Prediction.from_state( + nhypothesis.prediction, state_vector=posterior_mean, covar=posterior_covariance) + + if self.force_symmetric_covariance: + posterior_covariance = \ + (posterior_covariance + posterior_covariance.T)/2 + + return Update.from_state( + hypothesis.prediction, + posterior_mean, posterior_covariance, + timestamp=hypothesis.measurement.timestamp, hypothesis=hypothesis) + + +class RecursiveEnsembleUpdater(ExtendedKalmanUpdater, EnsembleUpdater): + """ + Recursive version of EnsembleUpdater. Uses calculated posterior ensemble as prior ensemble to + recursively update number_steps times. + """ + number_steps: int = Property(doc="Number of recursive steps") + + def update(self, hypothesis, **kwargs): + r"""The BayesianRecursiveEnsembleUpdater update method. The Ensemble Kalman filter + simply uses the Kalman Update scheme + to evolve a set or Ensemble + of state vectors as a group. This ensemble of vectors contains all the + information on the system state. + Uses calculated posterior ensemble as prior ensemble to + recursively update number_steps times. + + Parameters + ---------- + hypothesis : :class:`~.SingleHypothesis` + the prediction-measurement association hypothesis. This hypothesis + may carry a predicted measurement, or a predicted state. In the + latter case a predicted measurement will be calculated. + Returns + ------- + : :class:`~.EnsembleStateUpdate` + The posterior state which contains an ensemble of state vectors + and a timestamp. + """ + # Assigning more readible variable names + hypothesis = self._check_measurement_prediction(hypothesis) + num_vectors = hypothesis.prediction.num_vectors + + nhypothesis = copy.copy(hypothesis) + + if not self.number_steps > 0: + raise ValueError("Updater cannot run 0 times (or less). This would not provide an " + "updated state") + + for _ in range(self.number_steps): + + # Generate an ensemble of measurements based on measurement + measurement_ensemble = nhypothesis.prediction.generate_ensemble( + mean=hypothesis.measurement.state_vector, + covar=self.measurement_model.covar(), + num_vectors=num_vectors) + + # Calculate Kalman Gain according to Dr. Jan Mandel's EnKF formalism. + innovation_ensemble = nhypothesis.prediction.state_vector - nhypothesis.prediction.mean + + meas_innovation = ( + self.measurement_model.function(nhypothesis.prediction, + num_samples=num_vectors) + - self.measurement_model.function(State(nhypothesis.prediction.mean))) + + # Calculate Kalman Gain + kalman_gain = 1 / (num_vectors - 1) * innovation_ensemble @ meas_innovation.T @ \ + scipy.linalg.inv(1 / (num_vectors - 1) * meas_innovation @ meas_innovation.T + + self.measurement_model.covar()) + + # Calculate Posterior Ensemble + posterior_ensemble = ( + nhypothesis.prediction.state_vector + + kalman_gain @ ( + measurement_ensemble - + nhypothesis.measurement_prediction.state_vector)) + + nhypothesis.prediction = EnsembleStatePrediction(posterior_ensemble, + timestamp=nhypothesis.measurement. + timestamp) + + return Update.from_state(hypothesis.prediction, + posterior_ensemble, + timestamp=hypothesis.measurement.timestamp, + hypothesis=hypothesis) + + +class RecursiveLinearisedEnsembleUpdater(ExtendedKalmanUpdater, EnsembleUpdater): + """ + Implementation of 'The Bayesian Recursive Update Linearized EnKF' algorithm from "Ensemble + Kalman Filter with Bayesian Recursive Update" by Kristen Michaelson, Andrey A. Popov and + Renato Zanetti. + Recursive version of the LinearisedEnsembleUpdater that recursively iterates over the update + step for a given number of steps. + + References + ---------- + + 1. K. Michaelson, A. A. Popov and R. Zanetti, + "Ensemble Kalman Filter with Bayesian Recursive Update" + """ + number_steps: int = Property(doc="Number of recursive steps") + inflation_factor: float = Property(default=1., + doc="Parameter to control inflation") + + def update(self, hypothesis, **kwargs): + r"""The RecursiveLinearisedEnsembleUpdater update method. Uses an alternative form of + Kalman gain to calculate a value for each member of the ensemble. Iterates over the update + step recursively to improve upon error caused by linearisation. + + + Parameters + ---------- + hypothesis : :class:`~.SingleHypothesis` + the prediction-measurement association hypothesis. This hypothesis + may carry a predicted measurement, or a predicted state. In the + latter case a predicted measurement will be calculated. + + + Returns + ------- + : :class:`~.EnsembleStateUpdate` + The posterior state which contains an ensemble of state vectors + and a timestamp. + """ + + # Number of steps + N = self.number_steps + + if not self.number_steps > 0: + raise ValueError("Updater cannot run 0 times (or less). This would not provide an " + "updated state") + + # Preserve original hypothesis - use copy instead + nhypothesis = copy.copy(hypothesis) + + # Measurement covariance + R = self.measurement_model.covar() + + # Line 1: Ensemble from prior distribution + X = nhypothesis.prediction.state_vector + + # Line 2: Iterate the update step + for _ in range(N): + + # Line 3: Generate mean of prediction ensemble + m = nhypothesis.prediction.mean + + # Line 4: Compute inflation + X = StateVectors(m + (self.inflation_factor ** (1/N)) * (X - m)) + + # Clear measurement prediction so that it is recalculated + nhypothesis.measurement_prediction = None + + # Update predicted state vector + nhypothesis.prediction.state_vector = X + + # Recalculate measurement prediction + nhypothesis = self._check_measurement_prediction(nhypothesis) + + # Number of vectors + M = hypothesis.prediction.num_vectors + + # Line 5: Compute prior covariance + P = 1/(M-1) * (X - m) @ (X - m).T + + # Line 7: Y_hat (vectorised) + Y_hat = nhypothesis.measurement_prediction.state_vector + + # Line 6 + states = list() + for x, y_hat in zip(X, Y_hat): + + # Line 8: Compute Jacobian + H = self.measurement_model.jacobian( + State(state_vector=x, timestamp=nhypothesis.measurement.timestamp)) + + # Line 9: Compute Innovation + S = H @ P @ H.T + N * R + + # Line 10: Calculate Kalman gain + K = P @ H.T @ scipy.linalg.inv(S) + + # Line 11: Recalculate X + x = x + K @ (hypothesis.measurement.state_vector - y_hat) + + states.append(x) + + X = StateVectors(np.hstack(states)) + + nhypothesis.prediction = EnsembleStatePrediction(X, + timestamp=nhypothesis.measurement. + timestamp) + + return Update.from_state(hypothesis.prediction, + X, + timestamp=hypothesis.measurement.timestamp, + hypothesis=hypothesis) diff --git a/stonesoup/updater/tests/conftest.py b/stonesoup/updater/tests/conftest.py index 8c06d6291..63eb3f5d0 100644 --- a/stonesoup/updater/tests/conftest.py +++ b/stonesoup/updater/tests/conftest.py @@ -1,4 +1,4 @@ -import datetime +from datetime import datetime import numpy as np import pytest @@ -15,17 +15,22 @@ def measurement_model(): @pytest.fixture() -def prediction(): +def timestamp(): + return datetime(2023, 10, 11, 11, 19, 30) + + +@pytest.fixture() +def prediction(timestamp): return TaggedWeightedGaussianStatePrediction( np.array([[-6.45], [0.7]]), np.array([[4.1123, 0.0013], [0.0013, 0.0365]]), weight=1, tag=1, - timestamp=datetime.datetime(2022, 9, 16)) + timestamp=timestamp) @pytest.fixture() -def measurement(): +def measurement(measurement_model, timestamp): return Detection(np.array([[-6.23]]), - timestamp=datetime.datetime(2022, 9, 16)) + timestamp=timestamp, measurement_model=measurement_model) diff --git a/stonesoup/updater/tests/test_ensemble.py b/stonesoup/updater/tests/test_ensemble.py index f505fded2..9f88d349f 100644 --- a/stonesoup/updater/tests/test_ensemble.py +++ b/stonesoup/updater/tests/test_ensemble.py @@ -7,7 +7,8 @@ from stonesoup.types.hypothesis import SingleHypothesis from stonesoup.types.prediction import EnsembleStatePrediction from stonesoup.types.state import EnsembleState -from stonesoup.updater.ensemble import EnsembleUpdater, EnsembleSqrtUpdater +from stonesoup.updater.ensemble import EnsembleUpdater, EnsembleSqrtUpdater, \ + LinearisedEnsembleUpdater def test_ensemble(): @@ -116,3 +117,57 @@ def test_sqrt_ensemble(): updated_state.hypothesis.prediction.num_vectors assert np.allclose(updated_state.sqrt_covar @ updated_state.sqrt_covar.T, updated_state.covar) + + +def test_linearised_ensemble_updater(): + # Initialize variables + measurement_model = LinearGaussian(ndim_state=2, mapping=[0], + noise_covar=np.array([[0.04]])) + timestamp = datetime.datetime(2021, 3, 5, 22, 3, 17) + num_vectors = 100 + + test_ensemble = EnsembleState.generate_ensemble( + np.array([[-6.45], [0.7]]), + np.array([[4.1123, 0.0013], + [0.0013, 0.0365]]), num_vectors) + + # Create Prediction, Measurement, and Updater + prediction = EnsembleStatePrediction(test_ensemble, + timestamp=timestamp) + + measurement = Detection(np.array([[-6.23]]), timestamp) + updater = LinearisedEnsembleUpdater(measurement_model) + + # Construct hypothesis + + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement) + + # Run updater + + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + assert updated_state.num_vectors == \ + updated_state.hypothesis.prediction.num_vectors + assert np.allclose(updated_state.sqrt_covar @ updated_state.sqrt_covar.T, + updated_state.covar) + + # Test updater runs with measurement prediction already in hypothesis. + test_measurement_prediction = updater.predict_measurement(prediction) + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement, + measurement_prediction=test_measurement_prediction) + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + assert updated_state.num_vectors == \ + updated_state.hypothesis.prediction.num_vectors + assert np.allclose(updated_state.sqrt_covar @ updated_state.sqrt_covar.T, + updated_state.covar) diff --git a/stonesoup/updater/tests/test_recursive.py b/stonesoup/updater/tests/test_recursive.py new file mode 100644 index 000000000..fd7f5730f --- /dev/null +++ b/stonesoup/updater/tests/test_recursive.py @@ -0,0 +1,701 @@ +"""Test for updater.recursive module""" +import datetime + +import numpy as np +import pytest + +from stonesoup.models.measurement.linear import LinearGaussian +from stonesoup.types.detection import Detection +from stonesoup.types.hypothesis import SingleHypothesis +from stonesoup.types.prediction import GaussianMeasurementPrediction, EnsembleStatePrediction +from stonesoup.types.state import GaussianState, EnsembleState +from stonesoup.updater.recursive import BayesianRecursiveUpdater, RecursiveEnsembleUpdater, \ + RecursiveLinearisedEnsembleUpdater + + +def test_bruf_single_step(measurement_model, prediction, measurement, timestamp): + + n_steps = 1 + + updater = BayesianRecursiveUpdater(measurement_model=measurement_model, number_steps=n_steps, + use_joseph_cov=False) + + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement) + + # Run updater + + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + # Test updater runs with measurement prediction already in hypothesis. + test_measurement_prediction = updater.predict_measurement(prediction) + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement, + measurement_prediction=test_measurement_prediction) + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + + eval_measurement_prediction = GaussianMeasurementPrediction( + measurement_model.matrix() @ prediction.mean, + measurement_model.matrix() @ prediction.covar + @ measurement_model.matrix().T + + n_steps * measurement_model.covar(), + cross_covar=prediction.covar @ measurement_model.matrix().T) + kalman_gain = eval_measurement_prediction.cross_covar @ np.linalg.inv( + eval_measurement_prediction.covar) + eval_posterior = GaussianState( + prediction.mean + + kalman_gain @ (measurement.state_vector + - eval_measurement_prediction.mean), + prediction.covar + - kalman_gain @ eval_measurement_prediction.covar @ kalman_gain.T) + + # Get and assert measurement prediction + measurement_prediction = updater.predict_measurement(prediction) + assert (np.allclose(measurement_prediction.mean, + eval_measurement_prediction.mean, + 0, atol=1.e-14)) + + assert (np.allclose(measurement_prediction.covar, + eval_measurement_prediction.covar, + 0, atol=1.e-14)) + assert (np.allclose(measurement_prediction.cross_covar, + eval_measurement_prediction.cross_covar, + 0, atol=1.e-14)) + + # Perform and assert state update (without measurement prediction) + posterior = updater.update(SingleHypothesis( + prediction=prediction, + measurement=measurement)) + assert (np.allclose(posterior.mean, eval_posterior.mean, 0, atol=1.e-14)) + assert (np.allclose(posterior.covar, eval_posterior.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.prediction, prediction)) + assert (np.allclose( + posterior.hypothesis.measurement_prediction.state_vector, + measurement_prediction.state_vector, 0, atol=1.e-14)) + assert (np.allclose(posterior.hypothesis.measurement_prediction.covar, + measurement_prediction.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.measurement, measurement)) + assert (posterior.timestamp == prediction.timestamp) + + # Perform and assert state update + posterior = updater.update(SingleHypothesis( + prediction=prediction, + measurement=measurement, + measurement_prediction=measurement_prediction)) + assert (np.allclose(posterior.mean, eval_posterior.mean, 0, atol=1.e-14)) + assert (np.allclose(posterior.covar, eval_posterior.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.prediction, prediction)) + assert (np.allclose( + posterior.hypothesis.measurement_prediction.state_vector, + measurement_prediction.state_vector, 0, atol=1.e-14)) + assert (np.allclose(posterior.hypothesis.measurement_prediction.covar, + measurement_prediction.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.measurement, measurement)) + assert (posterior.timestamp == prediction.timestamp) + + +def test_bruf_multi_step(measurement_model, prediction, measurement, timestamp): + + n_steps = 10 + + updater = BayesianRecursiveUpdater(measurement_model=measurement_model, number_steps=n_steps, + use_joseph_cov=False) + + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement) + + # Run updater + + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + # Test updater runs with measurement prediction already in hypothesis. + test_measurement_prediction = updater.predict_measurement(prediction) + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement, + measurement_prediction=test_measurement_prediction) + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + + eval_measurement_prediction = GaussianMeasurementPrediction( + measurement_model.matrix() @ prediction.mean, + measurement_model.matrix() @ prediction.covar + @ measurement_model.matrix().T + + n_steps * measurement_model.covar(), + cross_covar=prediction.covar @ measurement_model.matrix().T) + + # Get and assert measurement prediction + measurement_prediction = updater.predict_measurement(prediction) + assert (np.allclose(measurement_prediction.mean, + eval_measurement_prediction.mean, + 0, atol=1.e-14)) + + assert (np.allclose(measurement_prediction.covar, + eval_measurement_prediction.covar, + 0, atol=1.e-14)) + assert (np.allclose(measurement_prediction.cross_covar, + eval_measurement_prediction.cross_covar, + 0, atol=1.e-14)) + + +def test_force_symmetric_bruf(measurement_model, prediction, measurement, timestamp): + + n_steps = 1 + + updater = BayesianRecursiveUpdater(measurement_model=measurement_model, number_steps=n_steps, + use_joseph_cov=False, force_symmetric_covariance=True) + + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement) + + # Run updater + + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + # Test updater runs with measurement prediction already in hypothesis. + test_measurement_prediction = updater.predict_measurement(prediction) + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement, + measurement_prediction=test_measurement_prediction) + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + + eval_measurement_prediction = GaussianMeasurementPrediction( + measurement_model.matrix() @ prediction.mean, + measurement_model.matrix() @ prediction.covar + @ measurement_model.matrix().T + + n_steps * measurement_model.covar(), + cross_covar=prediction.covar @ measurement_model.matrix().T) + kalman_gain = eval_measurement_prediction.cross_covar @ np.linalg.inv( + eval_measurement_prediction.covar) + eval_posterior = GaussianState( + prediction.mean + + kalman_gain @ (measurement.state_vector + - eval_measurement_prediction.mean), + prediction.covar + - kalman_gain @ eval_measurement_prediction.covar @ kalman_gain.T) + + # Get and assert measurement prediction + measurement_prediction = updater.predict_measurement(prediction) + assert (np.allclose(measurement_prediction.mean, + eval_measurement_prediction.mean, + 0, atol=1.e-14)) + + assert (np.allclose(measurement_prediction.covar, + eval_measurement_prediction.covar, + 0, atol=1.e-14)) + assert (np.allclose(measurement_prediction.cross_covar, + eval_measurement_prediction.cross_covar, + 0, atol=1.e-14)) + + # Perform and assert state update (without measurement prediction) + posterior = updater.update(SingleHypothesis( + prediction=prediction, + measurement=measurement)) + assert (np.allclose(posterior.mean, eval_posterior.mean, 0, atol=1.e-14)) + assert (np.allclose(posterior.covar, eval_posterior.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.prediction, prediction)) + assert (np.allclose( + posterior.hypothesis.measurement_prediction.state_vector, + measurement_prediction.state_vector, 0, atol=1.e-14)) + assert (np.allclose(posterior.hypothesis.measurement_prediction.covar, + measurement_prediction.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.measurement, measurement)) + assert (posterior.timestamp == prediction.timestamp) + + # Perform and assert state update + posterior = updater.update(SingleHypothesis( + prediction=prediction, + measurement=measurement, + measurement_prediction=measurement_prediction)) + assert (np.allclose(posterior.mean, eval_posterior.mean, 0, atol=1.e-14)) + assert (np.allclose(posterior.covar, eval_posterior.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.prediction, prediction)) + assert (np.allclose( + posterior.hypothesis.measurement_prediction.state_vector, + measurement_prediction.state_vector, 0, atol=1.e-14)) + assert (np.allclose(posterior.hypothesis.measurement_prediction.covar, + measurement_prediction.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.measurement, measurement)) + assert (posterior.timestamp == prediction.timestamp) + + +def test_bruf_errors(measurement_model, prediction, measurement): + + # Initialise a kalman updater + updater = BayesianRecursiveUpdater(measurement_model=measurement_model, + number_steps=0, + force_symmetric_covariance=True) + + # Run updater + with pytest.raises(ValueError): + _ = updater.update(SingleHypothesis(prediction=prediction, measurement=measurement)) + + +def test_jcru_single_step(measurement_model, prediction, measurement, timestamp): + + n_steps = 1 + + updater = BayesianRecursiveUpdater(measurement_model=measurement_model, number_steps=n_steps, + use_joseph_cov=True) + + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement) + + # Run updater + + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + # Test updater runs with measurement prediction already in hypothesis. + test_measurement_prediction = updater.predict_measurement(prediction) + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement, + measurement_prediction=test_measurement_prediction) + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + + eval_measurement_prediction = GaussianMeasurementPrediction( + measurement_model.matrix() @ prediction.mean, + measurement_model.matrix() @ prediction.covar + @ measurement_model.matrix().T + + n_steps * measurement_model.covar(), + cross_covar=prediction.covar @ measurement_model.matrix().T) + kalman_gain = eval_measurement_prediction.cross_covar @ np.linalg.inv( + eval_measurement_prediction.covar) + eval_posterior = GaussianState( + prediction.mean + + kalman_gain @ (measurement.state_vector + - eval_measurement_prediction.mean), + prediction.covar + - kalman_gain @ eval_measurement_prediction.covar @ kalman_gain.T) + + # Get and assert measurement prediction + measurement_prediction = updater.predict_measurement(prediction) + assert (np.allclose(measurement_prediction.mean, + eval_measurement_prediction.mean, + 0, atol=1.e-14)) + + assert (np.allclose(measurement_prediction.covar, + eval_measurement_prediction.covar, + 0, atol=1.e-14)) + assert (np.allclose(measurement_prediction.cross_covar, + eval_measurement_prediction.cross_covar, + 0, atol=1.e-14)) + + # Perform and assert state update (without measurement prediction) + posterior = updater.update(SingleHypothesis( + prediction=prediction, + measurement=measurement)) + assert (np.allclose(posterior.mean, eval_posterior.mean, 0, atol=1.e-14)) + assert (np.allclose(posterior.covar, eval_posterior.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.prediction, prediction)) + assert (np.allclose( + posterior.hypothesis.measurement_prediction.state_vector, + measurement_prediction.state_vector, 0, atol=1.e-14)) + assert (np.allclose(posterior.hypothesis.measurement_prediction.covar, + measurement_prediction.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.measurement, measurement)) + assert (posterior.timestamp == prediction.timestamp) + + # Perform and assert state update + posterior = updater.update(SingleHypothesis( + prediction=prediction, + measurement=measurement, + measurement_prediction=measurement_prediction)) + assert (np.allclose(posterior.mean, eval_posterior.mean, 0, atol=1.e-14)) + assert (np.allclose(posterior.covar, eval_posterior.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.prediction, prediction)) + assert (np.allclose( + posterior.hypothesis.measurement_prediction.state_vector, + measurement_prediction.state_vector, 0, atol=1.e-14)) + assert (np.allclose(posterior.hypothesis.measurement_prediction.covar, + measurement_prediction.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.measurement, measurement)) + assert (posterior.timestamp == prediction.timestamp) + + +def test_jcru_multi_step(measurement_model, prediction, measurement, timestamp): + + n_steps = 10 + + updater = BayesianRecursiveUpdater(measurement_model=measurement_model, number_steps=n_steps, + use_joseph_cov=True) + + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement) + + # Run updater + + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + # Test updater runs with measurement prediction already in hypothesis. + test_measurement_prediction = updater.predict_measurement(prediction) + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement, + measurement_prediction=test_measurement_prediction) + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + + eval_measurement_prediction = GaussianMeasurementPrediction( + measurement_model.matrix() @ prediction.mean, + measurement_model.matrix() @ prediction.covar + @ measurement_model.matrix().T + + n_steps * measurement_model.covar(), + cross_covar=prediction.covar @ measurement_model.matrix().T) + + # Get and assert measurement prediction + measurement_prediction = updater.predict_measurement(prediction) + assert (np.allclose(measurement_prediction.mean, + eval_measurement_prediction.mean, + 0, atol=1.e-14)) + + assert (np.allclose(measurement_prediction.covar, + eval_measurement_prediction.covar, + 0, atol=1.e-14)) + assert (np.allclose(measurement_prediction.cross_covar, + eval_measurement_prediction.cross_covar, + 0, atol=1.e-14)) + + +def test_force_symmetric_jcru(measurement_model, prediction, measurement, timestamp): + + n_steps = 1 + + updater = BayesianRecursiveUpdater(measurement_model=measurement_model, number_steps=n_steps, + use_joseph_cov=True, force_symmetric_covariance=True) + + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement) + + # Run updater + + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + # Test updater runs with measurement prediction already in hypothesis. + test_measurement_prediction = updater.predict_measurement(prediction) + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement, + measurement_prediction=test_measurement_prediction) + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + + eval_measurement_prediction = GaussianMeasurementPrediction( + measurement_model.matrix() @ prediction.mean, + measurement_model.matrix() @ prediction.covar + @ measurement_model.matrix().T + + n_steps * measurement_model.covar(), + cross_covar=prediction.covar @ measurement_model.matrix().T) + kalman_gain = eval_measurement_prediction.cross_covar @ np.linalg.inv( + eval_measurement_prediction.covar) + eval_posterior = GaussianState( + prediction.mean + + kalman_gain @ (measurement.state_vector + - eval_measurement_prediction.mean), + prediction.covar + - kalman_gain @ eval_measurement_prediction.covar @ kalman_gain.T) + + # Get and assert measurement prediction + measurement_prediction = updater.predict_measurement(prediction) + assert (np.allclose(measurement_prediction.mean, + eval_measurement_prediction.mean, + 0, atol=1.e-14)) + + assert (np.allclose(measurement_prediction.covar, + eval_measurement_prediction.covar, + 0, atol=1.e-14)) + assert (np.allclose(measurement_prediction.cross_covar, + eval_measurement_prediction.cross_covar, + 0, atol=1.e-14)) + + # Perform and assert state update (without measurement prediction) + posterior = updater.update(SingleHypothesis( + prediction=prediction, + measurement=measurement)) + assert (np.allclose(posterior.mean, eval_posterior.mean, 0, atol=1.e-14)) + assert (np.allclose(posterior.covar, eval_posterior.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.prediction, prediction)) + assert (np.allclose( + posterior.hypothesis.measurement_prediction.state_vector, + measurement_prediction.state_vector, 0, atol=1.e-14)) + assert (np.allclose(posterior.hypothesis.measurement_prediction.covar, + measurement_prediction.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.measurement, measurement)) + assert (posterior.timestamp == prediction.timestamp) + + # Perform and assert state update + posterior = updater.update(SingleHypothesis( + prediction=prediction, + measurement=measurement, + measurement_prediction=measurement_prediction)) + assert (np.allclose(posterior.mean, eval_posterior.mean, 0, atol=1.e-14)) + assert (np.allclose(posterior.covar, eval_posterior.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.prediction, prediction)) + assert (np.allclose( + posterior.hypothesis.measurement_prediction.state_vector, + measurement_prediction.state_vector, 0, atol=1.e-14)) + assert (np.allclose(posterior.hypothesis.measurement_prediction.covar, + measurement_prediction.covar, 0, atol=1.e-14)) + assert (np.array_equal(posterior.hypothesis.measurement, measurement)) + assert (posterior.timestamp == prediction.timestamp) + + +def test_jcru_errors(measurement_model, prediction, measurement): + + # Initialise a kalman updater + updater = BayesianRecursiveUpdater(measurement_model=measurement_model, + number_steps=0, use_joseph_cov=True) + + # Run updater + with pytest.raises(ValueError): + _ = updater.update(SingleHypothesis(prediction=prediction, measurement=measurement)) + + +def test_recursive_linearised_ensemble_updater(): + # Initialize variables + measurement_model = LinearGaussian(ndim_state=2, mapping=[0], + noise_covar=np.array([[0.04]])) + timestamp = datetime.datetime(2021, 3, 5, 22, 3, 17) + num_vectors = 100 + + test_ensemble = EnsembleState.generate_ensemble( + np.array([[-6.45], [0.7]]), + np.array([[4.1123, 0.0013], + [0.0013, 0.0365]]), num_vectors) + + # Create Prediction, Measurement, and Updater + prediction = EnsembleStatePrediction(test_ensemble, + timestamp=timestamp) + + measurement = Detection(np.array([[-6.23]]), timestamp, measurement_model=measurement_model) + updater = RecursiveLinearisedEnsembleUpdater(measurement_model=measurement_model, + number_steps=5) + + # Construct hypothesis + + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement) + + # Run updater + + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + assert updated_state.num_vectors == \ + updated_state.hypothesis.prediction.num_vectors + assert np.allclose(updated_state.sqrt_covar @ updated_state.sqrt_covar.T, + updated_state.covar) + + # Test updater runs with measurement prediction already in hypothesis. + test_measurement_prediction = updater.predict_measurement(prediction) + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement, + measurement_prediction=test_measurement_prediction) + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + assert updated_state.num_vectors == \ + updated_state.hypothesis.prediction.num_vectors + assert np.allclose(updated_state.sqrt_covar @ updated_state.sqrt_covar.T, + updated_state.covar) + + +def test_recursive_linearised_ensemble_errors(measurement_model, prediction, measurement): + + # Initialise a kalman updater + updater = RecursiveLinearisedEnsembleUpdater(measurement_model=measurement_model, + number_steps=0, + force_symmetric_covariance=True) + + # Run updater + with pytest.raises(ValueError): + _ = updater.update(SingleHypothesis(prediction=prediction, measurement=measurement)) + + +def test_recursive_ensemble_multi_step(): + + # Initialize variables + measurement_model = LinearGaussian(ndim_state=2, mapping=[0], + noise_covar=np.array([[0.04]])) + timestamp = datetime.datetime(2021, 3, 5, 22, 3, 17) + num_vectors = 100 + + test_ensemble = EnsembleState.generate_ensemble( + np.array([[-6.45], [0.7]]), + np.array([[4.1123, 0.0013], + [0.0013, 0.0365]]), num_vectors) + + # Create Prediction, Measurement, and Updater + prediction = EnsembleStatePrediction(test_ensemble, + timestamp=timestamp) + measurement = Detection(np.array([[-6.23]]), timestamp, measurement_model=measurement_model) + updater = RecursiveEnsembleUpdater(measurement_model=measurement_model, number_steps=5) + + # Construct hypothesis + + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement) + + # Run updater + + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + assert updated_state.num_vectors == \ + updated_state.hypothesis.prediction.num_vectors + assert np.allclose(updated_state.sqrt_covar @ updated_state.sqrt_covar.T, + updated_state.covar) + + # Test updater runs with measurement prediction already in hypothesis. + test_measurement_prediction = updater.predict_measurement(prediction) + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement, + measurement_prediction=test_measurement_prediction) + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + assert updated_state.num_vectors == \ + updated_state.hypothesis.prediction.num_vectors + assert np.allclose(updated_state.sqrt_covar @ updated_state.sqrt_covar.T, + updated_state.covar) + + +def test_recursive_ensemble_single_step(): + + # Initialize variables + measurement_model = LinearGaussian(ndim_state=2, mapping=[0], + noise_covar=np.array([[0.04]])) + timestamp = datetime.datetime(2021, 3, 5, 22, 3, 17) + num_vectors = 100 + + test_ensemble = EnsembleState.generate_ensemble( + np.array([[-6.45], [0.7]]), + np.array([[4.1123, 0.0013], + [0.0013, 0.0365]]), num_vectors) + + # Create Prediction, Measurement, and Updater + prediction = EnsembleStatePrediction(test_ensemble, + timestamp=timestamp) + measurement = Detection(np.array([[-6.23]]), timestamp, measurement_model=measurement_model) + updater = RecursiveEnsembleUpdater(measurement_model=measurement_model, number_steps=1) + + # Construct hypothesis + + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement) + + # Run updater + + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + assert updated_state.num_vectors == \ + updated_state.hypothesis.prediction.num_vectors + assert np.allclose(updated_state.sqrt_covar @ updated_state.sqrt_covar.T, + updated_state.covar) + + # Test updater runs with measurement prediction already in hypothesis. + test_measurement_prediction = updater.predict_measurement(prediction) + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement, + measurement_prediction=test_measurement_prediction) + updated_state = updater.update(hypothesis) + + assert updated_state.timestamp == timestamp + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + assert updated_state.ndim == updated_state.hypothesis.prediction.ndim + assert updated_state.num_vectors == \ + updated_state.hypothesis.prediction.num_vectors + assert np.allclose(updated_state.sqrt_covar @ updated_state.sqrt_covar.T, + updated_state.covar) + + +def test_recursive_ensemble_errors(): + # Initialize variables + measurement_model = LinearGaussian(ndim_state=2, mapping=[0], + noise_covar=np.array([[0.04]])) + timestamp = datetime.datetime(2021, 3, 5, 22, 3, 17) + num_vectors = 100 + + test_ensemble = EnsembleState.generate_ensemble( + np.array([[-6.45], [0.7]]), + np.array([[4.1123, 0.0013], + [0.0013, 0.0365]]), num_vectors) + + # Create Prediction, Measurement, and Updater + prediction = EnsembleStatePrediction(test_ensemble, + timestamp=timestamp) + measurement = Detection(np.array([[-6.23]]), timestamp, measurement_model=measurement_model) + updater = RecursiveEnsembleUpdater(measurement_model=measurement_model, number_steps=0) + + # Construct hypothesis + + hypothesis = SingleHypothesis(prediction=prediction, + measurement=measurement) + + # Run updater + with pytest.raises(ValueError): + _ = updater.update(hypothesis)