Interpolation in Track-to-Track associators and metrics. #364

Closed
sdhiscocks opened this issue Feb 2, 2021 · 2 comments · Fixed by #872
Labels: enhancement, good first issue

@sdhiscocks
Member

A use case has come up a few times: using Stone Soup metrics to investigate existing track and truth data sets. This is difficult when the track and truth timestamps do not align, because the states on the track and truth then cannot be compared. It would be good if the data associators (and, in turn, the metrics?) supported interpolating tracks and truths. Alternatively, it may be better for this to be a separate pre-processing step before using the associators/metrics?

I think it would make sense to look at the smoother components in Stone Soup to help with interpolation.

sdhiscocks added this to the v1.0 milestone on Mar 22, 2021
sdhiscocks added the good first issue label on Apr 14, 2023
@gawebb-dstl
Contributor

We use an interpolate function locally, which might be a good starting point. It is hardcoded to sample every second, which should probably be made configurable.

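from copy import copy
from datetime import timedelta
from typing import Set

from stonesoup.types.state import State, StateMutableSequence

# NOTE: `time_range` is a local helper that is not shown here. It is assumed to
# yield datetimes from its first argument to its second in 1 second steps.


def interpolate(tracks_set: Set[StateMutableSequence]):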
    new_tracks = set()

    for track in tracks_set:

        # Need to avoid deepcopy
        new_track = copy(track)
        if hasattr(new_track, "metadatas"):
            new_track.metadatas = list()
        new_track.states = list()
        state_iter = iter(track)
        aft = bef = next(state_iter)

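        # Create list of timestamps with 1 second resolution
        # between first and last state. A first state that already sits on a
        # whole second is kept rather than skipped.
        first_timestamp_round_up = track.states[0].timestamp
        if first_timestamp_round_up.microsecond:
            first_timestamp_round_up = (
                first_timestamp_round_up + timedelta(seconds=1)).replace(microsecond=0)
        last_timestamp_round_down = track.states[-1].timestamp.replace(microsecond=0)
        timestamps = time_range(first_timestamp_round_up, last_timestamp_round_down)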

        for timestamp in timestamps:

            # want `bef` to be state just before timestamp, and `aft` to be state just after
            while aft.timestamp < timestamp:
                bef = aft
                while bef.timestamp == aft.timestamp:
                    aft = next(state_iter)

            if aft.timestamp == timestamp:
                # if `aft` happens to have timestamp exactly equal, just take its state vector
                sv = aft.state_vector
            else:
                # otherwise, get the point on the line connecting `bef` and `aft`
                # that lies exactly where `timestamp` would be (assuming constant velocity)
                bef_sv = bef.state_vector
                aft_sv = aft.state_vector

                frac = (timestamp - bef.timestamp) / (aft.timestamp - bef.timestamp)

                sv = bef_sv + ((aft_sv - bef_sv) * frac)

            # use `bef` as template for new state (since metadata might change in future, at `aft`)
            new_state = State.from_state(bef, state_vector=sv, timestamp=timestamp)
            new_track.append(new_state)
        new_tracks.add(new_track)

    return new_tracks
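
The hardcoded one-second step could be lifted into a parameter of the timestamp generator. The following is only a minimal sketch: `sample_times` is a hypothetical helper written for illustration, not part of Stone Soup, and it does not claim to match how the local `time_range` behaves.

```python
from datetime import datetime, timedelta
from typing import Iterator


def sample_times(start: datetime, end: datetime,
                 step: timedelta = timedelta(seconds=1)) -> Iterator[datetime]:
    """Yield timestamps from `start` to `end` inclusive, spaced by `step`.

    Hypothetical helper for illustration only.
    """
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    current = start
    while current <= end:
        yield current
        current += step
```

Called with a start time, an end time one second later and `step=timedelta(milliseconds=500)`, this yields three timestamps. If `end` is earlier than `start`, it yields nothing.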

@gawebb-dstl
Contributor

Other options are:

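from copy import copy
from datetime import datetime
from typing import List

from stonesoup.types.state import StateMutableSequence


def interpolate2(state_sequence: StateMutableSequence, times: List[datetime]):
    output_states = []
    state_sequence_times = [state.timestamp for state in state_sequence]
    for timestamp in times:
        if timestamp in state_sequence_times:
            # Exact match: reuse the existing state at this timestamp
            output_states.append(state_sequence[timestamp])
        elif timestamp < state_sequence_times[0] or state_sequence_times[-1] < timestamp:
            # Outside the span of the sequence: no extrapolation
            continue
        else:
            # TODO: interpolate between the neighbouring states
            pass

    # Shallow copy, so the input sequence keeps its own list of states
    output = copy(state_sequence)
    output.states = output_states
    return output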
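
The unfinished `else` branch of `interpolate2` could use the same linear interpolation as `interpolate`. The following is a minimal sketch on plain Python data, assuming each state is reduced to a `(timestamp, values)` pair, sorted by time with unique timestamps. `interpolate_at` is a hypothetical name, not Stone Soup API.

```python
from bisect import bisect_left
from datetime import datetime
from typing import List, Optional, Sequence, Tuple


def interpolate_at(samples: Sequence[Tuple[datetime, List[float]]],
                   timestamp: datetime) -> Optional[List[float]]:
    """Linearly interpolate `samples` at `timestamp`.

    `samples` must be sorted by time with unique timestamps.
    Returns None when `timestamp` lies outside the sampled span.
    """
    times = [t for t, _ in samples]
    if not times or timestamp < times[0] or timestamp > times[-1]:
        return None
    # Index of the first sample whose time is >= timestamp
    idx = bisect_left(times, timestamp)
    if times[idx] == timestamp:
        return list(samples[idx][1])
    (t_bef, v_bef), (t_aft, v_aft) = samples[idx - 1], samples[idx]
    # Dividing two timedeltas gives a float fraction in (0, 1)
    frac = (timestamp - t_bef) / (t_aft - t_bef)
    return [b + (a - b) * frac for b, a in zip(v_bef, v_aft)]
```

A binary search keeps each lookup logarithmic in the number of states. A real implementation would still need to wrap the result back into a state object and decide which neighbour supplies the metadata.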

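from copy import copy
from datetime import datetime
from typing import List

import numpy as np
from scipy.interpolate import interp1d

from stonesoup.types.state import State, StateMutableSequence


def interpolate3(state_sequence: StateMutableSequence, times: List[datetime]):
    # Keep only the requested times that fall within the span of the sequence
    valid_times = [time for time in times
                   if state_sequence[0].timestamp <= time <= state_sequence[-1].timestamp]
    adjusted_times = [time.timestamp() for time in valid_times]

    svs = [state.state_vector for state in state_sequence.states]
    state_times = [state.timestamp.timestamp() for state in state_sequence]
    # Linear interpolation along the time axis (axis 0) of the stacked state vectors
    interpolate_object = interp1d(state_times, np.stack(svs, axis=0), axis=0)
    output1 = interpolate_object(adjusted_times)

    new_states = []
    for idx, time in enumerate(valid_times):
        # Use the original datetime, not the POSIX float, as the state timestamp
        new_states.append(State(output1[idx, :, :], timestamp=time))

    output = copy(state_sequence)
    output.states = new_states
    return output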


2 participants