-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathproblem.py
116 lines (85 loc) · 3.68 KB
/
problem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python
"""Logic to combine dynamics and cost in one framework to simplify distribution"""
from time import perf_counter as pc
import numpy as np
from .control import ilqrSolver
from .cost import ReferenceCost, GameCost
from .dynamics import DynamicalModel, MultiDynamicalModel
from .util import split_agents_gen
class ilqrProblem:
    """Pair a dynamical model with its cost so both can be handled as one unit.

    The pairing lets a centralized problem be split into per-agent
    sub-problems with a single call, keeping dynamics and cost in step.
    """

    def __init__(self, dynamics, cost):
        """Store the dynamics/cost pair and work out the number of agents.

        ``n_agents`` is the number of reference costs when ``cost`` is a
        ``GameCost``; any other cost is treated as a single-agent problem.
        """
        self.dynamics = dynamics
        self.game_cost = cost
        self.n_agents = len(cost.ref_costs) if isinstance(cost, GameCost) else 1

    @property
    def ids(self):
        """Return a copy of the agent ids shared by the dynamics and the cost.

        Raises:
            NotImplementedError: if the dynamics is not a MultiDynamicalModel.
            ValueError: if the dynamics ids and the cost ids disagree.
        """
        if not isinstance(self.dynamics, MultiDynamicalModel):
            raise NotImplementedError(
                "Only MultiDynamicalModel's have an 'ids' attribute"
            )

        dynamics_ids = self.dynamics.ids
        ids_agree = dynamics_ids == self.game_cost.ids
        if not ids_agree:
            raise ValueError(f"Dynamics and cost have inconsistent ID's: {self}")

        # Hand back a copy so callers cannot mutate the model's own ids.
        return dynamics_ids.copy()

    def split(self, graph):
        """Break this centralized problem into a list of sub-problems.

        ``graph`` is forwarded unchanged to the ``split`` methods of both the
        dynamics and the cost; the resulting pieces are paired up in order.
        """
        sub_dynamics = self.dynamics.split(graph)
        sub_costs = self.game_cost.split(graph)
        return list(map(ilqrProblem, sub_dynamics, sub_costs))

    def extract(self, X, U, id_):
        """Slice one agent's columns out of the concatenated states/controls.

        Returns:
            The pair ``(Xi, Ui)`` holding the columns of ``X`` and ``U`` that
            belong to agent ``id_``.

        Raises:
            IndexError: if ``id_`` is not one of this problem's ids.
        """
        agent_ids = self.ids
        if id_ not in agent_ids:
            raise IndexError(f"Index {id_} not in ids: {agent_ids}.")

        # NOTE: Assume uniform dynamical models, i.e. every agent has the same
        # state/control dimension as the first one.
        position = agent_ids.index(id_)
        n_x = self.game_cost.x_dims[0]
        n_u = self.game_cost.u_dims[0]

        state_cols = slice(position * n_x, (position + 1) * n_x)
        control_cols = slice(position * n_u, (position + 1) * n_u)
        return X[:, state_cols], U[:, control_cols]

    def selfish_warmstart(self, x0, N):
        """Compute a 'selfish' warmstart by solving each agent on its own.

        Every agent is placed in a graph containing only itself, its
        sub-problem is solved over horizon ``N`` from its slice of ``x0``, and
        the resulting controls are written into that agent's columns.

        Returns:
            Array of shape ``(N, self.dynamics.n_u)`` with the stacked controls.
        """
        print("=" * 80 + "\nComputing warmstart...")

        x0 = x0.reshape(-1, 1)
        agent_ids = self.ids

        # Each agent only "sees" itself, so the split yields one independent
        # problem per agent.
        lonely_graph = {agent: [agent] for agent in agent_ids}
        subproblems = self.split(lonely_graph)

        U_warm = np.zeros((N, self.dynamics.n_u))
        start_all = pc()
        agent_states = split_agents_gen(x0, self.game_cost.x_dims)
        for subproblem, x0_agent, agent in zip(subproblems, agent_states, agent_ids):
            start = pc()
            _, U_agent, _ = ilqrSolver(subproblem, N).solve(x0_agent)
            print(f"Problem {agent}: Took {pc() - start} seconds\n" + "=" * 60)

            # Column offset assumes all agents share the same control width.
            width = subproblem.dynamics.n_u
            slot = agent_ids.index(agent)
            U_warm[:, width * slot : width * (slot + 1)] = U_agent

        print(f"All: {agent_ids}\nTook {pc() - start_all} seconds\n" + "=" * 80)
        return U_warm

    def __repr__(self):
        """Show the wrapped dynamics and cost on separate tab-indented lines."""
        return "ilqrProblem(\n\t{},\n\t{}\n)".format(self.dynamics, self.game_cost)
def solve_subproblem(args, **kwargs):
    """Solve one sub-problem and return just the requesting agent's result.

    ``args`` is a 5-tuple ``(subproblem, x0, U, id_, verbose)``. The horizon
    is taken from the first dimension of ``U``; any extra keyword arguments
    are forwarded to the solver's ``solve`` call.

    Returns:
        The values produced by ``subproblem.extract`` for agent ``id_``,
        followed by ``id_`` itself, as one flat tuple.
    """
    subproblem, x0, U, id_, verbose = args

    horizon = U.shape[0]
    X_sol, U_sol, _ = ilqrSolver(subproblem, horizon).solve(
        x0, U, verbose=verbose, **kwargs
    )

    # Tag the result with the agent id so the caller can tell results apart.
    extracted = subproblem.extract(X_sol, U_sol, id_)
    return (*extracted, id_)
def solve_subproblem_starmap(subproblem, x0, U, id_, verbose=False):
    """Package up unpacked arguments for compatibility with starmap-style calls.

    ``solve_subproblem`` expects a single 5-tuple
    ``(subproblem, x0, U, id_, verbose)``. This adapter accepts the same
    values as separate positional arguments and bundles them into that tuple.

    Args:
        subproblem: the sub-problem to solve.
        x0: initial state passed to the solver.
        U: initial control trajectory; its first dimension is the horizon.
        id_: id of the agent whose result should be extracted.
        verbose: forwarded to the solver. Defaults to ``False`` so existing
            four-argument callers keep working.

    Returns:
        Whatever ``solve_subproblem`` returns for the packed arguments.
    """
    # The tuple must have five entries: omitting ``verbose`` makes the
    # unpacking inside solve_subproblem fail with a ValueError.
    return solve_subproblem((subproblem, x0, U, id_, verbose))
def _reset_ids():
    """Reset the agent-specific id state kept by the model and cost classes.

    Delegates to ``DynamicalModel._reset_ids`` and then
    ``ReferenceCost._reset_ids``; presumably this restarts their id numbering
    from zero for understandability (confirm in those classes).
    """
    for id_owner in (DynamicalModel, ReferenceCost):
        id_owner._reset_ids()