Skip to content

Commit

Permalink
Merge pull request #732 from dstl/remove_repeated_code_in_sensor
Browse files Browse the repository at this point in the history
Remove repeated code in sensor
  • Loading branch information
sdhiscocks authored Oct 10, 2022
2 parents 7667aa0 + b03afc7 commit 8731398
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 208 deletions.
34 changes: 4 additions & 30 deletions stonesoup/sensor/passive.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
from typing import Set, Union

import numpy as np

from ..base import Property
from ..models.measurement.nonlinear import CartesianToElevationBearing
from ..sensor.sensor import Sensor
from ..sensor.sensor import SimpleSensor
from ..types.array import CovarianceMatrix
from ..types.detection import TrueDetection
from ..types.groundtruth import GroundTruthState


class PassiveElevationBearing(Sensor):
class PassiveElevationBearing(SimpleSensor):
"""A simple passive sensor that generates measurements of targets, using a
:class:`~.CartesianToElevationBearing` model, relative to its position.
Expand Down Expand Up @@ -41,28 +38,5 @@ def measurement_model(self):
translation_offset=self.position,
rotation_offset=self.orientation)

def measure(self, ground_truths: Set[GroundTruthState], noise: Union[np.ndarray, bool] = True,
**kwargs) -> Set[TrueDetection]:

measurement_model = self.measurement_model

if noise is True:
# Pre-fetch noise values
noise_vectors_iter = iter(measurement_model.rvs(len(ground_truths), **kwargs))

detections = set()
for truth in ground_truths:
if noise is True:
noise_val = next(noise_vectors_iter)
else:
noise_val = noise

measurement_vector = measurement_model.function(truth, noise=noise_val, **kwargs)

detection = TrueDetection(measurement_vector,
measurement_model=measurement_model,
timestamp=truth.timestamp,
groundtruth_path=truth)
detections.add(detection)

return detections
def is_detectable(self, state: GroundTruthState) -> bool:
return True
206 changes: 29 additions & 177 deletions stonesoup/sensor/radar/radar.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import copy
from math import erfc
from typing import Tuple, Set, Union

import numpy as np
import scipy.constants as const
from math import erfc

from .beam_pattern import BeamTransitionModel
from .beam_shape import BeamShape
Expand All @@ -13,18 +13,17 @@
from ...models.measurement.nonlinear import \
(CartesianToBearingRange, CartesianToElevationBearingRange,
CartesianToBearingRangeRate, CartesianToElevationBearingRangeRate)
from ...sensor.actionable import ActionableProperty
from ...sensor.action.dwell_action import DwellActionsGenerator
from ...sensor.sensor import Sensor
from ...sensor.actionable import ActionableProperty
from ...sensor.sensor import Sensor, SimpleSensor
from ...types.array import CovarianceMatrix
from ...types.detection import TrueDetection
from ...types.groundtruth import GroundTruthState
from ...types.numeric import Probability
from ...types.state import StateVector
from ...models.clutter.clutter import ClutterModel


class RadarBearingRange(Sensor):
class RadarBearingRange(SimpleSensor):
"""A simple radar sensor that generates measurements of targets, using a
:class:`~.CartesianToBearingRange` model, relative to its position.
Expand All @@ -45,11 +44,6 @@ class RadarBearingRange(Sensor):
doc="The sensor noise covariance matrix. This is utilised by "
"(and follows in format) the underlying "
":class:`~.CartesianToBearingRange` model")
clutter_model: ClutterModel = Property(
default=None,
doc="An optional clutter generator that adds a set of simulated "
":class:`Clutter` objects to the measurements at each time step. "
"The clutter is simulated according to the provided distribution.")
max_range: float = Property(
default=np.inf,
doc="The maximum detection range of the radar (in meters)")
Expand All @@ -63,45 +57,10 @@ def measurement_model(self):
translation_offset=self.position,
rotation_offset=self.orientation)

def measure(self, ground_truths: Set[GroundTruthState], noise: Union[np.ndarray, bool] = True,
**kwargs) -> Set[TrueDetection]:

measurement_model = self.measurement_model

if noise is True:
# Pre-fetch noise values
noise_vectors_iter = iter(measurement_model.rvs(len(ground_truths), **kwargs))

detections = set()
for truth in ground_truths:
measurement_vector = measurement_model.function(truth, noise=False, **kwargs)

if noise is True:
measurement_noise = next(noise_vectors_iter)
else:
measurement_noise = noise

range_t = measurement_vector[1, 0] # Bearing(0), Range(1)
# Do not measure if state is out of range
if range_t > self.max_range:
continue

