forked from YangRui2015/2048_env
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgym_2048.py
313 lines (255 loc) · 9.79 KB
/
gym_2048.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
from __future__ import print_function
import gym
from gym import spaces
from gym.utils import seeding
import numpy as np
import itertools
import logging
from six import StringIO
import sys
def pairwise(iterable):
    """Pair every element with its successor.

    s -> (s0, s1), (s1, s2), (s2, s3), ...

    An input with fewer than two elements produces no pairs.
    """
    leading, trailing = itertools.tee(iterable, 2)
    # Advance the second copy by one so the two copies are offset by one item.
    next(trailing, None)
    return zip(leading, trailing)
class IllegalMove(Exception):
    """Raised when a requested move would leave the board unchanged."""
def stack(flat, layers=16):
    """Convert a board of tile values into binary planes, one per power of two.

    Plane ``k`` (0-based, along the last axis) holds 1 wherever ``flat``
    equals ``2 ** (k + 1)`` and 0 elsewhere, so the planes cover the tile
    values 2, 4, ..., ``2 ** layers``.  The input is not modified.

    Returns an array of shape ``flat.shape + (layers,)`` with the same dtype
    as ``flat``.
    """
    board = np.asarray(flat)
    planes = [
        (board == 2 ** exponent).astype(board.dtype)
        for exponent in range(1, layers + 1)
    ]
    return np.stack(planes, axis=-1)
