-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy patheval_store.py
140 lines (111 loc) · 4.59 KB
/
eval_store.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from argparse import ArgumentParser
import pytorch_lightning as pl
from torch_geometric.data import DataLoader
from datasets import ArgoverseV1Dataset
from models.hivt import HiVT
import logging
logger = logging.getLogger(__name__)
import os
import numpy as np
import torch
import matplotlib.pyplot as plt
import numpy as np
from typing import Dict, List, Tuple, NamedTuple, Any, Union, Optional
import os
import pickle
from tqdm import tqdm
def compute_ade(forecasted_trajectories, gt_trajectory):
    """Compute the average displacement error (ADE) of predicted trajectories for one actor.

    Args:
        forecasted_trajectories: array-like of shape (K, N, 2), K predicted
            trajectories of N timestamps each. A single (N, 2) trajectory is
            accepted as well.
        gt_trajectory: array-like of shape (N, 2), the ground-truth trajectory.

    Returns:
        (K,) array holding the ADE of every predicted trajectory, or a scalar
        when a single (N, 2) trajectory was passed.
    """
    # Coerce so that nested lists/tuples work too; ndarrays pass through as-is.
    forecasted = np.asarray(forecasted_trajectories)
    ground_truth = np.asarray(gt_trajectory)

    # Euclidean distance per timestamp: the coordinate axis is the last one.
    squared_error = np.square(forecasted - ground_truth)
    displacement_errors = np.sqrt(np.sum(squared_error, axis=-1))

    # Average over the time axis, which is the last axis once the coordinates
    # have been reduced away.
    return np.mean(displacement_errors, axis=-1)
def compute_fde(forecasted_trajectories, gt_trajectory):
    """Compute the final displacement error (FDE) of predicted trajectories for one actor.

    Args:
        forecasted_trajectories: array-like of shape (K, N, 2), K predicted
            trajectories of N timestamps each. A single (N, 2) trajectory is
            accepted as well.
        gt_trajectory: array-like of shape (N, 2), the ground-truth trajectory;
            the FDE is evaluated against the true position at index `N-1`.

    Returns:
        (K,) array holding the FDE of every predicted trajectory, or a scalar
        when a single (N, 2) trajectory was passed.
    """
    # Coerce so that nested lists/tuples work too; ndarrays pass through as-is.
    forecasted = np.asarray(forecasted_trajectories)
    ground_truth = np.asarray(gt_trajectory)

    error_vector = forecasted - ground_truth
    # `[..., -1, :]` always selects the last timestamp, whatever the number of
    # leading axes. `[:, -1]` would instead pick the last coordinate column
    # when a single (N, 2) trajectory is given.
    final_error = error_vector[..., -1, :]
    return np.linalg.norm(final_error, axis=-1)
class Metric:
    """Accumulate individual metric values and report their mean or sum."""

    def __init__(self):
        # Every accepted value, in the order it was accumulated.
        self.values: list = []

    def accumulate(self, value) -> None:
        """Record `value`; a `None` value is silently skipped."""
        if value is not None:
            self.values.append(value)

    def get_mean(self):
        """Return the mean of the recorded values, or 0.0 if nothing was recorded."""
        # Guard the empty case explicitly: np.mean([]) would yield nan and warn.
        if not self.values:
            return 0.0
        return np.mean(self.values)

    def get_sum(self):
        """Return the sum of the recorded values."""
        return np.sum(self.values)
class PredictionMetrics:
    """Group of `Metric` accumulators: minADE, minFDE, MR and brier_minFDE."""

    def __init__(self):
        # One independent accumulator per reported quantity.
        self.minADE = Metric()
        self.minFDE = Metric()
        self.MR = Metric()
        self.brier_minFDE = Metric()

    def serialize(self) -> Dict[str, Any]:
        """Return the mean of every accumulator as a plain float, keyed by metric name."""
        metric_names = ("minADE", "minFDE", "MR", "brier_minFDE")
        return {
            name: float(getattr(self, name).get_mean())
            for name in metric_names
        }
if __name__ == '__main__':
    # Script entry point: run the model over a dataloader, dump the predicted
    # trajectories and embeddings of every sample to one pickle file each, and
    # print the aggregated prediction metrics as JSON.
    import json

    #! set split first.
    split = 'train'
    #! prepare your model, dataloader configuration here.
    model = None
    dataloader = None
    processed_dir = './p1/'

    # Fail with an actionable message instead of an AttributeError on None.
    if model is None or dataloader is None:
        raise NotImplementedError(
            'Set `model` and `dataloader` above before running this script.'
        )

    # Create the output folder up front so the per-sample dumps cannot fail
    # on a missing directory.
    output_dir = os.path.join(processed_dir, split)
    os.makedirs(output_dir, exist_ok=True)

    model.to("cuda")
    model.eval()
    # NOTE: brier_minFDE is never accumulated below, so it is reported as 0.0.
    metrics = PredictionMetrics()
    for data in tqdm(dataloader):
        # Keep the returned object: `.to()` is not guaranteed to act in place.
        data = data.to("cuda")
        with torch.no_grad():
            #! infer your model here and output trajectory and embeddings.
            pred_trajectory = None  # [K, N, T, 2]
            embeds = None  # [K, N, -1]
            file_names = None  # data ids
            gt_eval = None  # ground-truth: [N, T, 2]
        if any(item is None for item in (pred_trajectory, embeds, file_names, gt_eval)):
            raise NotImplementedError(
                'Fill in the model inference above: `pred_trajectory`, `embeds`, '
                '`file_names` and `gt_eval` must be set.'
            )

        # Move the agent axis first ([K, N, ...] -> [N, K, ...]) so that index
        # `i` below selects one agent in both the embeddings and predictions.
        embeds = embeds.transpose(0, 1).detach().cpu().numpy()
        pred_trajectory = pred_trajectory.transpose(0, 1).detach().cpu().numpy()
        gt_eval = gt_eval.detach().cpu().numpy()

        for i in range(gt_eval.shape[0]):
            forecasted_trajectories = pred_trajectory[i]  # (K, T, 2)
            gt_trajectory = gt_eval[i]  # (T, 2)

            # Sanity-check the shapes before anything is written to disk.
            assert forecasted_trajectories.shape == (6, 30, 2)
            assert gt_trajectory.shape == (30, 2)

            #! make sure the file name is the same with original id in dataset.
            raw_file_name = file_names[i]
            #! dict to store..
            # `astype` returns a fresh array, so the stored tensors do not
            # share memory with the batch buffers.
            dict_data = {
                'traj': torch.from_numpy(forecasted_trajectories.astype(np.float32)),
                'embed': torch.from_numpy(embeds[i].astype(np.float32)),
            }
            with open(os.path.join(output_dir, f'{raw_file_name}.pkl'), 'wb') as handle:
                pickle.dump(dict_data, handle, protocol=pickle.HIGHEST_PROTOCOL)

            fde = compute_fde(forecasted_trajectories, gt_trajectory)
            # minADE is taken from the trajectory with the lowest FDE.
            idx = fde.argmin()
            ade = compute_ade(forecasted_trajectories[idx], gt_trajectory)
            metrics.minADE.accumulate(ade.min())
            metrics.minFDE.accumulate(fde.min())
            # A sample counts as a miss when its best FDE exceeds 2.0.
            metrics.MR.accumulate(fde.min() > 2.0)

    print('Metrics:')
    print(json.dumps(metrics.serialize(), indent=4))