import collections
import random
from typing import List, Tuple, Dict, Any


# An abstract class representing a Markov Decision Process (MDP).
class MDP:
    # Return the start state.
    def startState(self) -> Tuple: raise NotImplementedError("Override me")

    # Return the set of actions possible from |state|.
    def actions(self, state: Tuple) -> List[Any]: raise NotImplementedError("Override me")

    # Return a list of (newState, prob, reward) tuples corresponding to edges
    # coming out of |state|.
    # Mapping to notation from class:
    #   state = s, action = a, newState = s', prob = T(s, a, s'), reward = Reward(s, a, s')
    # If IsEnd(state), return the empty list.
    def succAndProbReward(self, state: Tuple, action: Any) -> List[Tuple]: raise NotImplementedError("Override me")

    # Return the discount factor (a number in (0, 1]).
    def discount(self) -> float: raise NotImplementedError("Override me")

    # Compute the set of states reachable from startState.  Helper function for
    # MDPAlgorithms to know which states to compute values and policies for.
    # This function sets |self.states| to be the set of all reachable states.
    def computeStates(self):
        self.states = set()
        queue = []
        self.states.add(self.startState())
        queue.append(self.startState())
        while len(queue) > 0:
            state = queue.pop()
            for action in self.actions(state):
                for newState, prob, reward in self.succAndProbReward(state, action):
                    if newState not in self.states:
                        self.states.add(newState)
                        queue.append(newState)
        # print("%d states" % len(self.states))
        # print(self.states)

############################################################
# A simple example of an MDP where states are integers in [-n, +n]
# and actions involve moving left and right by one position.
# We get rewarded for going to the right.
class NumberLineMDP(MDP):
    def __init__(self, n=5): self.n = n
    def startState(self): return 0
    def actions(self, state): return [-1, +1]
    def succAndProbReward(self, state, action):
        # With probability 0.4 stay put (reward 0); with probability 0.6 move by
        # |action| (clipped to [-n, +n]) and receive the current state as reward.
        return [(state, 0.4, 0),
                (min(max(state + action, -self.n), +self.n), 0.6, state)]
    def discount(self): return 0.9

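# Illustrative sketch (not part of the original code): enumerate the transition
# model of NumberLineMDP at its start state to show the (newState, prob, reward)
# convention returned by succAndProbReward().
def _exampleNumberLineTransitions():
    mdp = NumberLineMDP(n=5)
    state = mdp.startState()
    for action in mdp.actions(state):
        for newState, prob, reward in mdp.succAndProbReward(state, action):
            print("action %+d -> state %d with prob %.1f, reward %d" % (action, newState, prob, reward))
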
############################################################
# An algorithm that solves an MDP (i.e., computes the optimal
# policy).
class MDPAlgorithm:
    # Set:
    # - self.pi: optimal policy (mapping from state to action)
    # - self.V: values (mapping from state to its optimal value)
    def solve(self, mdp: MDP): raise NotImplementedError("Override me")

############################################################
class ValueIteration(MDPAlgorithm):
    '''
    Solve the MDP using value iteration.  The solve() method sets
    - self.V to the dictionary mapping states to optimal values
    - self.pi to the dictionary mapping states to an optimal action
    Note: epsilon is the error tolerance: value iteration stops when
    all of the values change by less than epsilon between iterations.
    ValueIteration is a subclass of MDPAlgorithm (defined above).
    '''
    def solve(self, mdp: MDP, epsilon=0.001):
        mdp.computeStates()

        def computeQ(mdp: MDP, V: Dict[Tuple, float], state: Tuple, action: Any) -> float:
            # Return Q(state, action) based on V(state).
            return sum(prob * (reward + mdp.discount() * V[newState])
                       for newState, prob, reward in mdp.succAndProbReward(state, action))

        def computeOptimalPolicy(mdp: MDP, V: Dict[Tuple, float]) -> Dict[Tuple, Any]:
            # Return the optimal policy given the values V.
            pi = {}
            for state in mdp.states:
                pi[state] = max((computeQ(mdp, V, state, action), action) for action in mdp.actions(state))[1]
            return pi

        V = collections.defaultdict(float)  # state -> value of state
        numIters = 0
        while True:
            newV = {}
            for state in mdp.states:
                # For end states, succAndProbReward() returns the empty list, so
                # computeQ() is 0 for every action and the value stays 0.
                newV[state] = max(computeQ(mdp, V, state, action) for action in mdp.actions(state))
            numIters += 1
            if max(abs(V[state] - newV[state]) for state in mdp.states) < epsilon:
                V = newV
                break
            V = newV
        # Compute the optimal policy now that the values have converged.
        pi = computeOptimalPolicy(mdp, V)
        print("ValueIteration: %d iterations" % numIters)
        self.pi = pi
        self.V = V

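# Illustrative sketch (not part of the original code): run ValueIteration on the
# small NumberLineMDP defined above and inspect the resulting value function and
# policy.
def _exampleValueIteration():
    mdp = NumberLineMDP(n=5)
    vi = ValueIteration()
    vi.solve(mdp, epsilon=0.001)
    for state in sorted(vi.V):
        print("V(%d) = %.3f, pi(%d) = %+d" % (state, vi.V[state], state, vi.pi[state]))
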
############################################################
############################################################
# Abstract class: an RLAlgorithm performs reinforcement learning.  All it needs
# to know is the set of available actions to take.  The simulator (see
# simulate()) will call getAction() to get an action, perform the action, and
# then provide feedback (via incorporateFeedback()) to the RL algorithm, so it
# can adjust its parameters.
class RLAlgorithm:
    # Your algorithm will be asked to produce an action given a state.
    def getAction(self, state: Tuple) -> Any: raise NotImplementedError("Override me")

    # We will call this function when simulating an MDP, and you should update
    # parameters.
    # If |state| is a terminal state, this function will be called with
    # (s, a, 0, None).  When this function is called, it indicates that taking
    # action |action| in state |state| resulted in reward |reward| and a
    # transition to state |newState|.
    def incorporateFeedback(self, state: Tuple, action: Any, reward: int, newState: Tuple): raise NotImplementedError("Override me")


# An RL algorithm that acts according to a fixed policy |pi| and doesn't
# actually do any learning.
class FixedRLAlgorithm(RLAlgorithm):
    def __init__(self, pi: Dict[Tuple, Any]): self.pi = pi

    # Just return the action given by the policy.
    def getAction(self, state: Tuple) -> Any: return self.pi[state]

    # Don't do anything: just stare off into space.
    def incorporateFeedback(self, state: Tuple, action: Any, reward: int, newState: Tuple): pass

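# Illustrative sketch (not part of the original code): a minimal concrete
# RLAlgorithm that picks a uniformly random action and ignores feedback, just to
# show how the getAction()/incorporateFeedback() interface is exercised.  The
# |actions| constructor argument is a hypothetical parameter: a function mapping
# a state to its available actions (e.g. mdp.actions).
class _RandomRLAlgorithm(RLAlgorithm):
    def __init__(self, actions): self.actions = actions

    # Pick one of the available actions uniformly at random.
    def getAction(self, state: Tuple) -> Any: return random.choice(self.actions(state))

    # A random policy has no parameters to update, so ignore the feedback.
    def incorporateFeedback(self, state: Tuple, action: Any, reward: int, newState: Tuple): pass
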
############################################################
# Perform |numTrials| of the following:
# On each trial, take the MDP |mdp| and an RLAlgorithm |rl| and simulate the
# RL algorithm according to the dynamics of the MDP.
# Each trial will run for at most |maxIterations|.
# Return the list of total discounted rewards that we get for each trial.
def simulate(mdp: MDP, rl: RLAlgorithm, numTrials=10, maxIterations=1000, verbose=False,
             sort=False):
    # Return i in [0, ..., len(probs)-1] with probability probs[i].
    def sample(probs):
        target = random.random()
        accum = 0
        for i, prob in enumerate(probs):
            accum += prob
            if accum >= target: return i
        raise Exception("Invalid probs: %s" % probs)

    totalRewards = []  # The rewards we get on each trial
    for trial in range(numTrials):
        state = mdp.startState()
        sequence = [state]
        totalDiscount = 1
        totalReward = 0
        for _ in range(maxIterations):
            action = rl.getAction(state)
            transitions = mdp.succAndProbReward(state, action)
            # Optionally sort transitions so they are visited in a deterministic order.
            if sort: transitions = sorted(transitions)

            # An empty transition list means |state| is an end state.
            if len(transitions) == 0:
                rl.incorporateFeedback(state, action, 0, None)
                break

            # Choose a random transition according to the transition probabilities.
            i = sample([prob for newState, prob, reward in transitions])
            newState, prob, reward = transitions[i]
            sequence.append(action)
            sequence.append(reward)
            sequence.append(newState)

            rl.incorporateFeedback(state, action, reward, newState)
            totalReward += totalDiscount * reward
            totalDiscount *= mdp.discount()
            state = newState
        if verbose:
            print("Trial %d (totalReward = %s): %s" % (trial, totalReward, sequence))
        totalRewards.append(totalReward)
    return totalRewards

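# Illustrative sketch (not part of the original code): solve the MDP with value
# iteration, act according to the resulting (fixed) policy, and use simulate()
# to estimate the total discounted reward per trial.
def _exampleSimulateFixedPolicy():
    mdp = NumberLineMDP(n=5)
    vi = ValueIteration()
    vi.solve(mdp)
    rl = FixedRLAlgorithm(vi.pi)
    rewards = simulate(mdp, rl, numTrials=20, maxIterations=100)
    print("average total discounted reward: %.3f" % (sum(rewards) / len(rewards)))
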
############################################################
# Sample a random trajectory through the MDP.
# Start at the initial state and take the optimal action given by |policy| at
# each state; then randomly sample the next state from the possible successor
# states, weighted by their transition probabilities.
# Returns the list of actions taken along the trajectory.
def sample_trajectory(mdp: MDP, policy: MDPAlgorithm) -> List[Any]:
    traj = []
    state = mdp.startState()
    while True:
        # Look up the optimal action for the current state.
        act = policy.pi[state]
        traj.append(act)
        # succAndProbReward() returns the list of (newState, prob, reward)
        # transitions; an empty list means |state| is an end state.
        next_states = mdp.succAndProbReward(state, act)
        if len(next_states) < 1:
            break
        # Randomly sample one of the successor states, weighted by probability.
        probs = [n[1] for n in next_states]
        state = random.choices(next_states, weights=probs, k=1)[0][0]
    return traj

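# Illustrative sketch (not part of the original code): sample_trajectory() only
# terminates when it reaches an end state (a state with no outgoing transitions),
# so this sketch defines a hypothetical three-state chain MDP with a terminal
# state rather than reusing NumberLineMDP, which never terminates.
class _ChainMDP(MDP):
    # States 0, 1, 2; state 2 is terminal.
    def startState(self): return 0
    def actions(self, state): return ['step']
    def succAndProbReward(self, state, action):
        if state >= 2: return []  # end state: no outgoing transitions
        return [(state + 1, 1.0, 1)]
    def discount(self): return 1.0


def _exampleSampleTrajectory():
    mdp = _ChainMDP()
    vi = ValueIteration()
    vi.solve(mdp)
    # Prints the sampled list of actions, e.g. ['step', 'step', 'step'].
    print(sample_trajectory(mdp, vi))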