-
Notifications
You must be signed in to change notification settings - Fork 1
/
noVis_sim.py
168 lines (142 loc) · 5.22 KB
/
noVis_sim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import numpy as np
import random
import math
from functools import reduce
class DroneEnv:
    """Grid-mapping environment for a small fleet of drones.

    The world is a ``row_count`` x ``col_count`` grid stored as a list of
    rows. Every cell starts with value 1, except ``n_anamolous`` randomly
    chosen cells which start with value ``uncertainity``. A cell is set to 0
    ("mapped") when a drone comes strictly closer than ``collision_dist`` to
    its integer coordinates. The episode is done once no cell is positive.

    Drone positions are ``[p0, p1]`` float pairs, where ``p0`` is clipped to
    ``[0, row_count - 1]`` and ``p1`` to ``[0, col_count - 1]``.

    Actions are numbers in ``[0, action_size)``:

    * ``0``: ``p1 -= step_size``
    * ``1``: ``p0 += step_size``
    * ``2``: ``p1 += step_size``
    * ``3``: ``p0 -= step_size``

    Args:
        row_count: Number of grid rows.
        col_count: Number of grid columns.
        n_anamolous: Number of distinct cells that start with the
            ``uncertainity`` value. Must not exceed the number of cells.
        uncertainity: Initial grid value of the anomalous cells.
        collision_dist: A drone maps a cell when their Euclidean distance is
            strictly below this value.
        n_drones: Number of drones.
        step_size: Distance a drone moves per action.

    Raises:
        ValueError: If ``n_anamolous`` is larger than the number of cells.
    """

    def __init__(
        self,
        row_count=10,
        col_count=10,
        n_anamolous=5,
        uncertainity=5,
        collision_dist=0.5,
        n_drones=3,
        step_size=0.5,
    ):
        self.grid = None
        self.row_count = row_count
        self.col_count = col_count
        self.n_anamolous = n_anamolous
        self.uncertainity = uncertainity
        # Maps (row, col) of each anomalous cell to 1 while its bonus is
        # still available and to 0 once the bonus has been paid out.
        self.uncertain_points = None
        self.collision_dist = collision_dist
        self.n_drones = n_drones
        self.n_drones_pos = None
        # Reset to an empty list by init_env(); not otherwise used here.
        self.prev_drone_pos = None
        self.step_size = step_size
        self.action_size = 4
        self.state_size = row_count * col_count
        self.step_func_count = 0
        self.init_env()

    def init_env(self) -> None:
        """Rebuild the grid, anomalous cells and drone positions at random.

        Raises:
            ValueError: If ``n_anamolous`` exceeds the number of grid cells,
                because that many distinct cells cannot be selected.
        """
        cell_count = self.row_count * self.col_count
        if self.n_anamolous > cell_count:
            # Without this guard the rejection-sampling loop below would
            # never terminate.
            raise ValueError(
                f"n_anamolous ({self.n_anamolous}) cannot exceed the number "
                f"of grid cells ({cell_count})"
            )

        self.step_func_count = 0
        self.grid = [[1] * self.col_count for _ in range(self.row_count)]

        # Rejection sampling: draw (row, col) pairs until enough distinct
        # cells are marked. Uniqueness is tracked through the dict itself
        # rather than through the grid value, so it also holds when
        # ``uncertainity`` happens to equal the default cell value 1.
        self.uncertain_points = {}
        while len(self.uncertain_points) < self.n_anamolous:
            a = np.random.randint(self.row_count)
            b = np.random.randint(self.col_count)
            if (a, b) not in self.uncertain_points:
                self.grid[a][b] = self.uncertainity
                self.uncertain_points[(a, b)] = 1

        self.n_drones_pos = []
        self.prev_drone_pos = []
        for _ in range(self.n_drones):
            x = np.random.randint(self.row_count)
            y = np.random.randint(self.col_count)
            self.n_drones_pos.append([float(x), float(y)])

    def _observation(self) -> np.ndarray:
        """Return the grid as a new 1-D array, transposed before flattening."""
        return np.array(self.grid).T.flatten()

    def reset(self) -> np.ndarray:
        """Re-randomise the environment and return the initial observation."""
        self.init_env()
        return self._observation()

    def _eucid_dist(self, x1, y1, x2, y2) -> float:
        """Return the Euclidean distance between (x1, y1) and (x2, y2)."""
        return math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)

    def _det_collision(self, drone_x, drones_y) -> float:
        """Map the first unmapped cell within reach of a drone position.

        Cells are scanned row by row; the first cell whose value is positive
        and whose distance to ``(drone_x, drones_y)`` is strictly below
        ``collision_dist`` is set to 0. At most one cell is mapped per call.

        Returns:
            ``10.0`` plus any anomalous-cell bonus if a cell was mapped,
            otherwise ``0.0``.
        """
        for x in range(self.row_count):
            for y in range(self.col_count):
                # Cheap grid check first so already-mapped cells skip the
                # distance computation.
                if (
                    self.grid[x][y] > 0
                    and self._eucid_dist(drone_x, drones_y, x, y)
                    < self.collision_dist
                ):
                    self.grid[x][y] = 0
                    return 10.0 + self._check_uncertain_mapped()
        return 0.0

    def _drone_dist(self) -> int:
        """Count drone pairs farther apart than ``row_count // n_drones``."""
        if self.n_drones < 2:
            # No pairs exist; also avoids dividing by zero drones below.
            return 0
        threshold = self.row_count // self.n_drones
        dist_count = 0
        for i in range(self.n_drones - 1):
            x1, y1 = self.n_drones_pos[i][0], self.n_drones_pos[i][1]
            for j in range(i + 1, self.n_drones):
                x2, y2 = self.n_drones_pos[j][0], self.n_drones_pos[j][1]
                if self._eucid_dist(x1, y1, x2, y2) > threshold:
                    dist_count += 1
        return dist_count

    def _check_uncertain_mapped(self) -> float:
        """Pay out 20.0 once for every anomalous cell that is now mapped.

        Each paid-out cell is flagged with 0 in ``uncertain_points`` so that
        it is not rewarded again.
        """
        reward = 0.0
        # Only values of existing keys are reassigned during iteration, so
        # the dict size never changes.
        for (row, col), pending in self.uncertain_points.items():
            if pending == 1 and self.grid[row][col] == 0:
                reward += 20.0
                self.uncertain_points[(row, col)] = 0
        return reward

    def step(self, actions):
        """Apply one action per drone and advance the environment one step.

        Args:
            actions: Sequence with exactly ``n_drones`` entries, each in
                ``[0, action_size)``.

        Returns:
            A tuple ``(observation, total_reward, done)``: the flattened
            grid, the summed reward of this step, and whether no positive
            cell is left.

        Raises:
            AssertionError: If the number of actions or an action value is
                invalid (only while assertions are enabled).
        """
        assert len(actions) == self.n_drones, "provide actions for each drone"
        total_reward = 0.0
        for i, action in enumerate(actions):
            assert (
                action >= 0 and action < self.action_size
            ), f"action should be in range:[0,{self.action_size})"
            position = self.n_drones_pos[i]
            if action == 0:
                position[1] -= self.step_size
            elif action == 1:
                position[0] += self.step_size
            elif action == 2:
                position[1] += self.step_size
            elif action == 3:
                position[0] -= self.step_size
            # Keep the drone inside the grid.
            position[0] = np.clip(position[0], 0, self.row_count - 1)
            position[1] = np.clip(position[1], 0, self.col_count - 1)
            total_reward += self._det_collision(position[0], position[1])

        step_budget = (self.row_count * self.col_count) / self.step_size
        if self.step_func_count > step_budget:
            # Flat penalty once the episode runs past its step budget.
            # NOTE: a linearly increasing penalty was tried here previously.
            total_reward -= 1.0
        else:
            # Flat bonus while at least one drone pair is spread out.
            # NOTE: an exponentially decaying spread bonus was tried here
            # previously.
            total_reward += 1.0 if self._drone_dist() > 0 else 0.0
        self.step_func_count += 1

        done = not any(cell > 0 for row in self.grid for cell in row)
        return self._observation(), total_reward, done