Skip to content

Commit

Permalink
use lamport timestamps when reading data from multiple replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
pysel committed Nov 13, 2023
1 parent e53dbae commit ca4ff20
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 25 deletions.
24 changes: 19 additions & 5 deletions balancer/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package balancer
import (
"context"
"crypto/sha256"
"fmt"

"github.com/pysel/dkvs/prototypes"
pbbalancer "github.com/pysel/dkvs/prototypes/balancer"
pbpartition "github.com/pysel/dkvs/prototypes/partition"
"google.golang.org/protobuf/proto"

"github.com/pysel/dkvs/types"
)
Expand Down Expand Up @@ -50,19 +53,30 @@ func (bs *BalancerServer) Get(ctx context.Context, req *prototypes.GetRequest) (
}

var response *prototypes.GetResponse
errCounter := 0
maxLamport := uint64(0)
for _, client := range responsibleClients {
resp, err := client.Get(ctx, &prototypes.GetRequest{Key: key})
if err != nil {
errCounter++
continue
}

response = resp
break // break here since other replicas would return the same value
// since returned value will be a tuple of lamport timestamp and value, check which returned value
// has the highest lamport timestamp
var storedValue pbpartition.StoredValue
err = proto.Unmarshal(resp.Value, &storedValue)
if err != nil {
// TODO: partition is in incorrect state, should remove it from active set
fmt.Println("Error unmarshalling value from partition", err)
continue
}

if storedValue.Lamport > maxLamport {
maxLamport = storedValue.Lamport
response = resp
}
}

if errCounter == len(responsibleClients) {
if response == nil {
return nil, ErrAllReplicasFailed
}

Expand Down
6 changes: 1 addition & 5 deletions partition/stored_value.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package partition

import (
"encoding/binary"

pbpartition "github.com/pysel/dkvs/prototypes/partition"
)

// toStoredValue converts a value with lamport timestamp to a stored value.
func toStoredValue(lamport uint64, value []byte) *pbpartition.StoredValue {
lamportBz := make([]byte, 8)
binary.BigEndian.PutUint64(lamportBz, lamport)
return &pbpartition.StoredValue{
Lamport: lamportBz,
Lamport: lamport,
Value: value,
}
}
4 changes: 2 additions & 2 deletions proto/dkvs/partition/store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ package dkvs.partition;
option go_package = "github.com/pysel/dkvs/prototypes/partition";

message StoredValue {
bytes lamport = 1;
bytes value = 2;
bytes value = 1;
uint64 lamport = 2;
}
22 changes: 11 additions & 11 deletions prototypes/partition/store.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions prototypes/partition/store.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ca4ff20

Please sign in to comment.