# Add in measurement noise to the measurement vector
measurement_vector += measurement_noise

detection = TrueDetection(measurement_vector,
measurement_model=measurement_model,
timestamp=truth.timestamp,
groundtruth_path=truth)
detections.add(detection)

# Generate clutter at this time step
if self.clutter_model is not None:
self.clutter_model.measurement_model = measurement_model
clutter = self.clutter_model.function(ground_truths)
detections = set.union(detections, clutter)

return detections
def is_detectable(self, state: GroundTruthState) -> bool:
measurement_vector = self.measurement_model.function(state, noise=False)
true_range = measurement_vector[1, 0] # Bearing(0), Range(1)
return true_range <= self.max_range


class RadarRotatingBearingRange(RadarBearingRange):
Expand Down Expand Up @@ -159,50 +118,18 @@ def measure(self, ground_truths: Set[GroundTruthState], noise: Union[np.ndarray,
# No ground truths to get timestamp from
return set()

measurement_model = self.measurement_model
detections = set()
return super().measure(ground_truths, noise, **kwargs)

if noise is True:
# Pre-fetch noise values
noise_vectors_iter = iter(measurement_model.rvs(len(ground_truths), **kwargs))
def is_detectable(self, state: GroundTruthState) -> bool:
measurement_vector = self.measurement_model.function(state, noise=False)

for truth in ground_truths:
# Transform state to measurement space and generate
# random noise
measurement_vector = measurement_model.function(truth, noise=False, **kwargs)

if noise is True:
measurement_noise = next(noise_vectors_iter)
else:
measurement_noise = noise

# Check if state falls within sensor's FOV
fov_min = -self.fov_angle / 2
fov_max = +self.fov_angle / 2
bearing_t = measurement_vector[0, 0]
range_t = measurement_vector[1, 0]

# Do not measure if state not in FOV
if (bearing_t > fov_max or bearing_t < fov_min
or range_t > self.max_range):
continue

# Else add measurement
measurement_vector += measurement_noise # Add noise

detection = TrueDetection(measurement_vector,
measurement_model=measurement_model,
timestamp=truth.timestamp,
groundtruth_path=truth)
detections.add(detection)

# Generate clutter at this time step
if self.clutter_model is not None:
self.clutter_model.measurement_model = measurement_model
clutter = self.clutter_model.function(ground_truths)
detections = set.union(detections, clutter)
# Check if state falls within sensor's FOV
fov_min = -self.fov_angle / 2
fov_max = +self.fov_angle / 2
bearing_t = measurement_vector[0, 0]
true_range = measurement_vector[1, 0]

return detections
return fov_min <= bearing_t <= fov_max and true_range <= self.max_range


class RadarElevationBearingRange(RadarBearingRange):
Expand Down Expand Up @@ -236,47 +163,10 @@ def measurement_model(self):
translation_offset=self.position,
rotation_offset=self.orientation)

def measure(self, ground_truths: Set[GroundTruthState], noise: Union[np.ndarray, bool] = True,
**kwargs) -> Set[TrueDetection]:

measurement_model = self.measurement_model

if noise is True:
# Pre-fetch noise values
noise_vectors_iter = iter(measurement_model.rvs(len(ground_truths), **kwargs))

detections = set()
for truth in ground_truths:
# Initially no noise is added to the measurement vector
measurement_vector = measurement_model.function(truth, noise=False, **kwargs)

if noise is True:
measurement_noise = next(noise_vectors_iter)
else:
measurement_noise = noise

# Check if state falls within range of sensor
range_t = measurement_vector[2, 0] # Elevation(0), Bearing(1), Range(2)
# Do not measure if state is out of range
if range_t > self.max_range:
continue

# Add in measurement noise to the measurement vector
measurement_vector += measurement_noise

detection = TrueDetection(measurement_vector,
measurement_model=measurement_model,
timestamp=truth.timestamp,
groundtruth_path=truth)
detections.add(detection)

# Generate clutter at this time step
if self.clutter_model is not None:
self.clutter_model.measurement_model = measurement_model
clutter = self.clutter_model.function(ground_truths)
detections = set.union(detections, clutter)

return detections
def is_detectable(self, state: GroundTruthState) -> bool:
measurement_vector = self.measurement_model.function(state, noise=False)
true_range = measurement_vector[2, 0] # Elevation(0), Bearing(1), Range(2)
return true_range <= self.max_range


class RadarBearingRangeRate(RadarBearingRange):
Expand Down Expand Up @@ -314,29 +204,10 @@ def measurement_model(self):
velocity=self.velocity,
rotation_offset=self.orientation)

