-
Notifications
You must be signed in to change notification settings - Fork 44
/
Copy pathsort.py
204 lines (167 loc) · 7.21 KB
/
sort.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
"""
SORT: A Simple, Online and Realtime Tracker
Copyright (C) 2016 Alex Bewley [email protected]
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import numpy as np
from scipy.optimize import linear_sum_assignment
from typing import Tuple
import cv2
def convert_bbox_to_z(bbox):
    """
    Converts a corner-form bounding box into the filter's measurement vector.

    Params:
      bbox - array-like [x1,y1,x2,y2,...]. Only the first four entries are
             read, so a trailing score column is ignored.
    Returns a float64 column vector of shape (4, 1) holding [x,y,s,r] where
    x,y is the centre of the box, s is the scale/area and r is the aspect
    ratio (width / height).
    NOTE: a zero-height box makes r non-finite (division by zero).
    """
    # Coerce first so plain lists/tuples work as well as numpy arrays; the
    # slice arithmetic below is only defined for arrays.
    corners = np.asarray(bbox, dtype=np.float64)
    top_left, bottom_right = corners[0:2], corners[2:4]
    w, h = bottom_right - top_left
    x, y = (top_left + bottom_right) / 2
    s = w * h  # scale is just area
    r = w / h
    return np.array([x, y, s, r], dtype=np.float64)[:, None]
def convert_x_to_bbox(x):
    """
    Converts a centre-form state column back into a corner-form box.

    Reads the first four rows of column 0 of x as [x,y,s,r] (centre, area,
    aspect ratio) and returns the float64 array [x1,y1,x2,y2], where x1,y1 is
    the top left and x2,y2 is the bottom right. Any further rows are ignored.
    """
    # Only column 0 is used; the tracker passes its (7, 1) filter state here.
    state = x[:, 0]
    centre = state[0:2]
    area, ratio = state[2], state[3]
    # s = w * h and r = w / h, hence w = sqrt(s * r) and h = s / w.
    width = np.sqrt(area * ratio)
    height = area / width
    half_extent = np.array([width, height]) / 2
    corners = np.hstack((centre - half_extent, centre + half_extent))
    return corners.astype(np.float64)
class KalmanBoxTracker:
    """
    This class represents the internal state of an individual tracked object
    observed as a bbox.

    The filter state has 7 entries: the measured [x,y,s,r] followed by three
    velocity terms that are added to x, y and s on every prediction step.
    """

    def __init__(self, bbox, label):
        """
        Initialises a tracker from its first observed bbox [x1,y1,x2,y2,...].

        label is stored unchanged as the tracker's id.
        """
        self.id = label
        self.time_since_update = 0
        self.hit_streak = 0

        kalman = cv2.KalmanFilter(dynamParams=7, measureParams=4, type=cv2.CV_64F)

        # Constant velocity model: identity, plus the velocity entries
        # (state rows 4-6) feeding x, y and s (state rows 0-2). The aspect
        # ratio r (row 3) has no velocity term.
        transition = np.eye(7, dtype=np.float64)
        transition[0, 4] = 1
        transition[1, 5] = 1
        transition[2, 6] = 1
        kalman.transitionMatrix = transition
        kalman.processNoiseCov = np.diag(
            np.array([10, 10, 10, 10, 1e4, 1e4, 1e4], dtype=np.float64))

        # Only the first four state entries [x,y,s,r] are observed.
        kalman.measurementMatrix = np.eye(4, 7, dtype=np.float64)
        kalman.measurementNoiseCov = np.diag(
            np.array([1, 1, 10, 10], dtype=np.float64))

        # Start at the observed position with zero velocities.
        kalman.statePost = np.vstack(
            (convert_bbox_to_z(bbox), np.zeros((3, 1), dtype=np.float64)))
        kalman.errorCovPost = np.diag(
            np.array([1, 1, 1, 1, 1e-2, 1e-2, 1e-4], dtype=np.float64))

        self.kf = kalman

    def update(self, bbox):
        """
        Corrects the state vector with an observed bbox and records the hit.
        """
        self.hit_streak += 1
        self.time_since_update = 0
        measurement = convert_bbox_to_z(bbox)
        self.kf.correct(measurement)

    def predict(self):
        """
        Advances the state vector and returns the predicted bounding box
        estimate as [x1,y1,x2,y2].
        """
        # A non-zero counter means no update() happened since the previous
        # predict(), so the run of consecutive hits is broken.
        missed_previous_step = self.time_since_update > 0
        if missed_previous_step:
            self.hit_streak = 0
        self.time_since_update += 1
        predicted_state = self.kf.predict()
        return convert_x_to_bbox(predicted_state)

    @property
    def current_state(self):
        """
        Returns the bounding box [x1,y1,x2,y2] for the filter's statePost.
        """
        posterior = self.kf.statePost
        return convert_x_to_bbox(posterior)
def iou(a: np.ndarray, b: np.ndarray) -> float:
    """
    Computes IoU (intersection over union) between two bboxes in the form
    [x1,y1,x2,y2].

    Only the first four entries of each box are read, so a trailing score
    column is ignored. Returns 0.0 when the union area is not strictly
    positive (e.g. two zero-area boxes) or is NaN, instead of dividing by
    zero and handing NaN to the caller.
    """
    a_tl, a_br = np.asarray(a)[:4].reshape((2, 2))
    b_tl, b_br = np.asarray(b)[:4].reshape((2, 2))
    int_tl = np.maximum(a_tl, b_tl)
    int_br = np.minimum(a_br, b_br)
    # Clamp at zero so disjoint boxes give an empty intersection rather than
    # a product of negative extents. np.prod replaces np.product, which was
    # removed in NumPy 2.0.
    int_area = np.prod(np.maximum(0., int_br - int_tl))
    a_area = np.prod(a_br - a_tl)
    b_area = np.prod(b_br - b_tl)
    union_area = a_area + b_area - int_area
    # "not >" rather than "<=" so that a NaN union is rejected too.
    if not union_area > 0:
        return 0.0
    return float(int_area / union_area)
def associate_detections_to_trackers(detections: np.ndarray, trackers: np.ndarray,
                                     iou_threshold: float = 0.3) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Assigns detections to tracked objects (both represented as bounding boxes).

    Params:
      detections    - sequence of boxes [x1,y1,x2,y2,...], one per detection
      trackers      - sequence of boxes [x1,y1,x2,y2,...], one per tracker
      iou_threshold - an assigned pair is kept only if its IoU is strictly
                      greater than this value
    Returns 3 integer arrays: matches of shape (k, 2) whose rows are
    [detection index, tracker index], unmatched_detections and
    unmatched_trackers.
    """
    num_dets, num_trks = len(detections), len(trackers)
    if num_dets == 0 or num_trks == 0:
        # Nothing can be paired; every box on the non-empty side is unmatched.
        return np.empty((0, 2), dtype=int), np.arange(num_dets), np.arange(num_trks)

    iou_matrix = np.zeros((num_dets, num_trks), dtype=np.float64)
    for d, det in enumerate(detections):
        for t, trk in enumerate(trackers):
            iou_matrix[d, t] = iou(det, trk)

    # linear_sum_assignment minimises total cost, so negate the IoU matrix to
    # get the assignment with the largest total overlap.
    row_ind, col_ind = linear_sum_assignment(-iou_matrix)
    matched_indices = np.column_stack((row_ind, col_ind))
    # Honour the caller's threshold (this was hard-coded to 0.3 before).
    good_matches = matched_indices[iou_matrix[row_ind, col_ind] > iou_threshold]

    # Sets give O(1) membership tests when collecting the leftovers.
    matched_dets = set(good_matches[:, 0].tolist())
    matched_trks = set(good_matches[:, 1].tolist())
    unmatched_detections = np.array(
        [i for i in range(num_dets) if i not in matched_dets], dtype=int)
    unmatched_trackers = np.array(
        [i for i in range(num_trks) if i not in matched_trks], dtype=int)
    return good_matches, unmatched_detections, unmatched_trackers
