-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathreward.py
322 lines (249 loc) · 9.7 KB
/
reward.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
"""
AWS DeepRacer reward function
"""
import math
# Constants

# When True, reward_function prints every factor on each call.
DEBUG_LOG_ENABLED = True

# Action space constants: the speed and steering angle are divided by these
# to normalize them.
# NOTE(review): the reward_function docstring lists speed as 0.0..5.0; if that
# range holds, MAX_SPEED = 8.0 caps the speed factor at 0.625 -- confirm
# against the configured action space.
MAX_SPEED = 8.0
MAX_STEERING_ANGLE = 30.0

# TUNING: Adjust these to fine-tune each factor's effect on the reward.
#
# Reward weights, always 0..1. These are relative to one another. A weight of
# 0.0 makes apply_weight return 1.0, i.e. the factor has no effect on the
# reward. The *_EASING values are keys of EASING_FUNCTIONS.
SPEED_FACTOR_WEIGHT = 1.0
WHEEL_FACTOR_WEIGHT = 0.0
HEADING_FACTOR_WEIGHT = 0.0
STEERING_FACTOR_WEIGHT = 0.2 # Put pressure on steering but not too much
STEERING_FACTOR_EASING = 'linear'
PROGRESS_FACTOR_WEIGHT = 0.0
LANE_FACTOR_WEIGHT = 0.5
LANE_FACTOR_EASING = 'quintic'
#===============================================================================
#
# REWARD
#
#===============================================================================
def reward_function(params):
    """Compute the reward for one step as a product of weighted factors.

    Reward function is:

        f(s,w,h,t,p,l) = 1.0 * W(s,Ks) * W(w,Kw) * W(h,Kh)
                             * W(t,Kt) * W(p,Kp) * W(l,Kl)

        s: speed factor, linear 0..1 for range of speed from 0 to MAX_SPEED
        w: wheel factor, 1.0 normally and a fixed penalty when not all wheels
           are on the track. We want to use the full width of the track for
           smoothing curves so we only apply wheel factor if the car is
           hanging off the track.
        h: heading factor, 0..1 for range of angle between car heading vector
           and the track direction vector (calculated by
           calculate_heading_factor).
        t: steering factor, 0..1, highest when steering straight and lowest
           at MAX_STEERING_ANGLE.
        p: progress factor (calculated by calculate_progress_factor)
        l: lane factor, 0..1, highest on the center line of the track
        W: Weighting function, see apply_weight. With linear easing:
               W(f, Kf) = 1.0 - (1.0 - f) * Kf
        Kx: Weight of respective factor

    Example 1:
        s = 0
        Ks = 0.5
        reward = (1.0 - ((1.0 - s) * Ks)) = 1.0 - (1.0 - 0) * 0.5 = 0.5
    Example 2:
        s = 0.25
        Ks = 1.0
        reward = (1.0 - ((1.0 - s) * Ks)) = 1.0 - (1.0 - 0.25) * 1.0 = 0.25
    Example 3:
        s = 1.0
        Ks = 0.1
        reward = (1.0 - ((1.0 - s) * Ks)) = 1.0 - (1.0 - 1.0) * 0.1 = 1.0

    params:
        from https://docs.aws.amazon.com/deepracer/latest/developerguide/deepracer-reward-function-input.html

        Name                  Type                    Value(s)
        ----                  ----                    --------
        track_width           float                   0..Dtrack (varies)
        distance_from_center  float                   0..~track_width/2
        speed                 float                   0.0..5.0
        steering_angle        float                   -30..30
        all_wheels_on_track   Boolean                 True|False
        heading               float                   -180..+180
        waypoints             list of [float, float]  [[x0, y0] ... [xMax-1, yMax-1]]
        closest_waypoints     [int, int]              [0..Max-2, 1..Max-1]
        steps                 int                     0..Nstep
        progress              float                   0..100

    Returns:
        The reward as a float, floored at 1e-3 so it is never exactly zero.
        When DEBUG_LOG_ENABLED is set, the individual factors are also
        printed.
    """
    # s: Speed Factor: ideal speed is max
    speed_factor = calculate_speed_factor(params)

    # w: Wheel Factor: apply pressure when wheels are off the track
    wheel_factor = calculate_wheel_factor(params)

    # h: Heading Factor
    heading_factor = calculate_heading_factor(params)

    # t: Steering Factor
    steering_factor = calculate_steering_factor(params)

    # p: Progress Factor: go through the helper like every other factor so a
    # future implementation there takes effect here without further edits.
    progress_factor = calculate_progress_factor(params)

    # l: Lane Factor
    lane_factor = calculate_lane_factor(params)

    # Log for validation
    if DEBUG_LOG_ENABLED:
        print("s: %0.2f, w: %0.2f, h: %0.2f, t: %0.2f, p %0.2f, l %0.2f" %
              (speed_factor, wheel_factor, heading_factor, steering_factor,
               progress_factor, lane_factor))

    reward = 1.0
    reward *= apply_weight(speed_factor, SPEED_FACTOR_WEIGHT)
    reward *= apply_weight(wheel_factor, WHEEL_FACTOR_WEIGHT)
    reward *= apply_weight(heading_factor, HEADING_FACTOR_WEIGHT)
    reward *= apply_weight(steering_factor, STEERING_FACTOR_WEIGHT,
                           STEERING_FACTOR_EASING)
    reward *= apply_weight(progress_factor, PROGRESS_FACTOR_WEIGHT)
    reward *= apply_weight(lane_factor, LANE_FACTOR_WEIGHT, LANE_FACTOR_EASING)

    return float(max(reward, 1e-3))  # make sure we never return exactly zero