def measure(self, ground_truths: Set[GroundTruthState], noise: Union[np.ndarray, bool] = True,
**kwargs) -> Set[TrueDetection]:

measurement_model = self.measurement_model

if noise is True:
# Pre-fetch noise values
noise_vectors_iter = iter(measurement_model.rvs(len(ground_truths), **kwargs))

detections = set()
for truth in ground_truths:
if noise is True:
noise_val = next(noise_vectors_iter)
else:
noise_val = noise
measurement_vector = measurement_model.function(truth, noise=noise_val, **kwargs)
detection = TrueDetection(measurement_vector,
measurement_model=measurement_model,
timestamp=truth.timestamp,
groundtruth_path=truth)
detections.add(detection)

return detections
def is_detectable(self, state: GroundTruthState) -> bool:
measurement_vector = self.measurement_model.function(state, noise=False)
true_range = measurement_vector[1, 0] # Bearing(0), Range(1), Range-Rate(2)
return true_range <= self.max_range


class RadarElevationBearingRangeRate(RadarBearingRangeRate):
Expand Down Expand Up @@ -373,29 +244,10 @@ def measurement_model(self):
velocity=self.velocity,
rotation_offset=self.orientation)

def measure(self, ground_truths: Set[GroundTruthState], noise: Union[np.ndarray, bool] = True,
**kwargs) -> Set[TrueDetection]:

measurement_model = self.measurement_model

if noise is True:
# Pre-fetch noise values
noise_vectors_iter = iter(measurement_model.rvs(len(ground_truths), **kwargs))

detections = set()
for truth in ground_truths:
if noise is True:
noise_val = next(noise_vectors_iter)
else:
noise_val = noise
measurement_vector = measurement_model.function(truth, noise=noise_val, **kwargs)
detection = TrueDetection(measurement_vector,
measurement_model=measurement_model,
timestamp=truth.timestamp,
groundtruth_path=truth)
detections.add(detection)

return detections
def is_detectable(self, state: GroundTruthState) -> bool:
measurement_vector = self.measurement_model.function(state, noise=False)
true_range = measurement_vector[2, 0] # Elevation(0), Bearing(1), Range(2), Range-Rate(3)
return true_range <= self.max_range


class RadarRasterScanBearingRange(RadarRotatingBearingRange):
Expand Down
57 changes: 56 additions & 1 deletion stonesoup/sensor/sensor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from abc import abstractmethod
from abc import abstractmethod, ABC
from typing import Set, Union, Sequence

import numpy as np

from .actionable import Actionable
from .base import PlatformMountable
from ..base import Property
from ..models.clutter.clutter import ClutterModel
from ..types.detection import TrueDetection
from ..types.groundtruth import GroundTruthState

Expand Down Expand Up @@ -73,6 +74,60 @@ def measurement_model(self):
raise NotImplementedError


class SimpleSensor(Sensor, ABC):

clutter_model: ClutterModel = Property(
default=None,
doc="An optional clutter generator that adds a set of simulated "
":class:`Clutter` objects to the measurements at each time step. "
"The clutter is simulated according to the provided distribution.")

def measure(self, ground_truths: Set[GroundTruthState], noise: Union[np.ndarray, bool] = True,
**kwargs) -> Set[TrueDetection]:

measurement_model = self.measurement_model

detectable_ground_truths = [truth for truth in ground_truths
if self.is_detectable(truth)]

if noise is True:
if len(detectable_ground_truths) > 1:
noise_vectors_iter = iter(measurement_model.rvs(len(detectable_ground_truths),
**kwargs))
else:
noise_vectors_iter = iter([measurement_model.rvs(**kwargs)])

detections = set()
for truth in detectable_ground_truths:
measurement_vector = measurement_model.function(truth, noise=False, **kwargs)

if noise is True:
measurement_noise = next(noise_vectors_iter)
else:
measurement_noise = noise

# Add in measurement noise to the measurement vector
measurement_vector += measurement_noise

detection = TrueDetection(measurement_vector,
measurement_model=measurement_model,
timestamp=truth.timestamp,
groundtruth_path=truth)
detections.add(detection)

# Generate clutter at this time step
if self.clutter_model is not None:
self.clutter_model.measurement_model = measurement_model
clutter = self.clutter_model.function(ground_truths)
detections = set.union(detections, clutter)

return detections

@abstractmethod
def is_detectable(self, state: GroundTruthState) -> bool:
raise NotImplementedError


class SensorSuite(Sensor):
"""Sensor composition type
Expand Down

0 comments on commit 8731398

Please sign in to comment.