-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsearch_node.py
178 lines (150 loc) · 5.7 KB
/
search_node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from collections import deque
from heapq import heappush, heappop, heapify
import random
# Constants of search methods
FIFO = 'FIFO'  # breadth-first search: frontier is a queue
LIFO = 'LIFO'  # depth-first search: frontier is a stack
UCS = 'UCS'    # uniform-cost search: frontier is a min-heap
SUM = 'SUM'    # greedy search ('greedy_sum'): frontier is a min-heap
STAR = 'STAR'  # A* search: frontier is a min-heap
HILL = 'HILL'  # hill climbing: min-heap that is discarded after every pop


class SearchNodes:
    """
    Frontier, explored set and move history for 8 puzzle searches.

    A state is a sequence of single-digit tiles. Internally every state is
    identified by its key: the tiles concatenated into one string (see
    ``puzzle_to_key``). ``pop_frontier`` turns a key back into a list by
    converting each character to ``int``, so tiles must be single digits.

    This class operates in many modes (constructor argument in quotes):
        - BFS ('bfs'): FIFO Frontier
        - DFS ('dfs'): LIFO Frontier
        - UCS ('ucs'): Heap Frontier
        - Greedy ('greedy_sum'): Heap Frontier
        - AStar ('astar'): Heap Frontier
        - Hill climbing ('hill'): Heap Frontier, emptied after every pop

    For every pushed state a history string is kept in ``_movements``:
    ``''`` for a root state, otherwise ``'parent_key|move|cost|depth'``
    where ``move`` is the first element of the action that produced it.
    """

    # Constructor argument -> internal mode constant.
    _MODE_BY_NAME = {
        'bfs': FIFO,
        'dfs': LIFO,
        'ucs': UCS,
        'greedy_sum': SUM,
        'astar': STAR,
        'hill': HILL,
    }
    # Modes whose frontier is a deque of bare keys.
    _QUEUE_MODES = (FIFO, LIFO)
    # Modes whose frontier is a heap of (cost, key) tuples.
    _HEAP_MODES = (UCS, SUM, STAR)

    def __init__(self, mode):
        """
        Create an empty search structure.

        :param mode: one of 'bfs', 'dfs', 'ucs', 'greedy_sum', 'astar', 'hill'.
        :raises ValueError: if ``mode`` is not one of the names above.
        """
        try:
            self.mode = self._MODE_BY_NAME[mode]
        except (KeyError, TypeError):
            # Fail here rather than with an AttributeError on the first push.
            raise ValueError('unknown search mode: {0!r}'.format(mode))
        # Queue modes need O(1) pops at both ends; heapq works on a list.
        self._frontier = deque() if self.mode in self._QUEUE_MODES else []
        self._frontier_set = set()
        self._explored = set()
        self._movements = {}

    def push_frontier(self, state, parent, action, cost=None, depth=None):
        """
        Add ``state`` to the frontier and record how it was reached.

        :param state: tile sequence to enqueue.
        :param parent: tile sequence of the parent, or None for a root state.
        :param action: action that led to ``state``; only ``action[0]`` is
            stored in the history. Ignored for a root state, except that in
            hill mode the full action is kept in the frontier entry.
        :param cost: priority used by the heap modes; defaults to 0.
        :param depth: depth of ``state``; defaults to 0.
        """
        # Default for every push (root included) so that heap entries never
        # carry None, which cannot be ordered against numbers.
        cost = 0 if cost is None else cost
        depth = 0 if depth is None else depth
        node_key = SearchNodes.puzzle_to_key(state)

        # Creates movement history for generating solution later
        if parent is None:
            self._movements[node_key] = ''
        else:
            parent_key = SearchNodes.puzzle_to_key(parent)
            self._movements[node_key] = '|'.join(
                [parent_key, action[0], str(cost), str(depth)])

        # Adds state to frontier set for easy search
        self._frontier_set.add(node_key)

        # Insert into frontier the puzzle
        if self.mode in self._QUEUE_MODES:
            self._frontier.append(node_key)
        elif self.mode in self._HEAP_MODES:
            heappush(self._frontier, (cost, node_key))
        elif self.mode == HILL:
            heappush(self._frontier, (cost, node_key, action))

    def pop_frontier(self, return_cost=False, return_depth=False):
        """
        Remove and return the next state according to the search mode.

        :param return_cost: also return the state's cost. For heap modes this
            is the priority stored in the frontier; for BFS/DFS it is the cost
            recorded in the history (0 for a root state).
        :param return_depth: also return the recorded depth of the state.
        :return: tuple ``(tiles[, cost][, depth][, action])`` where ``tiles``
            is a list of ints and ``action`` is present only in hill mode.
        :raises IndexError: if the frontier is empty.
        """
        cost = None
        action = None
        if self.mode == FIFO:
            node_key = self._frontier.popleft()
        elif self.mode == LIFO:
            node_key = self._frontier.pop()
        elif self.mode == HILL:
            cost, node_key, action = self.hill_climb_pop()
        else:
            cost, node_key = heappop(self._frontier)

        # discard: the key may already be gone (hill mode clears the set).
        self._frontier_set.discard(node_key)

        result = [[int(tile) for tile in node_key]]
        if return_cost:
            if self.mode in self._QUEUE_MODES:
                # Queue entries hold no cost; read it back from the history.
                cost = self._recorded_cost(node_key)
            result.append(cost)
        if return_depth:
            result.append(self.get_depth(node_key))
        if self.mode == HILL:
            result.append(action)
        return tuple(result)

    def frontier_has_next(self):
        """Return True while the frontier still holds at least one state."""
        return len(self._frontier) > 0

    def push_explored(self, state):
        """Mark ``state`` as explored (no effect if it already is)."""
        self._explored.add(SearchNodes.puzzle_to_key(state))

    def is_explored(self, state):
        """Return True if ``state`` was marked with ``push_explored``."""
        return SearchNodes.puzzle_to_key(state) in self._explored

    def is_in_frontier(self, state):
        """Return True if ``state`` is currently tracked as being in the frontier."""
        return SearchNodes.puzzle_to_key(state) in self._frontier_set

    def swap_frontier_if_better(self, puzzle, parent, action, cost, depth):
        """
        Re-prioritise a frontier state when a cheaper way to reach it is found.

        Nothing happens when ``puzzle`` is a root state or when ``cost`` is not
        strictly lower than the recorded cost. Otherwise the history is
        rewritten with the new parent, move, cost and depth, and (in heap
        modes) the frontier entry is replaced and the heap rebuilt.

        :raises KeyError: if ``puzzle`` was never pushed.
        :raises ValueError: in heap modes, if no frontier entry with the
            recorded cost exists; the history is left untouched in that case.
        """
        node_key = SearchNodes.puzzle_to_key(puzzle)
        parent_key = SearchNodes.puzzle_to_key(parent)
        history = self._movements[node_key]
        if history == '':
            # A root state has no recorded cost that could be improved.
            return
        old_cost = self._parse_cost(history.split('|')[2])
        if old_cost <= cost:
            return

        # Locate the entry before touching any state so a failure is atomic.
        index = None
        if self.mode not in self._QUEUE_MODES:
            index = self._frontier_index(old_cost, node_key)

        # Same layout as push_frontier: only the first element of the action.
        self._movements[node_key] = '|'.join(
            [parent_key, action[0], str(cost), str(depth)])

        if index is not None:
            if self.mode == HILL:
                self._frontier[index] = (cost, node_key, action)
            else:
                self._frontier[index] = (cost, node_key)
            # The replaced entry may violate the heap invariant.
            heapify(self._frontier)

    def get_solution(self, puzzle, action):
        """
        Build the move string that leads from the root to ``puzzle``.

        :param puzzle: tile sequence whose history is walked back to the root.
        :param action: optional final action; its first element is appended
            after the moves that reach ``puzzle``. None or empty adds nothing.
        :return: the recorded moves in root-to-puzzle order, joined together.
        :raises KeyError: if ``puzzle`` (or an ancestor) was never pushed.
        """
        moves = [action[0]] if action else []
        history = self._movements[SearchNodes.puzzle_to_key(puzzle)]
        # Walk parent links until the root, whose history is ''.
        while history:
            parent_key, movement, _, _ = history.split('|')
            moves.append(movement)
            history = self._movements[parent_key]
        moves.reverse()
        return ''.join(moves)

    def get_depth(self, puzzle):
        """
        Return the recorded depth of a state (0 for a root state).

        :param puzzle: either the state's key string or its tile sequence.
        :raises KeyError: if the state was never pushed.
        """
        key = puzzle if isinstance(puzzle, str) else SearchNodes.puzzle_to_key(puzzle)
        history = self._movements[key]
        if history == '':
            return 0
        return int(history.split('|')[3])

    def hill_climb_pop(self):
        """
        Pop one of the cheapest frontier entries and discard all the others.

        Ties on the lowest cost are broken at random. Afterwards the frontier
        and its membership set are both empty.

        :return: the chosen ``(cost, key, action)`` entry.
        :raises IndexError: if the frontier is empty.
        """
        best = heappop(self._frontier)
        candidates = [best]
        candidates.extend(
            entry for entry in self._frontier if entry[0] == best[0])
        self._frontier = []
        # Discarded states are no longer in the frontier; keep the set in sync.
        self._frontier_set.clear()
        return random.choice(candidates)

    def _recorded_cost(self, key):
        """Return the cost stored in the history of ``key`` (0 for a root)."""
        history = self._movements[key]
        if history == '':
            return 0
        return self._parse_cost(history.split('|')[2])

    def _frontier_index(self, cost, key):
        """Return the position of the heap entry for ``key`` with ``cost``."""
        for index, entry in enumerate(self._frontier):
            if entry[0] == cost and entry[1] == key:
                return index
        raise ValueError(
            'state {0} with cost {1} is not in the frontier'.format(key, cost))

    @staticmethod
    def _parse_cost(text):
        """Parse a cost written with ``str()``: int when possible, else float."""
        try:
            return int(text)
        except ValueError:
            return float(text)

    @staticmethod
    def puzzle_to_key(puzzle):
        """Return the key of a state: its tiles concatenated into a string."""
        return ''.join(str(x) for x in puzzle)

    @staticmethod
    def s(tups, elem):
        """Return a lazy iterator over the tuples in ``tups`` containing ``elem``."""
        return filter(lambda tup: elem in tup, tups)