-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhungarian.py
480 lines (398 loc) · 19.7 KB
/
hungarian.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
#!/usr/bin/python
"""
Implementation of the Hungarian (Munkres) Algorithm using Python and NumPy
References: http://www.ams.jhu.edu/~castello/362/Handouts/hungarian.pdf
http://weber.ucsd.edu/~vcrawfor/hungar.pdf
http://en.wikipedia.org/wiki/Hungarian_algorithm
http://www.public.iastate.edu/~ddoty/HungarianAlgorithm.html
http://www.clapper.org/software/python/munkres/
"""
# Module Information.
__version__ = "1.1.1"
__author__ = "Thom Dedecko"
__url__ = "http://github.com/tdedecko/hungarian-algorithm"
__copyright__ = "(c) 2010 Thom Dedecko"
__license__ = "MIT License"
class HungarianError(Exception):
pass
# Import numpy. Error if fails
try:
import numpy as np
except ImportError:
raise HungarianError("NumPy is not installed.")
class Hungarian:
"""
Implementation of the Hungarian (Munkres) Algorithm using np.
Usage:
hungarian = Hungarian(cost_matrix)
hungarian.calculate()
or
hungarian = Hungarian()
hungarian.calculate(cost_matrix)
Handle Profit matrix:
hungarian = Hungarian(profit_matrix, is_profit_matrix=True)
or
cost_matrix = Hungarian.make_cost_matrix(profit_matrix)
The matrix will be automatically padded if it is not square.
For that numpy's resize function is used, which automatically adds 0's to any row/column that is added
Get results and total potential after calculation:
hungarian.get_results()
hungarian.get_total_potential()
"""
def __init__(self, input_matrix=None, is_profit_matrix=False):
"""
input_matrix is a List of Lists.
input_matrix is assumed to be a cost matrix unless is_profit_matrix is True.
"""
if input_matrix is not None:
# Save input
my_matrix = np.array(input_matrix)
self._input_matrix = np.array(input_matrix)
self._maxColumn = my_matrix.shape[1]
self._maxRow = my_matrix.shape[0]
# Adds 0s if any columns/rows are added. Otherwise stays unaltered
matrix_size = max(self._maxColumn, self._maxRow)
pad_columns = matrix_size - self._maxRow
pad_rows = matrix_size - self._maxColumn
my_matrix = np.pad(my_matrix, ((0,pad_columns),(0,pad_rows)), 'constant', constant_values=(0))
# Convert matrix to profit matrix if necessary
if is_profit_matrix:
my_matrix = self.make_cost_matrix(my_matrix)
self._cost_matrix = my_matrix
self._size = len(my_matrix)
self._shape = my_matrix.shape
# Results from algorithm.
self._results = []
self._totalPotential = 0
else:
self._cost_matrix = None
def get_results(self):
"""Get results after calculation."""
return self._results
def get_total_potential(self):
"""Returns expected value after calculation."""
return self._totalPotential
def calculate(self, input_matrix=None, is_profit_matrix=False):
"""
Implementation of the Hungarian (Munkres) Algorithm.
input_matrix is a List of Lists.
input_matrix is assumed to be a cost matrix unless is_profit_matrix is True.
"""
# Handle invalid and new matrix inputs.
if input_matrix is None and self._cost_matrix is None:
raise HungarianError("Invalid input")
elif input_matrix is not None:
self.__init__(input_matrix, is_profit_matrix)
result_matrix = self._cost_matrix.copy()
# Step 1: Subtract row mins from each row.
for index, row in enumerate(result_matrix):
result_matrix[index] -= row.min()
# Step 2: Subtract column mins from each column.
for index, column in enumerate(result_matrix.T):
result_matrix[:, index] -= column.min()
# Step 3: Use minimum number of lines to cover all zeros in the matrix.
# If the total covered rows+columns is not equal to the matrix size then adjust matrix and repeat.
total_covered = 0
while total_covered < self._size:
# Find minimum number of lines to cover all zeros in the matrix and find total covered rows and columns.
cover_zeros = CoverZeros(result_matrix)
covered_rows = cover_zeros.get_covered_rows()
covered_columns = cover_zeros.get_covered_columns()
total_covered = len(covered_rows) + len(covered_columns)
# if the total covered rows+columns is not equal to the matrix size then adjust it by min uncovered num (m).
if total_covered < self._size:
result_matrix = self._adjust_matrix_by_min_uncovered_num(result_matrix, covered_rows, covered_columns)
# Step 4: Starting with the top row, work your way downwards as you make assignments.
# Find single zeros in rows or columns.
# Add them to final result and remove them and their associated row/column from the matrix.
expected_results = min(self._maxColumn, self._maxRow)
zero_locations = (result_matrix == 0)
while len(self._results) != expected_results:
# If number of zeros in the matrix is zero before finding all the results then an error has occurred.
if not zero_locations.any():
raise HungarianError("Unable to find results. Algorithm has failed.")
# Find results and mark rows and columns for deletion
matched_rows, matched_columns = self.__find_matches(zero_locations)
# Make arbitrary selection
total_matched = len(matched_rows) + len(matched_columns)
if total_matched == 0:
matched_rows, matched_columns = self.select_arbitrary_match(zero_locations)
# Delete rows and columns
for row in matched_rows:
zero_locations[row] = False
for column in matched_columns:
zero_locations[:, column] = False
# Save Results
self.__set_results(zip(matched_rows, matched_columns))
# Calculate total potential
value = 0
for row, column in self._results:
value += self._input_matrix[row, column]
self._totalPotential = value
@staticmethod
def make_cost_matrix(profit_matrix):
"""
Converts a profit matrix into a cost matrix.
Expects NumPy objects as input.
"""
# subtract profit matrix from a matrix made of the max value of the profit matrix
matrix_shape = profit_matrix.shape
offset_matrix = np.ones(matrix_shape, dtype=int) * profit_matrix.max()
cost_matrix = offset_matrix - profit_matrix
return cost_matrix
def _adjust_matrix_by_min_uncovered_num(self, result_matrix, covered_rows, covered_columns):
"""Subtract m from every uncovered number and add m to every element covered with two lines."""
# Calculate minimum uncovered number (m)
elements = []
for row_index, row in enumerate(result_matrix):
if row_index not in covered_rows:
for index, element in enumerate(row):
if index not in covered_columns:
elements.append(element)
min_uncovered_num = min(elements)
# Add m to every covered element
adjusted_matrix = result_matrix
for row in covered_rows:
adjusted_matrix[row] += min_uncovered_num
for column in covered_columns:
adjusted_matrix[:, column] += min_uncovered_num
# Subtract m from every element
m_matrix = np.ones(self._shape, dtype=int) * min_uncovered_num
adjusted_matrix -= m_matrix
return adjusted_matrix
def __find_matches(self, zero_locations):
"""Returns rows and columns with matches in them."""
marked_rows = np.array([], dtype=int)
marked_columns = np.array([], dtype=int)
# Mark rows and columns with matches
# Iterate over rows
for index, row in enumerate(zero_locations):
row_index = np.array([index])
if np.sum(row) == 1:
column_index, = np.where(row)
marked_rows, marked_columns = self.__mark_rows_and_columns(marked_rows, marked_columns, row_index,
column_index)
# Iterate over columns
for index, column in enumerate(zero_locations.T):
column_index = np.array([index])
if np.sum(column) == 1:
row_index, = np.where(column)
marked_rows, marked_columns = self.__mark_rows_and_columns(marked_rows, marked_columns, row_index,
column_index)
return marked_rows, marked_columns
@staticmethod
def __mark_rows_and_columns(marked_rows, marked_columns, row_index, column_index):
"""Check if column or row is marked. If not marked then mark it."""
new_marked_rows = marked_rows
new_marked_columns = marked_columns
if not (marked_rows == row_index).any() and not (marked_columns == column_index).any():
new_marked_rows = np.insert(marked_rows, len(marked_rows), row_index)
new_marked_columns = np.insert(marked_columns, len(marked_columns), column_index)
return new_marked_rows, new_marked_columns
@staticmethod
def select_arbitrary_match(zero_locations):
"""Selects row column combination with minimum number of zeros in it."""
# Count number of zeros in row and column combinations
rows, columns = np.where(zero_locations)
zero_count = []
for index, row in enumerate(rows):
total_zeros = np.sum(zero_locations[row]) + np.sum(zero_locations[:, columns[index]])
zero_count.append(total_zeros)
# Get the row column combination with the minimum number of zeros.
indices = zero_count.index(min(zero_count))
row = np.array([rows[indices]])
column = np.array([columns[indices]])
return row, column
def __set_results(self, result_lists):
"""Set results during calculation."""
# Check if results values are out of bound from input matrix (because of matrix being padded).
# Add results to results list.
for result in result_lists:
row, column = result
if row < self._maxRow and column < self._maxColumn:
new_result = (int(row), int(column))
self._results.append(new_result)
class CoverZeros:
"""
Use minimum number of lines to cover all zeros in the matrix.
Algorithm based on: http://weber.ucsd.edu/~vcrawfor/hungar.pdf
"""
def __init__(self, matrix):
"""
Input a matrix and save it as a boolean matrix to designate zero locations.
Run calculation procedure to generate results.
"""
# Find zeros in matrix
self._zero_locations = (matrix == 0)
self._shape = matrix.shape
# Choices starts without any choices made.
self._choices = np.zeros(self._shape, dtype=bool)
self._marked_rows = []
self._marked_columns = []
# marks rows and columns
self.__calculate()
# Draw lines through all unmarked rows and all marked columns.
self._covered_rows = list(set(range(self._shape[0])) - set(self._marked_rows))
self._covered_columns = self._marked_columns
def get_covered_rows(self):
"""Return list of covered rows."""
return self._covered_rows
def get_covered_columns(self):
"""Return list of covered columns."""
return self._covered_columns
def __calculate(self):
"""
Calculates minimum number of lines necessary to cover all zeros in a matrix.
Algorithm based on: http://weber.ucsd.edu/~vcrawfor/hungar.pdf
"""
while True:
# Erase all marks.
self._marked_rows = []
self._marked_columns = []
# Mark all rows in which no choice has been made.
for index, row in enumerate(self._choices):
if not row.any():
self._marked_rows.append(index)
# If no marked rows then finish.
if not self._marked_rows:
return True
# Mark all columns not already marked which have zeros in marked rows.
num_marked_columns = self.__mark_new_columns_with_zeros_in_marked_rows()
# If no new marked columns then finish.
if num_marked_columns == 0:
return True
# While there is some choice in every marked column.
while self.__choice_in_all_marked_columns():
# Some Choice in every marked column.
# Mark all rows not already marked which have choices in marked columns.
num_marked_rows = self.__mark_new_rows_with_choices_in_marked_columns()
# If no new marks then Finish.
if num_marked_rows == 0:
return True
# Mark all columns not already marked which have zeros in marked rows.
num_marked_columns = self.__mark_new_columns_with_zeros_in_marked_rows()
# If no new marked columns then finish.
if num_marked_columns == 0:
return True
# No choice in one or more marked columns.
# Find a marked column that does not have a choice.
choice_column_index = self.__find_marked_column_without_choice()
while choice_column_index is not None:
# Find a zero in the column indexed that does not have a row with a choice.
choice_row_index = self.__find_row_without_choice(choice_column_index)
# Check if an available row was found.
new_choice_column_index = None
if choice_row_index is None:
# Find a good row to accomodate swap. Find its column pair.
choice_row_index, new_choice_column_index = \
self.__find_best_choice_row_and_new_column(choice_column_index)
# Delete old choice.
self._choices[choice_row_index, new_choice_column_index] = False
# Set zero to choice.
self._choices[choice_row_index, choice_column_index] = True
# Loop again if choice is added to a row with a choice already in it.
choice_column_index = new_choice_column_index
def __mark_new_columns_with_zeros_in_marked_rows(self):
"""Mark all columns not already marked which have zeros in marked rows."""
num_marked_columns = 0
for index, column in enumerate(self._zero_locations.T):
if index not in self._marked_columns:
if column.any():
row_indices, = np.where(column)
zeros_in_marked_rows = (set(self._marked_rows) & set(row_indices)) != set([])
if zeros_in_marked_rows:
self._marked_columns.append(index)
num_marked_columns += 1
return num_marked_columns
def __mark_new_rows_with_choices_in_marked_columns(self):
"""Mark all rows not already marked which have choices in marked columns."""
num_marked_rows = 0
for index, row in enumerate(self._choices):
if index not in self._marked_rows:
if row.any():
column_index, = np.where(row)
if column_index in self._marked_columns:
self._marked_rows.append(index)
num_marked_rows += 1
return num_marked_rows
def __choice_in_all_marked_columns(self):
"""Return Boolean True if there is a choice in all marked columns. Returns boolean False otherwise."""
for column_index in self._marked_columns:
if not self._choices[:, column_index].any():
return False
return True
def __find_marked_column_without_choice(self):
"""Find a marked column that does not have a choice."""
for column_index in self._marked_columns:
if not self._choices[:, column_index].any():
return column_index
raise HungarianError(
"Could not find a column without a choice. Failed to cover matrix zeros. Algorithm has failed.")
def __find_row_without_choice(self, choice_column_index):
"""Find a row without a choice in it for the column indexed. If a row does not exist then return None."""
row_indices, = np.where(self._zero_locations[:, choice_column_index])
for row_index in row_indices:
if not self._choices[row_index].any():
return row_index
# All rows have choices. Return None.
return None
def __find_best_choice_row_and_new_column(self, choice_column_index):
"""
Find a row index to use for the choice so that the column that needs to be changed is optimal.
Return a random row and column if unable to find an optimal selection.
"""
row_indices, = np.where(self._zero_locations[:, choice_column_index])
for row_index in row_indices:
column_indices, = np.where(self._choices[row_index])
column_index = column_indices[0]
if self.__find_row_without_choice(column_index) is not None:
return row_index, column_index
# Cannot find optimal row and column. Return a random row and column.
from random import shuffle
shuffle(row_indices)
column_index, = np.where(self._choices[row_indices[0]])
return row_indices[0], column_index[0]
# if __name__ == '__main__':
# profit_matrix = [
# [62, 75, 80, 93, 95, 97],
# [75, 80, 82, 85, 71, 97],
# [80, 75, 81, 98, 90, 97],
# [78, 82, 84, 80, 50, 98],
# [90, 85, 85, 80, 85, 99],
# [65, 75, 80, 75, 68, 96]]
#
# hungarian = Hungarian(profit_matrix, is_profit_matrix=True)
# hungarian.calculate()
# print("Expected value:\t\t543")
# print("Calculated value:\t", hungarian.get_total_potential()) # = 543
# print("Expected results:\n\t[(0, 4), (2, 3), (5, 5), (4, 0), (1, 1), (3, 2)]")
# print("Results:\n\t", hungarian.get_results())
# print("-" * 80)
#
# cost_matrix = [
# [4, 2, 8],
# [4, 3, 7],
# [3, 1, 6]]
# hungarian = Hungarian(cost_matrix)
# print('calculating...')
# hungarian.calculate()
# print("Expected value:\t\t12")
# print("Calculated value:\t", hungarian.get_total_potential()) # = 12
# print("Expected results:\n\t[(0, 1), (1, 0), (2, 2)]")
# print("Results:\n\t", hungarian.get_results())
# print("-" * 80)
#
# profit_matrix = [
# [62, 75, 80, 93, 0, 97],
# [75, 0, 82, 85, 71, 97],
# [80, 75, 81, 0, 90, 97],
# [78, 82, 0, 80, 50, 98],
# [0, 85, 85, 80, 85, 99],
# [65, 75, 80, 75, 68, 0]]
# hungarian = Hungarian()
# hungarian.calculate(profit_matrix, is_profit_matrix=True)
# print("Expected value:\t\t523")
# print("Calculated value:\t", hungarian.get_total_potential()) # = 523
# print("Expected results:\n\t[(0, 3), (2, 4), (3, 0), (5, 2), (1, 5), (4, 1)]")
# print("Results:\n\t", hungarian.get_results())
# print("-" * 80)