class Game2048Env(gym.Env):
    """Gym environment implementing the game 2048 on a square board.

    Actions 0, 1, 2, 3 are up, right, down, left.  The board is kept in
    ``self.Matrix`` as an ``(h, w)`` integer array of raw tile values, with
    0 marking an empty square.

    ``reset()`` must be called before ``step()``: the board and the step
    counter are created there, not in ``__init__``.
    """

    metadata = {'render.modes': ['human', 'ansi']}
    # An episode is ended once this many steps have been taken.
    max_steps = 10000

    def __init__(self):
        """Set up board dimensions, gym spaces, reward settings and the RNG."""
        # Definitions for game. Board must be square.
        self.size = 4
        self.w = self.size
        self.h = self.size
        self.squares = self.size * self.size

        # Maintain own idea of game score, separate from rewards
        self.score = 0

        # Members for gym implementation
        self.action_space = spaces.Discrete(4)
        # Suppose that the maximum tile is as if you have powers of 2 across the board.
        layers = self.squares
        # The builtin ``int`` is used because the ``np.int`` alias was removed
        # from NumPy (it was only ever an alias for the builtin).
        self.observation_space = spaces.Box(
            0, 1, (self.w, self.h, layers), dtype=int)
        self.set_illegal_move_reward(0.)
        self.set_max_tile(None)

        self.max_illegal = 10  # max number of illegal actions
        self.num_illegal = 0

        # Initialise seed
        self.seed()

    def _get_info(self, info=None):
        """Return ``info`` extended with 'highest', 'score' and 'steps'.

        A missing or empty ``info`` is replaced by a fresh dict; a non-empty
        one must be a dict and is updated in place.
        """
        if not info:
            info = {}
        else:
            assert isinstance(info, dict), 'info should be of type dict!'
        info['highest'] = self.highest()
        info['score'] = self.score
        info['steps'] = self.steps
        return info

    def seed(self, seed=None):
        """Create the environment's random generator; return ``[seed]``."""
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def set_illegal_move_reward(self, reward):
        """Define the reward/penalty for performing an illegal move. Also need
        to update the reward range for this."""
        # Guess that the maximum reward is also 2**squares though you'll probably never get that.
        # (assume that illegal move reward is the lowest value that can be returned)
        self.illegal_move_reward = reward
        self.reward_range = (self.illegal_move_reward, float(2 ** self.squares))

    def set_max_tile(self, max_tile):
        """Define the maximum tile that will end the game (e.g. 2048). None means no limit.
        This does not affect the state returned."""
        assert max_tile is None or isinstance(max_tile, int)
        self.max_tile = max_tile

    # Implement gym interface
    def step(self, action):
        """Perform one step of the game. This involves moving and adding a new tile.

        A legal move is rewarded with the score of the tiles it merged.  An
        illegal move (one that changes nothing) yields
        ``self.illegal_move_reward``, adds no tile, and ends the episode once
        ``self.max_illegal`` illegal moves have accumulated or the step count
        exceeds ``max_steps``.

        Returns ``(board, reward, done, info)`` where ``board`` is the
        ``self.Matrix`` array itself (not a copy) and ``info`` carries
        'illegal_move', 'highest', 'score' and 'steps'.
        """
        logging.debug("Action {}".format(action))
        self.steps += 1
        score = 0
        done = None
        info = {
            'illegal_move': False,
        }
        try:
            score = float(self.move(action))
            self.score += score
            assert score <= 2 ** (self.w * self.h)
            self.add_tile()
            done = self.isend()
            reward = float(score)
        except IllegalMove:
            logging.debug("Illegal move")
            info['illegal_move'] = True
            done = self.steps > self.max_steps
            reward = self.illegal_move_reward
            self.num_illegal += 1
            if self.num_illegal >= self.max_illegal:  # exceed the maximum number of illegal actions
                done = True

        info = self._get_info(info)

        # Return observation (board state), reward, done and info dict
        return self.Matrix, reward, done, info

    def reset(self):
        """Start a new game: clear the board and counters, then add two tiles.

        Returns the 4-tuple ``(board, 0, False, info)``, mirroring the shape
        of what ``step()`` returns.
        """
        # Builtin ``int`` rather than the removed ``np.int`` alias.
        self.Matrix = np.zeros((self.h, self.w), int)
        self.score = 0
        self.steps = 0
        self.num_illegal = 0

        logging.debug("Adding tiles")
        self.add_tile()
        self.add_tile()

        return self.Matrix, 0, False, self._get_info()

    def render(self, mode='human'):
        """Write the score, highest tile and board as text.

        Output goes to a new ``StringIO`` when ``mode == 'ansi'`` and to
        ``sys.stdout`` otherwise; the stream written to is returned.
        """
        outfile = StringIO() if mode == 'ansi' else sys.stdout
        s = 'Score: {}\n'.format(self.score)
        s += 'Highest: {}\n'.format(self.highest())
        npa = np.array(self.Matrix)
        grid = npa.reshape((self.size, self.size))
        s += "{}\n\n".format(grid)
        outfile.write(s)
        return outfile

    # Implement 2048 game
    def add_tile(self):
        """Add a tile, probably a 2 but maybe a 4.

        The value is 2 with probability 0.9 and 4 with probability 0.1, placed
        on a uniformly chosen empty square.  Asserts that an empty square
        exists.
        """
        possible_tiles = np.array([2, 4])
        tile_probabilities = np.array([0.9, 0.1])
        val = self.np_random.choice(possible_tiles, 1, p=tile_probabilities)[0]
        empties = self.empties()
        assert empties.shape[0]
        empty_idx = self.np_random.choice(empties.shape[0])
        empty = empties[empty_idx]
        logging.debug("Adding %s at %s", val, (empty[0], empty[1]))
        self.set(empty[0], empty[1], val)

    def get(self, x, y):
        """Return the value of one square."""
        return self.Matrix[x, y]

    def set(self, x, y, val):
        """Set the value of one square."""
        self.Matrix[x, y] = val

    def empties(self):
        """Return a 2d numpy array with the location of empty squares."""
        return np.argwhere(self.Matrix == 0)

    def highest(self):
        """Report the highest tile on the board."""
        return np.max(self.Matrix)

    def move(self, direction, trial=False):
        """Perform one move of the game. Shift things to one side then,
        combine. directions 0, 1, 2, 3 are up, right, down, left.

        With ``trial=True`` the board is left untouched and only the outcome
        is evaluated.

        Returns the score that [would have] got.
        Raises ``IllegalMove`` if the move would not change the board.
        """
        if not trial:
            if direction == 0:
                logging.debug("Up")
            elif direction == 1:
                logging.debug("Right")
            elif direction == 2:
                logging.debug("Down")
            elif direction == 3:
                logging.debug("Left")

        changed = False
        move_score = 0
        dir_div_two = int(direction / 2)
        dir_mod_two = int(direction % 2)
        shift_direction = dir_mod_two ^ dir_div_two  # 0 for towards up left, 1 for towards bottom right

        # Construct a range for extracting row/column into a list
        rx = list(range(self.w))
        ry = list(range(self.h))

        if dir_mod_two == 0:
            # Up or down, split into columns
            for y in range(self.h):
                old = [self.get(x, y) for x in rx]
                (new, ms) = self.shift(old, shift_direction)
                move_score += ms
                if old != new:
                    changed = True
                    if not trial:
                        for x in rx:
                            self.set(x, y, new[x])
        else:
            # Left or right, split into rows
            for x in range(self.w):
                old = [self.get(x, y) for y in ry]
                (new, ms) = self.shift(old, shift_direction)
                move_score += ms
                if old != new:
                    changed = True
                    if not trial:
                        for y in ry:
                            self.set(x, y, new[y])
        if not changed:
            raise IllegalMove

        return move_score

    def combine(self, shifted_row):
        """Combine same tiles when moving to one side. This function always
        shifts towards the left. Also count the score of combined tiles.

        ``shifted_row`` is the row with its empty squares already removed.
        Returns ``(combined_row, move_score)`` where ``combined_row`` is
        padded with zeros to ``self.size`` entries.
        """
        move_score = 0
        combined_row = [0] * self.size
        skip = False
        output_index = 0
        for p in pairwise(shifted_row):
            if skip:
                # p[0] was already merged into the previous output square.
                skip = False
                continue
            combined_row[output_index] = p[0]
            if p[0] == p[1]:
                combined_row[output_index] += p[1]
                move_score += p[0] + p[1]
                # Skip the next thing in the list.
                skip = True
            output_index += 1
        # The last tile never appears as p[0]; copy it unless it was merged.
        if shifted_row and not skip:
            combined_row[output_index] = shifted_row[-1]

        return (combined_row, move_score)

    def shift(self, row, direction):
        """Shift one row left (direction == 0) or right (direction == 1), combining if required.

        Returns ``(combined_row, move_score)``; ``combined_row`` has
        ``self.size`` entries.
        """
        length = len(row)
        assert length == self.size
        assert direction == 0 or direction == 1

        # Shift all non-zero digits up
        shifted_row = [i for i in row if i != 0]

        # Reverse list to handle shifting to the right
        if direction:
            shifted_row.reverse()

        (combined_row, move_score) = self.combine(shifted_row)

        # Reverse list to handle shifting to the right
        if direction:
            combined_row.reverse()

        assert len(combined_row) == self.size
        return (combined_row, move_score)

    def isend(self):
        """Has the game ended. Game ends if there is a tile equal to the limit
        or there are no legal moves. If there are empty spaces then there
        must be legal moves.

        The game also ends once ``self.steps`` reaches ``max_steps``.
        """
        if self.max_tile is not None and self.highest() == self.max_tile:
            return True

        if self.steps >= self.max_steps:
            return True

        for direction in range(4):
            try:
                self.move(direction, trial=True)
                # Not the end if we can do any move
                return False
            except IllegalMove:
                pass
        return True

    def get_board(self):
        """Retrieve the whole board, useful for testing."""
        return self.Matrix

    def set_board(self, new_board):
        """Replace the whole board, useful for testing."""
        self.Matrix = new_board