forked from cocagne/python-composable-paxos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcomposable_paxos.py
370 lines (288 loc) · 13.4 KB
/
composable_paxos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
'''
This module provides an implementation of the Paxos algorithm as
a set of composable classes.
'''
import collections
# ProposalID
#
# In order for the Paxos algorithm to function, all proposal ids must be
# unique. A simple way to ensure this is to include the proposer's unique
# id in the proposal id.
#
# Python tuples allow the proposal number and the UID to be combined in a
# manner that supports comparison in the expected manner:
#
# (4,'C') > (4,'B') > (3,'Z')
#
# Named tuples from the collections module support all of the regular
# tuple operations but additionally allow access to the contents by
# name so the numeric component of the proposal ID may be referred to
# via 'proposal_id.number' instead of 'proposal_id[0]'.
#
ProposalID = collections.namedtuple('ProposalID', ['number', 'uid'])
class PaxosMessage (object):
'''
Base class for all messages defined in this module
'''
from_uid = None # Set by subclass constructor
class Prepare (PaxosMessage):
'''
Prepare messages should be broadcast to all Acceptors.
'''
def __init__(self, from_uid, proposal_id):
self.from_uid = from_uid
self.proposal_id = proposal_id
class Nack (PaxosMessage):
'''
NACKs are technically optional though few practical applications will
want to omit their use. They are used to signal a proposer that their
current proposal number is out of date and that a new one should be
chosen. NACKs may be sent in response to both Prepare and Accept
messages
'''
def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id):
self.from_uid = from_uid
self.proposal_id = proposal_id
self.proposer_uid = proposer_uid
self.promised_proposal_id = promised_proposal_id
class Promise (PaxosMessage):
'''
Promise messages should be sent to at least the Proposer specified in
the proposer_uid field
'''
def __init__(self, from_uid, proposer_uid, proposal_id, last_accepted_id, last_accepted_value):
self.from_uid = from_uid
self.proposer_uid = proposer_uid
self.proposal_id = proposal_id
self.last_accepted_id = last_accepted_id
self.last_accepted_value = last_accepted_value
class Accept (PaxosMessage):
'''
Accept messages should be broadcast to all Acceptors
'''
def __init__(self, from_uid, proposal_id, proposal_value):
self.from_uid = from_uid
self.proposal_id = proposal_id
self.proposal_value = proposal_value
class Accepted (PaxosMessage):
'''
Accepted messages should be sent to all Learners
'''
def __init__(self, from_uid, proposal_id, proposal_value):
self.from_uid = from_uid
self.proposal_id = proposal_id
self.proposal_value = proposal_value
class Resolution (PaxosMessage):
'''
Optional message used to indicate that the final value has been selected
'''
def __init__(self, from_uid, value):
self.from_uid = from_uid
self.value = value
class InvalidMessageError (Exception):
'''
Thrown if a PaxosMessage subclass is passed to a class that does not
support it
'''
class MessageHandler (object):
def receive(self, msg):
'''
Message dispatching function. This function accepts any PaxosMessage subclass and calls
the appropriate handler function
'''
handler = getattr(self, 'receive_' + msg.__class__.__name__.lower(), None)
if handler is None:
raise InvalidMessageError('Receiving class does not support messages of type: ' + msg.__class__.__name__)
return handler( msg )
class Proposer (MessageHandler):
'''
The 'leader' attribute is a boolean value indicating the Proposer's
belief in whether or not it is the current leader. This is not a reliable
value as multiple nodes may simultaneously believe themselves to be the
leader.
'''
leader = False
proposed_value = None
proposal_id = None
highest_accepted_id = None
promises_received = None
nacks_received = None
current_prepare_msg = None
current_accept_msg = None
def __init__(self, network_uid, quorum_size):
self.network_uid = network_uid
self.quorum_size = quorum_size
self.proposal_id = ProposalID(0, network_uid)
self.highest_proposal_id = ProposalID(0, network_uid)
def propose_value(self, value):
'''
Sets the proposal value for this node iff this node is not already aware of
a previous proposal value. If the node additionally believes itself to be
the current leader, an Accept message will be returned
'''
if self.proposed_value is None:
self.proposed_value = value
if self.leader:
self.current_accept_msg = Accept(self.network_uid, self.proposal_id, value)
return self.current_accept_msg
def prepare(self):
'''
Returns a new Prepare message with a proposal id higher than
that of any observed proposals. A side effect of this method is
to clear the leader flag if it is currently set.
'''
self.leader = False
self.promises_received = set()
self.nacks_received = set()
self.proposal_id = ProposalID(self.highest_proposal_id.number + 1, self.network_uid)
self.highest_proposal_id = self.proposal_id
self.current_prepare_msg = Prepare(self.network_uid, self.proposal_id)
return self.current_prepare_msg
def observe_proposal(self, proposal_id):
'''
Optional method used to update the proposal counter as proposals are
seen on the network. When co-located with Acceptors and/or Learners,
this method may be used to avoid a message delay when attempting to
assume leadership (guaranteed NACK if the proposal number is too low).
This method is automatically called for all received Promise and Nack
messages.
'''
if proposal_id > self.highest_proposal_id:
self.highest_proposal_id = proposal_id
def receive_nack(self, msg):
'''
Returns a new Prepare message if the number of Nacks received reaches
a quorum.
'''
self.observe_proposal( msg.promised_proposal_id )
if msg.proposal_id == self.proposal_id and self.nacks_received is not None:
self.nacks_received.add( msg.from_uid )
if len(self.nacks_received) == self.quorum_size:
return self.prepare() # Lost leadership or failed to acquire it
def receive_promise(self, msg):
'''
Returns an Accept messages if a quorum of Promise messages is achieved
'''
self.observe_proposal( msg.proposal_id )
if not self.leader and msg.proposal_id == self.proposal_id and msg.from_uid not in self.promises_received:
self.promises_received.add( msg.from_uid )
if msg.last_accepted_id > self.highest_accepted_id:
self.highest_accepted_id = msg.last_accepted_id
if msg.last_accepted_value is not None:
self.proposed_value = msg.last_accepted_value
if len(self.promises_received) == self.quorum_size:
self.leader = True
if self.proposed_value is not None:
self.current_accept_msg = Accept(self.network_uid, self.proposal_id, self.proposed_value)
return self.current_accept_msg
class Acceptor (MessageHandler):
'''
Acceptors act as the fault-tolerant memory for Paxos. To ensure correctness
in the presense of failure, Acceptors must be able to remember the promises
they've made even in the event of power outages. Consequently, any changes
to the promised_id, accepted_id, and/or accepted_value must be persisted to
stable media prior to sending promise and accepted messages.
When an Acceptor instance is composed alongside a Proposer instance, it
is generally advantageous to call the proposer's observe_proposal()
method when methods of this class are called.
'''
def __init__(self, network_uid, promised_id=None, accepted_id=None, accepted_value=None):
'''
promised_id, accepted_id, and accepted_value should be provided if and only if this
instance is recovering from persistent state.
'''
self.network_uid = network_uid
self.promised_id = promised_id
self.accepted_id = accepted_id
self.accepted_value = accepted_value
def receive_prepare(self, msg):
'''
Returns either a Promise or a Nack in response. The Acceptor's state must be persisted to disk
prior to transmitting the Promise message.
'''
if msg.proposal_id >= self.promised_id:
self.promised_id = msg.proposal_id
return Promise(self.network_uid, msg.from_uid, self.promised_id, self.accepted_id, self.accepted_value)
else:
return Nack(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)
def receive_accept(self, msg):
'''
Returns either an Accepted or Nack message in response. The Acceptor's state must be persisted
to disk prior to transmitting the Accepted message.
'''
if msg.proposal_id >= self.promised_id:
self.promised_id = msg.proposal_id
self.accepted_id = msg.proposal_id
self.accepted_value = msg.proposal_value
return Accepted(self.network_uid, msg.proposal_id, msg.proposal_value)
else:
return Nack(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)
class Learner (MessageHandler):
'''
This class listens to Accepted messages, determines when the final value is
selected, and tracks which peers have accepted the final value.
'''
class ProposalStatus (object):
__slots__ = ['accept_count', 'retain_count', 'acceptors', 'value']
def __init__(self, value):
self.accept_count = 0
self.retain_count = 0
self.acceptors = set()
self.value = value
def __init__(self, network_uid, quorum_size):
self.network_uid = network_uid
self.quorum_size = quorum_size
self.proposals = dict() # maps proposal_id => ProposalStatus
self.acceptors = dict() # maps from_uid => last_accepted_proposal_id
self.final_value = None
self.final_acceptors = None # Will be a set of acceptor UIDs once the final value is chosen
self.final_proposal_id = None
def receive_accepted(self, msg):
'''
Called when an Accepted message is received from an acceptor. Once the final value
is determined, the return value of this method will be a Resolution message containing
the consentual value. Subsequent calls after the resolution is chosen will continue to add
new Acceptors to the final_acceptors set and return Resolution messages.
'''
if self.final_value is not None:
if msg.proposal_id >= self.final_proposal_id and msg.proposal_value == self.final_value:
self.final_acceptors.add( msg.from_uid )
return Resolution(self.network_uid, self.final_value)
last_pn = self.acceptors.get(msg.from_uid)
if msg.proposal_id <= last_pn:
return # Old message
self.acceptors[ msg.from_uid ] = msg.proposal_id
if last_pn is not None:
ps = self.proposals[ last_pn ]
ps.retain_count -= 1
ps.acceptors.remove(msg.from_uid)
if ps.retain_count == 0:
del self.proposals[ last_pn ]
if not msg.proposal_id in self.proposals:
self.proposals[ msg.proposal_id ] = Learner.ProposalStatus(msg.proposal_value)
ps = self.proposals[ msg.proposal_id ]
assert msg.proposal_value == ps.value, 'Value mismatch for single proposal!'
ps.accept_count += 1
ps.retain_count += 1
ps.acceptors.add(msg.from_uid)
if ps.accept_count == self.quorum_size:
self.final_proposal_id = msg.proposal_id
self.final_value = msg.proposal_value
self.final_acceptors = ps.acceptors
self.proposals = None
self.acceptors = None
return Resolution( self.network_uid, self.final_value )
class PaxosInstance (Proposer, Acceptor, Learner):
'''
Aggregate Proposer, Accepter, & Learner class.
'''
def __init__(self, network_uid, quorum_size, promised_id=None, accepted_id=None, accepted_value=None):
Proposer.__init__(self, network_uid, quorum_size)
Acceptor.__init__(self, network_uid, promised_id, accepted_id, accepted_value)
Learner.__init__(self, network_uid, quorum_size)
def receive_prepare(self, msg):
self.observe_proposal( msg.proposal_id )
return super(PaxosInstance,self).receive_prepare(msg)
def receive_accept(self, msg):
self.observe_proposal( msg.proposal_id )
return super(PaxosInstance,self).receive_accept(msg)