-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtracker.py
executable file
·176 lines (150 loc) · 6.41 KB
/
tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
Version: 1.5
Summary: Class of Tracker Using Kalman Filter & Hungarian Algorithm
Author: suxing liu
Author-email: [email protected]
USAGE:
from kalman_filter import KalmanFilter
"""
# Import python libraries
import numpy as np
from kalman_filter import KalmanFilter
from scipy.optimize import linear_sum_assignment
class Track(object):
"""Track class for every object to be tracked
Attributes:
None
"""
def __init__(self, prediction, trackIdCount):
"""Initialize variables used by Track class
Args:
prediction: predicted centroids of object to be tracked
trackIdCount: identification of each track object
Return:
None
"""
self.track_id = trackIdCount # identification of each track object
self.KF = KalmanFilter() # KF instance to track this object
self.prediction = np.asarray(prediction) # predicted centroids (x,y)
self.skipped_frames = 0 # number of frames skipped undetected
self.trace = [] # trace path
class Tracker(object):
"""Tracker class that updates track vectors of object tracked
Attributes:
None
"""
def __init__(self, dist_thresh, max_frames_to_skip, max_trace_length, trackIdCount):
"""Initialize variable used by Tracker class
Args:
dist_thresh: distance threshold. When exceeds the threshold,
track will be deleted and new track is created
max_frames_to_skip: maximum allowed frames to be skipped for
the track object undetected
max_trace_lenght: trace path history length
trackIdCount: identification of each track object
Return:
None
"""
self.dist_thresh = dist_thresh
self.max_frames_to_skip = max_frames_to_skip
self.max_trace_length = max_trace_length
self.tracks = []
self.trackIdCount = trackIdCount
def Update(self, detections):
"""Update tracks vector using following steps:
- Create tracks if no tracks vector found
- Calculate cost using sum of square distance
between predicted vs detected centroids
- Using Hungarian Algorithm assign the correct
detected measurements to predicted tracks
https://en.wikipedia.org/wiki/Hungarian_algorithm
- Identify tracks with no assignment, if any
- If tracks are not detected for long time, remove them
- Now look for un_assigned detects
- Start new tracks
- Update KalmanFilter state, lastResults and tracks trace
Args:
detections: detected centroids of object to be tracked
Return:
None
"""
# Create tracks if no tracks vector found
if (len(self.tracks) == 0):
for i in range(len(detections)):
track = Track(detections[i], self.trackIdCount)
self.trackIdCount += 1
self.tracks.append(track)
# Calculate cost using sum of square distance between
# predicted vs detected centroids
N = len(self.tracks)
M = len(detections)
cost = np.zeros(shape=(N, M)) # Cost matrix
for i in range(len(self.tracks)):
for j in range(len(detections)):
try:
diff = self.tracks[i].prediction - detections[j]
distance = np.sqrt(diff[0][0]*diff[0][0] +
diff[1][0]*diff[1][0])
cost[i][j] = distance
except:
pass
# Let's average the squared ERROR
cost = (0.5) * cost
# Using Hungarian Algorithm assign the correct detected measurements
# to predicted tracks
assignment = []
for _ in range(N):
assignment.append(-1)
row_ind, col_ind = linear_sum_assignment(cost)
for i in range(len(row_ind)):
assignment[row_ind[i]] = col_ind[i]
# Identify tracks with no assignment, if any
un_assigned_tracks = []
for i in range(len(assignment)):
if (assignment[i] != -1):
# check for cost distance threshold.
# If cost is very high then un_assign (delete) the track
if (cost[i][assignment[i]] > self.dist_thresh):
assignment[i] = -1
un_assigned_tracks.append(i)
pass
else:
self.tracks[i].skipped_frames += 1
# If tracks are not detected for long time, remove them
del_tracks = []
for i in range(len(self.tracks)):
if (self.tracks[i].skipped_frames > self.max_frames_to_skip):
del_tracks.append(i)
if len(del_tracks) > 0: # only when skipped frame exceeds max
for id in del_tracks:
if id < len(self.tracks):
del self.tracks[id]
del assignment[id]
else:
dprint("ERROR: id is greater than length of tracks")
# Now look for un_assigned detects
un_assigned_detects = []
for i in range(len(detections)):
if i not in assignment:
un_assigned_detects.append(i)
# Start new tracks
if(len(un_assigned_detects) != 0):
for i in range(len(un_assigned_detects)):
track = Track(detections[un_assigned_detects[i]],
self.trackIdCount)
self.trackIdCount += 1
self.tracks.append(track)
# Update KalmanFilter state, lastResults and tracks trace
for i in range(len(assignment)):
self.tracks[i].KF.predict()
if(assignment[i] != -1):
self.tracks[i].skipped_frames = 0
self.tracks[i].prediction = self.tracks[i].KF.correct(detections[assignment[i]], 1)
else:
self.tracks[i].prediction = self.tracks[i].KF.correct(np.array([[0], [0]]), 0)
if(len(self.tracks[i].trace) > self.max_trace_length):
for j in range(len(self.tracks[i].trace) -
self.max_trace_length):
del self.tracks[i].trace[j]
self.tracks[i].trace.append(self.tracks[i].prediction)
self.tracks[i].KF.lastResult = self.tracks[i].prediction