diff --git a/balancer/grpc.go b/balancer/grpc.go index af51813..f2f6d5e 100644 --- a/balancer/grpc.go +++ b/balancer/grpc.go @@ -3,9 +3,12 @@ package balancer import ( "context" "crypto/sha256" + "fmt" "github.com/pysel/dkvs/prototypes" pbbalancer "github.com/pysel/dkvs/prototypes/balancer" + pbpartition "github.com/pysel/dkvs/prototypes/partition" + "google.golang.org/protobuf/proto" "github.com/pysel/dkvs/types" ) @@ -50,19 +53,30 @@ func (bs *BalancerServer) Get(ctx context.Context, req *prototypes.GetRequest) ( } var response *prototypes.GetResponse - errCounter := 0 + maxLamport := uint64(0) for _, client := range responsibleClients { resp, err := client.Get(ctx, &prototypes.GetRequest{Key: key}) if err != nil { - errCounter++ continue } - response = resp - break // break here since other replicas would return the same value + // since returned value will be a tuple of lamport timestamp and value, check which returned value + // has the highest lamport timestamp + var storedValue pbpartition.StoredValue + err = proto.Unmarshal(resp.Value, &storedValue) + if err != nil { + // TODO: partition is in incorrect state, should remove it from active set + fmt.Println("Error unmarshalling value from partition", err) + continue + } + + if storedValue.Lamport > maxLamport { + maxLamport = storedValue.Lamport + response = resp + } } - if errCounter == len(responsibleClients) { + if response == nil { return nil, ErrAllReplicasFailed } diff --git a/partition/stored_value.go b/partition/stored_value.go index ba8d9e1..b60598f 100644 --- a/partition/stored_value.go +++ b/partition/stored_value.go @@ -1,17 +1,13 @@ package partition import ( - "encoding/binary" - pbpartition "github.com/pysel/dkvs/prototypes/partition" ) // toStoredValue converts a value with lamport timestamp to a stored value. func toStoredValue(lamport uint64, value []byte) *pbpartition.StoredValue { - lamportBz := make([]byte, 8) - binary.BigEndian.PutUint64(lamportBz, lamport) return &pbpartition.StoredValue{ - Lamport: lamportBz, + Lamport: lamport, Value: value, } } diff --git a/proto/dkvs/partition/store.proto b/proto/dkvs/partition/store.proto index c093936..05dbaed 100644 --- a/proto/dkvs/partition/store.proto +++ b/proto/dkvs/partition/store.proto @@ -4,6 +4,6 @@ package dkvs.partition; option go_package = "github.com/pysel/dkvs/prototypes/partition"; message StoredValue { - bytes lamport = 1; - bytes value = 2; + bytes value = 1; + uint64 lamport = 2; } diff --git a/prototypes/partition/store.pb.go b/prototypes/partition/store.pb.go index a39d0f5..ae99dce 100644 --- a/prototypes/partition/store.pb.go +++ b/prototypes/partition/store.pb.go @@ -25,8 +25,8 @@ type StoredValue struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Lamport []byte `protobuf:"bytes,1,opt,name=lamport,proto3" json:"lamport,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` + Lamport uint64 `protobuf:"varint,2,opt,name=lamport,proto3" json:"lamport,omitempty"` } func (x *StoredValue) Reset() { @@ -61,18 +61,18 @@ func (*StoredValue) Descriptor() ([]byte, []int) { return file_dkvs_partition_store_proto_rawDescGZIP(), []int{0} } -func (x *StoredValue) GetLamport() []byte { +func (x *StoredValue) GetValue() []byte { if x != nil { - return x.Lamport + return x.Value } return nil } -func (x *StoredValue) GetValue() []byte { +func (x *StoredValue) GetLamport() uint64 { if x != nil { - return x.Value + return x.Lamport } - return nil + return 0 } var File_dkvs_partition_store_proto protoreflect.FileDescriptor @@ -81,10 +81,10 @@ var file_dkvs_partition_store_proto_rawDesc = []byte{ 0x0a, 0x1a, 0x64, 0x6b, 0x76, 0x73, 0x2f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x64, 0x6b, 0x76, 0x73, 0x2e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x3d, 0x0a, 0x0b, - 0x53, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6c, - 0x61, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6c, 0x61, - 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x2c, 0x5a, 0x2a, 0x67, + 0x53, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6c, 0x61, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x07, 0x6c, 0x61, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x79, 0x73, 0x65, 0x6c, 0x2f, 0x64, 0x6b, 0x76, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, diff --git a/prototypes/partition/store.pb.validate.go b/prototypes/partition/store.pb.validate.go index fb2e2b2..9b4dfc6 100644 --- a/prototypes/partition/store.pb.validate.go +++ b/prototypes/partition/store.pb.validate.go @@ -57,10 +57,10 @@ func (m *StoredValue) validate(all bool) error { var errors []error - // no validation rules for Lamport - // no validation rules for Value + // no validation rules for Lamport + if len(errors) > 0 { return StoredValueMultiError(errors) }