class Sort:
    """
    Multi-object tracker that keeps one KalmanBoxTracker per tracked object
    and associates them with new detections frame by frame.
    """

    def __init__(self, max_age=10, min_hits=6):
        """
        Sets key parameters for SORT.

        Params:
          max_age  - a tracker is dropped once more than this many consecutive
                     frames have passed without a matched detection
          min_hits - a tracker is only reported once its hit streak has
                     reached this value
        """
        self.max_age = max_age
        self.min_hits = min_hits
        self.trackers = []
        self.count = 0

    def next_id(self):
        """
        Returns a fresh tracker label; labels are 1, 2, 3, ... in order.
        """
        self.count += 1
        return self.count

    def update(self, dets):
        """
        Params:
          dets - a numpy array of detections in the format [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]
        Requires: this method must be called once for each frame even with empty detections.
        Returns a similar array of rows [x1,y1,x2,y2,id], where the last column
        is the tracker's id plus one. The result has shape (0, 5) when no
        tracker qualifies for reporting.
        NOTE: The number of objects returned may differ from the number of detections provided.
        """
        # Accept lists as well as arrays; rows are indexed as dets[i, :] below.
        dets = np.asarray(dets)

        # Predict new locations and remove trackers with NaNs.
        self.trackers = [
            tracker for tracker in self.trackers
            if not np.any(np.isnan(tracker.predict()))]

        # get predicted locations
        trks = np.array([tracker.current_state for tracker in self.trackers])
        matched, unmatched_dets, _ = associate_detections_to_trackers(dets, trks)

        # update matched trackers with assigned detections
        for detection_num, tracker_num in matched:
            self.trackers[tracker_num].update(dets[detection_num])

        # create and initialise new trackers for unmatched detections
        for i in unmatched_dets:
            self.trackers.append(KalmanBoxTracker(dets[i, :], self.next_id()))

        # Report only trackers updated this frame with a long enough streak.
        reported = [
            np.concatenate((trk.current_state, [trk.id + 1]))
            for trk in self.trackers
            if trk.time_since_update < 1 and trk.hit_streak >= self.min_hits]
        # Keep the result 2-D even when empty so callers can slice columns.
        ret = np.array(reported) if reported else np.empty((0, 5))

        # remove dead tracklets
        self.trackers = [
            tracker for tracker in self.trackers
            if tracker.time_since_update <= self.max_age]
        return ret