Skip to content

Commit

Permalink
Add SquaredMahalanobis distance and cache covariance matrix inversions
Browse files Browse the repository at this point in the history
Multiple calls to Mahalanobis measure are common (e.g. comparing
measurement predictions to detections), and by caching the matrix
inversion, this can avoid repeating this costly operation.
  • Loading branch information
sdhiscocks committed Aug 3, 2022
1 parent bfbe6ec commit 0391745
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 17 deletions.
82 changes: 68 additions & 14 deletions stonesoup/measures.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
from abc import abstractmethod
from functools import lru_cache

import numpy as np
from scipy.spatial import distance
Expand Down Expand Up @@ -143,23 +144,34 @@ def __call__(self, state1, state2):
self.weighting)


class Mahalanobis(Measure):
r"""Mahalanobis distance measure
class SquaredMahalanobis(Measure):
r"""Squared Mahalanobis distance measure
This measure returns the Mahalanobis distance between a pair of
This measure returns the Squared Mahalanobis distance between a pair of
:class:`~.State` objects taking into account the distribution (i.e.
the :class:`~.CovarianceMatrix`) of the first :class:`.State` object
The Mahalanobis distance between a distribution with mean :math:`\mu` and
Covariance matrix :math:`\Sigma` and a point :math:`x` is defined as:
The Squared Mahalanobis distance between a distribution with mean :math:`\mu`
and Covariance matrix :math:`\Sigma` and a point :math:`x` is defined as:
.. math::
\sqrt{( {\mu - x}) \Sigma^{-1} ({\mu - x}^T )}
( {\mu - x}) \Sigma^{-1} ({\mu - x}^T )
"""
state_covar_inv_cache_size: int = Property(
default=128,
doc="Number of covariance matrix inversions to cache. Setting to `0` will disable the "
"cache, whilst setting to `None` will not limit the size of the cache. Default is "
"128.")

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.state_covar_inv_cache_size is None or self.state_covar_inv_cache_size > 0:
self._inv_cov = lru_cache(maxsize=self.state_covar_inv_cache_size)(self._inv_cov)

def __call__(self, state1, state2):
r"""Calculate the Mahalanobis distance between a pair of state objects
r"""Calculate the Squared Mahalanobis distance between a pair of state objects
Parameters
----------
Expand All @@ -169,7 +181,7 @@ def __call__(self, state1, state2):
Returns
-------
float
Mahalanobis distance between a pair of input :class:`~.State`
Squared Mahalanobis distance between a pair of input :class:`~.State`
objects
"""
Expand All @@ -180,17 +192,59 @@ def __call__(self, state1, state2):
u = state_vector1[self.mapping, 0]
v = state_vector2[self.mapping2, 0]
# extract the mapped covariance data
rows = np.array(self.mapping, dtype=np.intp)
columns = np.array(self.mapping, dtype=np.intp)
cov = state1.covar[rows[:, np.newaxis], columns]
vi = self._inv_cov(state1, tuple(self.mapping))
else:
u = state_vector1[:, 0]
v = state_vector2[:, 0]
cov = state1.covar
vi = self._inv_cov(state1)

delta = u - v

return np.dot(np.dot(delta, vi), delta)

@staticmethod
def _inv_cov(state, mapping=None):
if mapping:
rows = np.array(mapping, dtype=np.intp)
columns = np.array(mapping, dtype=np.intp)
covar = state.covar[rows[:, np.newaxis], columns]
else:
covar = state.covar

return np.linalg.inv(covar)


class Mahalanobis(SquaredMahalanobis):
r"""Mahalanobis distance measure
This measure returns the Mahalanobis distance between a pair of
:class:`~.State` objects taking into account the distribution (i.e.
the :class:`~.CovarianceMatrix`) of the first :class:`.State` object
The Mahalanobis distance between a distribution with mean :math:`\mu` and
Covariance matrix :math:`\Sigma` and a point :math:`x` is defined as:
.. math::
\sqrt{( {\mu - x}) \Sigma^{-1} ({\mu - x}^T )}
"""
def __call__(self, state1, state2):
r"""Calculate the Mahalanobis distance between a pair of state objects
vi = np.linalg.inv(cov)
Parameters
----------
state1 : :class:`~.State`
state2 : :class:`~.State`
Returns
-------
float
Mahalanobis distance between a pair of input :class:`~.State`
objects
return distance.mahalanobis(u, v, vi)
"""
return np.sqrt(super().__call__(state1, state2))


class SquaredGaussianHellinger(Measure):
Expand Down
6 changes: 3 additions & 3 deletions stonesoup/mixturereducer/gaussianmixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ..base import Property
from .base import MixtureReducer
from ..types.state import TaggedWeightedGaussianState, WeightedGaussianState
from ..measures import Mahalanobis
from ..measures import SquaredMahalanobis
from operator import attrgetter


Expand Down Expand Up @@ -184,7 +184,7 @@ def merge(self, components_list):

merged_components = []
final_merged_components = []
measure = Mahalanobis()
measure = SquaredMahalanobis(state_covar_inv_cache_size=None)
while remaining_components:
# Get highest weighted component
best_component = remaining_components.pop()
Expand All @@ -204,7 +204,7 @@ def merge(self, components_list):
# Check for similar components against threshold
for component in matched_components:
# Calculate distance between component and best component
distance = (measure(state1=component, state2=best_component))**2
distance = measure(state1=component, state2=best_component)
# Merge if similar
if distance < self.merge_threshold:
remaining_components.remove(component)
Expand Down

0 comments on commit 0391745

Please sign in to comment.