From b86a0d05ba91bdbe7efc3e83ced79c0c03a0dffa Mon Sep 17 00:00:00 2001
From: Steven Hiscocks
Date: Thu, 12 May 2022 11:04:37 +0100
Subject: [PATCH 1/3] Add Nonparametric PDA and gate out invalid measurements

This adds the ability to use nonparametric PDA for the clutter spatial
density, where it is estimated based on the assumption that measurements
within a defined validation region represent the clutter level.

This change also gates out invalid measurements by default, similar to the
behaviour of the DistanceHypothesiser, and per the description in the
original PDA paper.
---
 stonesoup/dataassociator/tests/conftest.py   |  3 +-
 stonesoup/gater/tests/test_distance.py       |  3 +-
 stonesoup/hypothesiser/probability.py        | 81 ++++++++++++++++---
 .../hypothesiser/tests/test_composite.py     |  4 +-
 .../hypothesiser/tests/test_probability.py   | 63 ++++++++++++---
 5 files changed, 128 insertions(+), 26 deletions(-)

diff --git a/stonesoup/dataassociator/tests/conftest.py b/stonesoup/dataassociator/tests/conftest.py
index 2695881ae..fb928659a 100644
--- a/stonesoup/dataassociator/tests/conftest.py
+++ b/stonesoup/dataassociator/tests/conftest.py
@@ -33,7 +33,8 @@ def updater(measurement_model):
 def probability_hypothesiser(predictor, updater):
     return PDAHypothesiser(predictor, updater,
                            clutter_spatial_density=1.2e-2,
-                           prob_detect=0.9, prob_gate=0.99)
+                           prob_detect=0.9, prob_gate=0.99,
+                           include_all=True)


 @pytest.fixture()
diff --git a/stonesoup/gater/tests/test_distance.py b/stonesoup/gater/tests/test_distance.py
index a38ff19dc..b30f5d3df 100644
--- a/stonesoup/gater/tests/test_distance.py
+++ b/stonesoup/gater/tests/test_distance.py
@@ -41,7 +41,8 @@ def test_distance(predictor, updater, detections, gate_threshold, num_gated):

     timestamp = datetime.datetime.now()

-    hypothesiser = PDAHypothesiser(predictor, updater, clutter_spatial_density=0.000001)
+    hypothesiser = PDAHypothesiser(
+        predictor, updater, clutter_spatial_density=0.000001, include_all=True)
     gater = DistanceGater(hypothesiser, measure=measure, gate_threshold=gate_threshold)

     track = Track([GaussianStateUpdate(
diff --git a/stonesoup/hypothesiser/probability.py b/stonesoup/hypothesiser/probability.py
index 0e1c41ef5..bd585c644 100644
--- a/stonesoup/hypothesiser/probability.py
+++ b/stonesoup/hypothesiser/probability.py
@@ -1,4 +1,9 @@
-from scipy.stats import multivariate_normal as mn
+from functools import lru_cache
+
+from scipy.stats import multivariate_normal, chi2
+from scipy.linalg import det
+from scipy.special import gamma
+import numpy as np

 from .base import Hypothesiser
 from ..base import Property
@@ -21,7 +26,10 @@ class PDAHypothesiser(Hypothesiser):
     predictor: Predictor = Property(doc="Predict tracks to detection times")
     updater: Updater = Property(doc="Updater used to get measurement prediction")
     clutter_spatial_density: float = Property(
-        doc="Spatial density of clutter - tied to probability of false detection")
+        default=None,
+        doc="Spatial density of clutter - tied to probability of false detection. Default is None "
+            "where the clutter spatial density is calculated based on the assumption that "
+            "all but one measurement within the validation region of the track are clutter.")
     prob_detect: Probability = Property(
         default=Probability(0.85),
         doc="Target Detection Probability")
@@ -29,6 +37,16 @@ class PDAHypothesiser(Hypothesiser):
         default=Probability(0.95),
         doc="Gate Probability - prob. gate contains true measurement "
             "if detected")
+    include_all: bool = Property(
+        default=False,
+        doc="If `True`, hypotheses outside probability gates will be returned. This requires "
+            "that the clutter spatial density is also provided, as it may not be possible to "
+            "estimate this. Default `False`")
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        if self.include_all and self.clutter_spatial_density is None:
+            raise ValueError("Must provide clutter spatial density if including all hypotheses")

     def hypothesise(self, track, detections, timestamp, **kwargs):
         r"""Evaluate and return all track association hypotheses.
@@ -104,6 +122,7 @@ def hypothesise(self, track, detections, timestamp, **kwargs):

         """
         hypotheses = list()
+        validated_measurements = 0

         # Common state & measurement prediction
         prediction = self.predictor.predict(track, timestamp=timestamp, **kwargs)
@@ -118,6 +137,7 @@ def hypothesise(self, track, detections, timestamp, **kwargs):

         # True detection hypotheses
         for detection in detections:
+            valid_measurement = False
             # Re-evaluate prediction
             prediction = self.predictor.predict(
                 track, timestamp=detection.timestamp, **kwargs)
@@ -126,18 +146,57 @@ def hypothesise(self, track, detections, timestamp, **kwargs):
                 prediction, detection.measurement_model, **kwargs)
             # Calculate difference before to handle custom types (mean defaults to zero)
             # This is required as log pdf coverts arrays to floats
-            log_pdf = mn.logpdf(
+            log_pdf = multivariate_normal.logpdf(
                 (detection.state_vector - measurement_prediction.state_vector).ravel(),
                 cov=measurement_prediction.covar)
             pdf = Probability(log_pdf, log_value=True)
-            probability = (pdf * self.prob_detect)/self.clutter_spatial_density

-            # True detection hypothesis
-            hypotheses.append(
-                SingleProbabilityHypothesis(
-                    prediction,
-                    detection,
-                    probability,
-                    measurement_prediction))
+            if self._is_valid_measurement(measurement_prediction, detection):
+                validated_measurements += 1
+                valid_measurement = True
+
+            if self.include_all or valid_measurement:
+                probability = pdf * self.prob_detect
+                if self.clutter_spatial_density is None:
+                    # Note: will divide by validated measurements count later...
+ probability *= self._validation_region_volume( + self.prob_gate, measurement_prediction) + else: + probability /= self.clutter_spatial_density + + # True detection hypothesis + hypotheses.append( + SingleProbabilityHypothesis( + prediction, + detection, + probability, + measurement_prediction)) + + if self.clutter_spatial_density is None: + for hypothesis in hypotheses[1:]: # Skip missed detection + hypothesis.probability /= validated_measurements return MultipleHypothesis(hypotheses, normalise=True, total_weight=1) + + def _is_valid_measurement(self, meas_pred, det): + z = meas_pred.state_vector - det.state_vector + return \ + z.T@self._inv_cov(meas_pred)@z <= self._gate_threshold(self.prob_gate, meas_pred.ndim) + + @classmethod + @lru_cache() + def _validation_region_volume(cls, prob_gate, meas_pred): + n = meas_pred.ndim + gate_threshold = cls._gate_threshold(prob_gate, n) + c_z = np.pi**(n/2) / gamma(n/2 + 1) + return c_z * gate_threshold**(n/2) * np.sqrt(det(meas_pred.covar)) + + @staticmethod + @lru_cache() + def _inv_cov(meas_pred): + return np.linalg.inv(meas_pred.covar) + + @staticmethod + @lru_cache() + def _gate_threshold(prob_gate, n): + return chi2.ppf(float(prob_gate), n) diff --git a/stonesoup/hypothesiser/tests/test_composite.py b/stonesoup/hypothesiser/tests/test_composite.py index 39731e73d..a2605fcdc 100644 --- a/stonesoup/hypothesiser/tests/test_composite.py +++ b/stonesoup/hypothesiser/tests/test_composite.py @@ -25,11 +25,11 @@ def make_categorical_measurement_model(ndim_state, ndim_meas): def test_composite(predictor, updater, dummy_category_predictor, dummy_category_updater): sub_hypothesisers = [ PDAHypothesiser(predictor, updater, clutter_spatial_density=1.2e-2, prob_detect=0.9, - prob_gate=0.99), + prob_gate=0.99, include_all=True), HMMHypothesiser(dummy_category_predictor, dummy_category_updater, prob_detect=0.7, prob_gate=0.95), PDAHypothesiser(predictor, updater, clutter_spatial_density=1.4e-2, prob_detect=0.5, - prob_gate=0.98), + prob_gate=0.98, include_all=True), HMMHypothesiser(dummy_category_predictor, dummy_category_updater, prob_detect=0.8, prob_gate=0.97) ] diff --git a/stonesoup/hypothesiser/tests/test_probability.py b/stonesoup/hypothesiser/tests/test_probability.py index 33cfdb00e..c5fe3b489 100644 --- a/stonesoup/hypothesiser/tests/test_probability.py +++ b/stonesoup/hypothesiser/tests/test_probability.py @@ -1,6 +1,7 @@ import datetime import numpy as np +import pytest from ..probability import PDAHypothesiser from ...types.detection import Detection, MissedDetection @@ -24,20 +25,60 @@ def test_pda(predictor, updater): mulltihypothesis = \ hypothesiser.hypothesise(track, detections, timestamp) - # There are 3 weighted hypotheses - Detections 1 and 2, MissedDectection + # There are 2 weighted hypotheses - Detections 1 and MissedDetection + # Detection 2 is not a "valid measurement" and gated out + assert len(mulltihypothesis) == 2 + + # Each hypothesis has a probability/weight attribute + assert all(hypothesis.probability >= 0 and isinstance(hypothesis.probability, Probability) + for hypothesis in mulltihypothesis) + + # Detection 1 and MissedDetection are present + assert detection1 in {hypothesis.measurement for hypothesis in mulltihypothesis} + assert any(isinstance(hypothesis.measurement, MissedDetection) + for hypothesis in mulltihypothesis) + + hypothesiser = PDAHypothesiser(predictor, updater, + clutter_spatial_density=1.2e-2, + prob_detect=0.9, prob_gate=0.99, + include_all=True) + + mulltihypothesis = \ + hypothesiser.hypothesise(track, 
detections, timestamp) + + # There are 3 weighted hypotheses - Detections 1 and 2, MissedDetection assert len(mulltihypothesis) == 3 # Each hypothesis has a probability/weight attribute - assert all(hypothesis.probability >= 0 and - isinstance(hypothesis.probability, Probability) - for hypothesis in - mulltihypothesis) + assert all(hypothesis.probability >= 0 and isinstance(hypothesis.probability, Probability) + for hypothesis in mulltihypothesis) # Detection 1, 2 and MissedDetection are present - assert any(hypothesis.measurement is detection1 for hypothesis in - mulltihypothesis) - assert any(hypothesis.measurement is detection2 for hypothesis in - mulltihypothesis) + assert {detection1, detection2} < {hypothesis.measurement for hypothesis in mulltihypothesis} + assert any(isinstance(hypothesis.measurement, MissedDetection) + for hypothesis in mulltihypothesis) + + hypothesiser = PDAHypothesiser(predictor, updater, + prob_detect=0.9, prob_gate=0.99) + + mulltihypothesis = \ + hypothesiser.hypothesise(track, detections, timestamp) + + # There are 2 weighted hypotheses - Detections 1 and MissedDetection + # Detection 2 is not a "valid measurement" and gated out + assert len(mulltihypothesis) == 2 + + # Each hypothesis has a probability/weight attribute + assert all(hypothesis.probability >= 0 and isinstance(hypothesis.probability, Probability) + for hypothesis in mulltihypothesis) + + # Detection 1 and MissedDetection are present + assert detection1 in {hypothesis.measurement for hypothesis in mulltihypothesis} assert any(isinstance(hypothesis.measurement, MissedDetection) - for hypothesis in - mulltihypothesis) + for hypothesis in mulltihypothesis) + + +def test_invalid_pda_arguments(predictor, updater): + with pytest.raises(ValueError): + PDAHypothesiser(predictor, updater, + prob_detect=0.9, prob_gate=0.99, include_all=True) From 70ddf7f3ad39fe4fdf891880e0b1c7f399f47c70 Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Wed, 24 Aug 2022 09:09:50 +0100 Subject: [PATCH 2/3] Group related code in PDA Hypothesiser --- stonesoup/hypothesiser/probability.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/stonesoup/hypothesiser/probability.py b/stonesoup/hypothesiser/probability.py index bd585c644..2c017e32a 100644 --- a/stonesoup/hypothesiser/probability.py +++ b/stonesoup/hypothesiser/probability.py @@ -137,7 +137,6 @@ def hypothesise(self, track, detections, timestamp, **kwargs): # True detection hypotheses for detection in detections: - valid_measurement = False # Re-evaluate prediction prediction = self.predictor.predict( track, timestamp=detection.timestamp, **kwargs) @@ -154,14 +153,13 @@ def hypothesise(self, track, detections, timestamp, **kwargs): if self._is_valid_measurement(measurement_prediction, detection): validated_measurements += 1 valid_measurement = True + else: + # Will be gated out unless include_all is set + valid_measurement = False if self.include_all or valid_measurement: probability = pdf * self.prob_detect - if self.clutter_spatial_density is None: - # Note: will divide by validated measurements count later... 
- probability *= self._validation_region_volume( - self.prob_gate, measurement_prediction) - else: + if self.clutter_spatial_density is not None: probability /= self.clutter_spatial_density # True detection hypothesis @@ -174,7 +172,8 @@ def hypothesise(self, track, detections, timestamp, **kwargs): if self.clutter_spatial_density is None: for hypothesis in hypotheses[1:]: # Skip missed detection - hypothesis.probability /= validated_measurements + hypothesis.probability *= self._validation_region_volume( + self.prob_gate, hypothesis.measurement_prediction) / validated_measurements return MultipleHypothesis(hypotheses, normalise=True, total_weight=1) From 323f87a7ba3d1e05649cb1dd1d501da93fd17204 Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Wed, 24 Aug 2022 09:19:39 +0100 Subject: [PATCH 3/3] Change PDAHypothesiser to use SquaredMahalanobis measure --- stonesoup/hypothesiser/probability.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/stonesoup/hypothesiser/probability.py b/stonesoup/hypothesiser/probability.py index 2c017e32a..b95049542 100644 --- a/stonesoup/hypothesiser/probability.py +++ b/stonesoup/hypothesiser/probability.py @@ -7,6 +7,7 @@ from .base import Hypothesiser from ..base import Property +from ..measures import SquaredMahalanobis from ..types.detection import MissedDetection from ..types.hypothesis import SingleProbabilityHypothesis from ..types.multihypothesis import MultipleHypothesis @@ -123,6 +124,7 @@ def hypothesise(self, track, detections, timestamp, **kwargs): hypotheses = list() validated_measurements = 0 + measure = SquaredMahalanobis(state_covar_inv_cache_size=None) # Common state & measurement prediction prediction = self.predictor.predict(track, timestamp=timestamp, **kwargs) @@ -150,7 +152,8 @@ def hypothesise(self, track, detections, timestamp, **kwargs): cov=measurement_prediction.covar) pdf = Probability(log_pdf, log_value=True) - if self._is_valid_measurement(measurement_prediction, detection): + if measure(measurement_prediction, detection) \ + <= self._gate_threshold(self.prob_gate, measurement_prediction.ndim): validated_measurements += 1 valid_measurement = True else: @@ -177,11 +180,6 @@ def hypothesise(self, track, detections, timestamp, **kwargs): return MultipleHypothesis(hypotheses, normalise=True, total_weight=1) - def _is_valid_measurement(self, meas_pred, det): - z = meas_pred.state_vector - det.state_vector - return \ - z.T@self._inv_cov(meas_pred)@z <= self._gate_threshold(self.prob_gate, meas_pred.ndim) - @classmethod @lru_cache() def _validation_region_volume(cls, prob_gate, meas_pred): @@ -190,11 +188,6 @@ def _validation_region_volume(cls, prob_gate, meas_pred): c_z = np.pi**(n/2) / gamma(n/2 + 1) return c_z * gate_threshold**(n/2) * np.sqrt(det(meas_pred.covar)) - @staticmethod - @lru_cache() - def _inv_cov(meas_pred): - return np.linalg.inv(meas_pred.covar) - @staticmethod @lru_cache() def _gate_threshold(prob_gate, n):
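
Note (illustrative, not part of the patch series): with all three patches
applied, leaving clutter_spatial_density as None makes the hypothesiser use a
nonparametric clutter estimate of m_k / V, where m_k is the number of
validated (gated) measurements for the track and V is the validation region
volume from _validation_region_volume(), i.e. V = c_nz * g**(nz/2) * sqrt(det(S))
with g the chi-squared gate threshold for prob_gate, S the measurement
prediction covariance and c_nz the volume of the nz-dimensional unit
hypersphere.

The sketch below shows the resulting usage options. The transition and
measurement model set-up is an assumption for the example (a 2D
constant-velocity Kalman configuration), not something defined by these
patches.

    import numpy as np

    from stonesoup.models.transition.linear import (
        CombinedLinearGaussianTransitionModel, ConstantVelocity)
    from stonesoup.models.measurement.linear import LinearGaussian
    from stonesoup.predictor.kalman import KalmanPredictor
    from stonesoup.updater.kalman import KalmanUpdater
    from stonesoup.hypothesiser.probability import PDAHypothesiser

    transition_model = CombinedLinearGaussianTransitionModel(
        [ConstantVelocity(0.05), ConstantVelocity(0.05)])
    measurement_model = LinearGaussian(
        ndim_state=4, mapping=(0, 2), noise_covar=np.diag([0.25, 0.25]))
    predictor = KalmanPredictor(transition_model)
    updater = KalmanUpdater(measurement_model)

    # Parametric PDA with a known clutter spatial density; detections outside
    # the prob_gate validation region are now gated out by default.
    parametric = PDAHypothesiser(
        predictor, updater, clutter_spatial_density=1.2e-2,
        prob_detect=0.9, prob_gate=0.99)

    # Previous behaviour (a hypothesis for every detection, gated or not);
    # this now requires the clutter spatial density to be given explicitly.
    ungated = PDAHypothesiser(
        predictor, updater, clutter_spatial_density=1.2e-2,
        prob_detect=0.9, prob_gate=0.99, include_all=True)

    # Nonparametric PDA: omit clutter_spatial_density and the clutter level is
    # estimated from the validated measurements at each time step.
    nonparametric = PDAHypothesiser(
        predictor, updater, prob_detect=0.9, prob_gate=0.99)

    # include_all=True without a clutter spatial density raises ValueError,
    # as exercised by test_invalid_pda_arguments.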