#===============================================================================
#
# SPEED
#
#===============================================================================
def calculate_speed_factor(params):
    """Return the current speed as a fraction of MAX_SPEED, capped at 1.0.

    params: reward-function input dict; only params['speed'] is read.
    """
    return min(params['speed'] / MAX_SPEED, 1.0)
#===============================================================================
#
# WHEELS
#
#===============================================================================
def calculate_wheel_factor(params):
    """Return 1.0 when all wheels are on the track, otherwise 0.25.

    params: reward-function input dict; only params['all_wheels_on_track']
    is read.
    """
    # The penalty is a hard-coded multiplier rather than a continuous value
    # since we don't know the width of the car wheelbase.
    #
    # SUPPRESSED: Probably don't need this. Lab docs say the car will be reset
    # if it is off the track. Kept for reference:
    #
    #     MAX_DISTANCE_FROM_CENTER = params['track_width']/2.0
    #     # Hard fail if distance from center is large
    #     if distance_from_center > MAX_DISTANCE_FROM_CENTER:
    #         wheel_factor = 1e-3
    return 1.0 if params['all_wheels_on_track'] else 0.25
#===============================================================================
#
# HEADING
#
#===============================================================================
def calculate_heading_factor(params):
    """Return the heading factor, which is currently always 1.0.

    The real calculation is suppressed (see the sketch below), so `params`
    is not read.
    """
    # SUPPRESSED: This is too experimental while we haven't finished tracks
    # yet. Kept for reference:
    #
    #     closest_waypoints = params['closest_waypoints']
    #     waypoints = params['waypoints']
    #     heading = params['heading']
    #
    #     # Calculate the immediate track angle
    #     wp1 = waypoints[closest_waypoints[0]]
    #     wp2 = waypoints[closest_waypoints[1]]
    #     ta1 = angle_of_vector([wp1,wp2])
    #     print("track angle 1: %i" % ta1)
    #
    #     # h: Heading Factor: apply pressure as heading is different than
    #     # track angle. Find closest angle, accounting for possibility of
    #     # wrapping
    #     a = abs(ta1 - heading)
    #     b = abs(ta1 - (heading + 360))
    #     heading_delta = min(a,b)
    #
    #     # hard fail if going backwards
    #     if heading_delta > 90:
    #         heading_factor = 1e-3
    #     elif heading_delta > 45:
    #         heading_factor = 0.5
    #     else:
    #         heading_factor = 1.0
    #
    # NOTE(review): the wrap handling in the sketch only tries heading + 360,
    # so e.g. a track angle of -170 with a heading of 170 yields a delta of
    # 340 rather than 20 -- fix before re-enabling.
    return 1.0
#===============================================================================
#
# STEERING
#
#===============================================================================
def percentage_steering_angle(steering_angle):
    """Score a steering angle from 1.0 (straight) down to 0.0.

    The magnitude of the angle is taken as a fraction of MAX_STEERING_ANGLE
    and subtracted from 1.0; the result is clamped to the range 0..1, so any
    angle at or beyond MAX_STEERING_ANGLE scores 0.0.
    """
    severity = abs(steering_angle) / MAX_STEERING_ANGLE
    straightness = min(1.0 - severity, 1.0)
    return max(straightness, 0.0)
def calculate_steering_factor(params):
    """Return the steering factor for the current steering angle.

    params: reward-function input dict; only params['steering_angle'] is
    read. The score comes from percentage_steering_angle and is capped at
    1.0.
    """
    return min(percentage_steering_angle(params['steering_angle']), 1.0)
#===============================================================================
#
# PROGRESS
#
#===============================================================================
def calculate_progress_factor(params):
    """Return the progress factor; a placeholder that is always 1.0.

    `params` is accepted for symmetry with the other factor calculators but
    is not read.
    """
    return 1.0
