-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path3700kvstore
383 lines (305 loc) · 11.5 KB
/
3700kvstore
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
#!/usr/bin/env python3
import json
import random
import select
import socket
import sys
import time
# Your ID number
my_id = sys.argv[1]
# The ID numbers of all the other replicas
replica_ids = sys.argv[2:]
# Connect to the network. All messages to/from other replicas and clients will
# occur over this socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
sock.connect(my_id)
last_append = time.time() # Time most recent AppendEntries RPC was received
data = {} # Storage for applied messages
# * STATE VARIABLES *
# PERSISTENT
# These must be updated before replying to an RPC
currentTerm = 0 # Leadership term to this server's best knowledge (TSBK)
votedFor = "FFFF" # Leader endorsed in this term,
leader = "FFFF" # Current leader TSBK
log = [] # Command log
vote_received = set()
# VOLATILE
# These can generally be updated at will
commitIndex = 0 # Latest committed log command's index TSBK
lastApplied = 0 # Latest applied log command's index
lastSent = 0 # Last message sent in last round's index
nextIndex = {} # [LEADER ONLY] Next log entry's index for each server
matchIndex = {} # [LEADER ONLY] Latest logged entry index for each server TSBK
# ----------------
# * SEND HELPERS *
# ----------------
# FAIL TYPE
def send_fail(dst, mid):
fail_msg = {'src': my_id, 'dst': dst, 'leader': leader, 'type': 'fail', 'MID': mid}
sock.send(bytes(json.dumps(fail_msg), encoding='utf8'))
# REDIRECT TYPE
def send_redirect(dst, mid):
redirect_msg = {'src': my_id, 'dst': dst, 'leader': leader, 'type': 'redirect', 'MID': mid}
sock.send(bytes(json.dumps(redirect_msg), encoding='utf8'))
# APPENDENTRIES TYPE
def send_append():
global last_append
global lastSent
if leader != my_id:
return
# For each other server
for rid in replica_ids:
prevLogIndex = nextIndex[rid] - 1
if commitIndex - prevLogIndex > 10:
continue
prevLogTerm = 0
if len(log) >= prevLogIndex > 0:
prevLogTerm = log[prevLogIndex - 1][0]
append_msg = {'src': my_id, 'dst': rid, 'leader': leader, 'type': 'append',
'MID': str(random.randint(1000000, 9999999)), 'term': currentTerm, 'prevLogIndex': prevLogIndex,
'prevLogTerm': prevLogTerm, 'leaderCommit': commitIndex, 'log': log[nextIndex[rid] - 1:]}
lastSent = len(log)
sock.send(bytes(json.dumps(append_msg), encoding='utf8'))
last_append = time.time()
# REQUESTVOTE TYPE
def send_request():
global my_id
global currentTerm
global log
global leader
lastTerm = -1
if log:
lastTerm = log[len(log) - 1][0]
# For each other server
for rid in replica_ids:
request_msg = {'src': my_id, 'dst': rid, 'leader': leader, 'type': 'request',
'MID': str(random.randint(1000000, 9999999)), 'term': currentTerm, 'candidateID': my_id,
'lastLogIndex': len(log), 'lastLogTerm': lastTerm}
sock.send(bytes(json.dumps(request_msg), encoding='utf8'))
# OK TYPE
def send_ok(dst, mid, value):
ok_msg = {'src': my_id, 'dst': dst, 'leader': leader, 'type': 'ok', 'MID': mid}
if value is not None:
ok_msg['value'] = value
sock.send(bytes(json.dumps(ok_msg), encoding='utf8'))
# APPEND_RESPONSE TYPE
def send_append_response(dst, mid, success):
append_response_msg = {'src': my_id, 'dst': dst, 'leader': leader, 'type': 'append_response', 'MID': mid,
'term': currentTerm, 'success': success}
sock.send(bytes(json.dumps(append_response_msg), encoding='utf8'))
# ----------------
# * RECV HELPERS *
# ----------------
# GET TYPE
def recv_get(dst, mid, msg):
# If this server is leader, log the message
if leader == my_id and msg['leader'] == my_id:
log.append((currentTerm, msg))
# Otherwise, ask the client to message the leader
else:
send_redirect(dst, mid)
# PUT TYPE
def recv_put(dst, mid, msg):
# If this server is leader, log the message
if leader == my_id and msg['leader'] == my_id:
log.append((currentTerm, msg))
# Otherwise, ask the client to message the leader
else:
send_redirect(dst, mid)
# APPENDENTRIES TYPE
def recv_append(dst, mid, term, prevLogIndex, prevLogTerm, entries, leaderCommit):
# for election only (NEED TO BE UPDATED)
global leader
global last_append
global votedFor
global log
global commitIndex
global currentTerm
last_append = time.time()
success = False
# Leader's term must be >= follower's term
term_condition = term >= currentTerm
# Follower's most recent entry must match leader's at that index
prev_condition = prevLogIndex < 1 or (len(log) >= prevLogIndex and log[prevLogIndex - 1][0] == prevLogTerm)
# Update own currentTerm converts to follower if stale term
if term > currentTerm:
currentTerm = term
votedFor = 'FFFF'
leader = dst
elif votedFor == my_id and leader != dst and term == currentTerm:
leader = dst
if not entries:
return
# Update logs if conditions are met
if term_condition and prev_condition:
success = True
log = log[:prevLogIndex] + entries
# Update commitIndex
if leaderCommit > commitIndex:
commitIndex = min(leaderCommit, len(log))
# Respond to the AppendEntries request
send_append_response(dst, mid, success)
# REQUESTVOTE TYPE
# process the received voting request
def recv_request(dst, mid, term, lastLogIndex, lastLogTerm):
global my_id
global leader
global votedFor
global last_append
global currentTerm
respond_msg = {'src': my_id, 'dst': dst, 'leader': leader, 'type': 'ballot',
'MID': mid, 'term': currentTerm, 'voteGranted': False}
term_condition = term >= currentTerm
if term > currentTerm:
currentTerm = term
votedFor = "FFFF"
vote_condition = votedFor == "FFFF"
log_term_condition = not log or lastLogTerm > log[len(log) - 1][0]
log_len_condition = not log or (lastLogTerm == log[len(log) - 1][0] and lastLogIndex >= len(log))
if term_condition and vote_condition:
if log_term_condition or log_len_condition:
#print("GRANTING VOTE TO " + dst + " FROM " + my_id)
#print(" CANDIDATE TERM " + str(term) + " > " + str(currentTerm) + " LOG INDEX " + str(lastLogIndex) + " > " + str(len(log)))
respond_msg['voteGranted'] = True
votedFor = dst
last_append = time.time()
sock.send(bytes(json.dumps(respond_msg), encoding='utf8'))
# OK TYPE
def recv_ok(dst, mid):
print("Server received OK.")
# BALLOT TYPE
def recv_ballot(dst, voteGranted, term):
global currentTerm
global vote_received
if currentTerm >= term:
if voteGranted:
vote_received.add(dst)
else:
currentTerm = term
# APPEND_RESPONSE TYPE
def recv_append_response(src, term, success):
global currentTerm
global leader
global votedFor
# check term, (leader) converts to follower if stale term
if term > currentTerm:
currentTerm = term
votedFor = 'FFFF'
if success:
matchIndex[src] = lastSent
nextIndex[src] = matchIndex[src] + 1
update_commitIdx()
else:
nextIndex[src] = nextIndex[src] - 1
# ----------------
# * OTHER HELPERS *
# ----------------
# process message
def process_msg():
global commitIndex
ready = select.select([sock], [], [], 0.025)[0]
if sock in ready:
msg_raw = sock.recv(32768)
# Ignore empty messages
if len(msg_raw) == 0:
return
msg = json.loads(msg_raw)
if msg['type'] == 'get':
recv_get(msg['src'], msg['MID'], msg)
# Automatically commit for now, update with replication
elif msg['type'] == 'put':
recv_put(msg['src'], msg['MID'], msg)
# Automatically commit for now, update with replication
elif msg['type'] == 'append':
recv_append(msg['src'], msg['MID'], msg['term'], msg['prevLogIndex'], msg['prevLogTerm'],
msg['log'], msg['leaderCommit'])
elif msg['type'] == 'request':
recv_request(msg['src'], msg['MID'], msg['term'], msg['lastLogIndex'], msg['lastLogTerm'])
elif msg['type'] == 'ok':
recv_ok(msg['src'], msg['MID'])
elif msg['type'] == 'ballot':
recv_ballot(msg['src'], msg['voteGranted'], msg['term'])
elif msg['type'] == 'append_response':
recv_append_response(msg['src'], msg['term'], msg['success'])
else:
print(my_id + " received unhandled type: " + msg['type'])
else:
return
# Run election
def run_election():
#print(my_id + " STARTED AN ELECTION")
global currentTerm
global leader
global votedFor
global last_append
global vote_received
global nextIndex
global matchIndex
votedFor = my_id
leader = "FFFF"
currentTerm += 1
send_request()
last_append = time.time()
vote_received.clear()
vote_received.add(my_id)
while True:
process_msg()
# wins election
if len(vote_received) > (len(replica_ids) + 1) / 2:
leader = my_id
for rid in replica_ids:
nextIndex[rid] = len(log) + 1
matchIndex[rid] = 0
# send heartbeat to establish authority
send_append()
return
# another server establishes itself as leader
if leader != 'FFFF':
return
# time out no winner -> start new election
loc_clock = time.time()
if loc_clock - last_append > random.uniform(.150, .350):
run_election()
return
# [Leader] check matchIndex to update commitIndex
def update_commitIdx():
global commitIndex
while commitIndex <= len(log):
logged_next = 1
for last in matchIndex.values():
if last > commitIndex and log[last - 1][0] == currentTerm:
logged_next += 1
if logged_next > (len(replica_ids) + 1) / 2:
commitIndex += 1
else:
break
while True:
process_msg()
#print("LOG:", log)
# Apply messages to the state machine which have been committed
while lastApplied < commitIndex:
lastApplied += 1
# Index offset because Raft arrays start from 1
toApply = log[lastApplied - 1][1]
if toApply['type'] == 'get' and leader == my_id:
# Leader returns ok with corresponding value or blank string
value = ""
if toApply['key'] in data:
value = data[toApply['key']]
#print("KEY is " + toApply['key'] + " VALUE is " + value + " MID " + toApply['MID'])
send_ok(toApply['src'], toApply['MID'], value)
elif toApply['type'] == 'put':
# Update data
data[toApply['key']] = toApply['value']
#print("STORING KEY_VALUE " + toApply['key'] + " VALUE " + toApply['value'] + " LEADER " + leader + " MY ID " + my_id)
# Leader returns ok
if leader == my_id:
send_ok(toApply['src'], toApply['MID'], None)
clock = time.time()
# Non-leaders initiate election after random timer expires
if leader != my_id and clock - last_append > random.uniform(0.150, 0.350):
run_election()
clock = time.time()
# Leaders send heartbeat when applicable
if leader == my_id and (lastSent < len(log) or clock - last_append > 0.1):
send_append()