#===============================================================================
#
# LANE
#
#===============================================================================
def percentage_distance_from_track_center(track_width, distance_from_center):
    """Return how close the car is to the center line, as a value in 0..1.

    The result is 1.0 on the center line and falls linearly to 0.0 at the
    edge of the track (half the track width away from the center).

    track_width: full width of the track.
    distance_from_center: distance of the car from the center line.

    Returns 0.0 when the distance is at or beyond the track edge, and also
    when track_width is zero or negative, since no position can then be
    inside the track (this also avoids a ZeroDivisionError).
    """
    half_width = track_width / 2.0
    if half_width <= 0.0:
        return 0.0

    # Clamp so the result is never negative when distance_from_center is
    # beyond the track edge, and never above 1.0.
    closeness = 1.0 - distance_from_center / half_width
    return max(min(closeness, 1.0), 0.0)
def calculate_lane_factor(params):
    """Calculate the reward factor for the position on the track.

    params: reward-function input dict; params['track_width'] and
    params['distance_from_center'] are read.

    Be careful to account for the wheel factor here, possibly merge the two
    later.
    """
    closeness = percentage_distance_from_track_center(
        params['track_width'], params['distance_from_center'])
    return min(closeness, 1.0)
#===============================================================================
#
# HELPER METHODS
#
#===============================================================================
def apply_weight(factor, weight, easing='linear'):
    """Apply a weight to factor, clamping both arguments to the range 0..1.

    Factor values will be 0..1. This function will cause the range of the
    factor values to be reduced according to:

        f = 1 - weight * ease(1 - factor)

    In simple terms, a weight of 0.5 will cause the factor to only have
    weighted values of 0.5..1.0. If we further apply an easing, the decay
    from 1.0 toward the weighted minimum will be along a curve.

    factor: the raw factor value.
    weight: how strongly the factor may reduce the result; 0.0 disables it.
    easing: a key of EASING_FUNCTIONS; an unknown name falls back to
        'linear'.

    Returns the weighted factor, in the range (1 - weight)..1.
    """
    # Clamp on both sides: a negative factor or weight would otherwise push
    # the result outside the documented range.
    f_clamp = max(0.0, min(factor, 1.0))
    w_clamp = max(0.0, min(weight, 1.0))

    # .get() instead of indexing: indexing raises KeyError for an unknown
    # name, which made the intended linear fallback unreachable.
    ease = EASING_FUNCTIONS.get(easing, EASING_FUNCTIONS['linear'])

    return 1.0 - w_clamp * ease(1.0 - f_clamp)
def vector_of_angle(angle):
    """Return a unit-length vector for an angle given in degrees.

    The vector is returned as [[0.0, 0.0], [sin(angle), cos(angle)]], i.e.
    it starts at the origin and an angle of 0 points along +y.

    NOTE(review): angle_of_vector measures angles with atan2(dy, dx), where
    0 points along +x, so the two helpers do not round-trip -- confirm which
    convention is intended before using them together.
    """
    theta = math.radians(angle)
    origin = [0.0, 0.0]
    tip = [math.sin(theta), math.cos(theta)]
    return [origin, tip]
def angle_of_vector(vector):
    """Return the direction of a vector in degrees.

    The angle is relative to a normal 2d coordinate system (0 along +x,
    counter-clockwise positive, as given by atan2). This is useful for
    finding the angle between two waypoints.

    vector: [[x0,y0],[x1,y1]]
    """
    start = vector[0]
    end = vector[1]
    delta_y = end[1] - start[1]
    delta_x = end[0] - start[0]
    return math.degrees(math.atan2(delta_y, delta_x))
#
# SCALING FUNCTIONS
#
def ease_linear(x):
    """Linear easing: return x unchanged."""
    return x
def ease_quadratic(x):
    """Quadratic easing: return x squared."""
    squared = x * x
    return squared
def ease_cubic(x):
    """Cubic easing: return the absolute value of x cubed."""
    cubed = x * x * x
    return abs(cubed)
def ease_quartic(x):
    """Quartic easing: return x to the fourth power."""
    # Multiply left to right, one factor at a time.
    result = x * x
    result = result * x
    result = result * x
    return result
def ease_quintic(x):
    """Quintic easing: return the absolute value of x to the fifth power."""
    # Multiply left to right, one factor at a time.
    result = x * x
    result = result * x
    result = result * x
    result = result * x
    return abs(result)
# Lookup table from easing name to easing function. The names are the values
# accepted by the `easing` argument of apply_weight and used by the
# *_FACTOR_EASING settings.
EASING_FUNCTIONS = {
    'linear': ease_linear,
    'quadratic': ease_quadratic,
    'cubic': ease_cubic,
    'quartic': ease_quartic,
    'quintic': ease_quintic
}