From ea2bdbefe132cb95949b068c0c9913355dcae34d Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Tue, 3 Oct 2023 07:32:21 +0200 Subject: [PATCH 01/28] Upgrade go version and go mod tidy --- be1-go/go.mod | 2 +- be1-go/go.sum | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/be1-go/go.mod b/be1-go/go.mod index f74e63ae84..f0f862f5c6 100644 --- a/be1-go/go.mod +++ b/be1-go/go.mod @@ -1,6 +1,6 @@ module popstellar -go 1.19 +go 1.21 require ( github.com/aaronarduino/goqrsvg v0.0.0-20220419053939-17e843f1dd40 diff --git a/be1-go/go.sum b/be1-go/go.sum index 1498a8b6c6..153d8a0139 100644 --- a/be1-go/go.sum +++ b/be1-go/go.sum @@ -21,13 +21,16 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/schema v1.2.0 h1:YufUaxZYCKGFuAq3c96BOhjgd5nmXiOY9NGzF247Tsc= @@ -146,6 +149,7 @@ google.golang.org/protobuf v1.29.1 h1:7QBf+IK2gx70Ap/hDsOmam3GE0v9HicjfEdAxE62Uo google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI= gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI= From 74425607a42519b3412b99d4fe275693e480693c Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Tue, 3 Oct 2023 08:24:20 +0200 Subject: [PATCH 02/28] Add message ids thread safe structure --- .../standard_hub/helpers/data_structures.go | 60 +++++++++++++++++++ be1-go/hub/standard_hub/message_handling.go | 14 ++--- be1-go/hub/standard_hub/mod.go | 51 ++++++---------- 3 files changed, 85 insertions(+), 40 deletions(-) create mode 100644 be1-go/hub/standard_hub/helpers/data_structures.go diff --git a/be1-go/hub/standard_hub/helpers/data_structures.go b/be1-go/hub/standard_hub/helpers/data_structures.go new file mode 100644 index 0000000000..142d6ddbf2 --- /dev/null +++ b/be1-go/hub/standard_hub/helpers/data_structures.go @@ -0,0 +1,60 @@ +package helpers + +import ( + "maps" + "popstellar/channel" + "slices" + "sync" +) + +// MessageIds provides a thread-safe structure that stores a channel id with its corresponding message ids +type IdsByChannel struct { + sync.RWMutex + table map[string][]string +} + +func NewIdsByChannel() IdsByChannel { + return IdsByChannel{ + table: make(map[string][]string), + } +} + +func (i *IdsByChannel) Add(channel string, id string) { + i.Lock() + defer i.Unlock() + messageIds, channelStored := i.table[channel] + if !channelStored { + i.table[channel] = append(i.table[channel], id) + return + } + alreadyStoredId := slices.Contains(messageIds, id) + if !alreadyStoredId { + i.table[channel] = append(i.table[channel], id) + } +} + +func (i *IdsByChannel) GetAll() map[string][]string { + i.RLock() + defer i.RUnlock() + tableCopy := make(map[string][]string) + maps.Copy(tableCopy, i.table) + return tableCopy +} + +func (i *IdsByChannel) IsEmpty() bool { + i.RLock() + defer i.RUnlock() + + return len(i.table) == 0 +} + +type ChannelByID struct { + sync.RWMutex + table map[string]channel.Channel +} + +func NewChannelById() ChannelByID { + return ChannelByID{ + table: make(map[string]channel.Channel), + } +} diff --git a/be1-go/hub/standard_hub/message_handling.go b/be1-go/hub/standard_hub/message_handling.go index e465427dd4..86bcef3cd4 100644 --- a/be1-go/hub/standard_hub/message_handling.go +++ b/be1-go/hub/standard_hub/message_handling.go @@ -78,7 +78,7 @@ func (h *Hub) handleRootChannelPublishMessage(sock socket.Socket, publish method h.rootInbox.StoreMessage(publish.Params.Message) h.hubInbox.StoreMessage(publish.Params.Message) - h.addMessageId(publish.Params.Channel, publish.Params.Message.MessageID) + h.messageIdsByChannel.Add(publish.Params.Channel, publish.Params.Message.MessageID) return nil } @@ -143,7 +143,7 @@ func (h *Hub) handleRootChannelBroadcastMessage(sock socket.Socket, h.rootInbox.StoreMessage(broadcast.Params.Message) h.hubInbox.StoreMessage(broadcast.Params.Message) - h.addMessageId(broadcast.Params.Channel, broadcast.Params.Message.MessageID) + h.messageIdsByChannel.Add(broadcast.Params.Channel, broadcast.Params.Message.MessageID) return nil } @@ -275,7 +275,7 @@ func (h *Hub) handlePublish(socket socket.Socket, byteMessage []byte) (int, erro return publish.ID, err } h.hubInbox.StoreMessage(publish.Params.Message) - h.addMessageId(publish.Params.Channel, publish.Params.Message.MessageID) + h.messageIdsByChannel.Add(publish.Params.Channel, publish.Params.Message.MessageID) return publish.ID, nil } @@ -295,7 +295,7 @@ func (h *Hub) handlePublish(socket socket.Socket, byteMessage []byte) (int, erro } h.hubInbox.StoreMessage(publish.Params.Message) - h.addMessageId(publish.Params.Channel, publish.Params.Message.MessageID) + h.messageIdsByChannel.Add(publish.Params.Channel, publish.Params.Message.MessageID) return publish.ID, nil } @@ -325,7 +325,7 @@ func (h *Hub) handleBroadcast(socket socket.Socket, byteMessage []byte) error { return nil } h.hubInbox.StoreMessage(broadcast.Params.Message) - h.addMessageId(broadcast.Params.Channel, broadcast.Params.Message.MessageID) + h.messageIdsByChannel.Add(broadcast.Params.Channel, broadcast.Params.Message.MessageID) h.Unlock() @@ -435,7 +435,7 @@ func (h *Hub) handleHeartbeat(socket socket.Socket, receivedIds := heartbeat.Params - missingIds := getMissingIds(receivedIds, h.messageIdsByChannel, h.blacklist) + missingIds := getMissingIds(receivedIds, h.messageIdsByChannel.GetAll(), h.blacklist) if len(missingIds) > 0 { err = h.sendGetMessagesByIdToServer(socket, missingIds) @@ -592,7 +592,7 @@ func (h *Hub) handleReceivedMessage(socket socket.Socket, messageData message.Me h.Lock() h.hubInbox.StoreMessage(publish.Params.Message) - h.addMessageId(publish.Params.Channel, publish.Params.Message.MessageID) + h.messageIdsByChannel.Add(publish.Params.Channel, publish.Params.Message.MessageID) h.Unlock() return nil } diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index 76478906a2..f26db06977 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -4,9 +4,9 @@ import ( "context" "encoding/base64" "encoding/json" - "golang.org/x/exp/slices" "popstellar/channel" "popstellar/crypto" + "popstellar/hub/standard_hub/helpers" "popstellar/inbox" jsonrpc "popstellar/message" "popstellar/message/answer" @@ -88,7 +88,7 @@ type Hub struct { // messageIdsByChannel stores all the message ids and the corresponding channel ids // to help servers determine in which channel the message ids go - messageIdsByChannel map[string][]string + messageIdsByChannel helpers.IdsByChannel // peersInfo stores the info of the peers: public key, client and server endpoints associated with the socket ID peersInfo map[string]method.ServerInfo @@ -154,7 +154,7 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd hubInbox: *inbox.NewInbox(rootChannel), rootInbox: *inbox.NewInbox(rootChannel), queries: newQueries(), - messageIdsByChannel: make(map[string][]string), + messageIdsByChannel: helpers.NewIdsByChannel(), peersInfo: make(map[string]method.ServerInfo), peersGreeted: make([]string, 0), blacklist: make([]string, 0), @@ -528,26 +528,24 @@ func (h *Hub) sendGetMessagesByIdToServer(socket socket.Socket, missingIds map[s // sendHeartbeatToServers sends a heartbeat message to all servers func (h *Hub) sendHeartbeatToServers() { - h.Lock() - defer h.Unlock() - if len(h.messageIdsByChannel) > 0 { - heartbeatMessage := method.Heartbeat{ - Base: query.Base{ - JSONRPCBase: jsonrpc.JSONRPCBase{ - JSONRPC: "2.0", - }, - Method: "heartbeat", + if h.messageIdsByChannel.IsEmpty() { + return + } + heartbeatMessage := method.Heartbeat{ + Base: query.Base{ + JSONRPCBase: jsonrpc.JSONRPCBase{ + JSONRPC: "2.0", }, - Params: h.messageIdsByChannel, - } - - buf, err := json.Marshal(heartbeatMessage) - if err != nil { - h.log.Err(err).Msg("Failed to marshal and send heartbeat query") - } + Method: "heartbeat", + }, + Params: h.messageIdsByChannel.GetAll(), + } - h.serverSockets.SendToAll(buf) + buf, err := json.Marshal(heartbeatMessage) + if err != nil { + h.log.Err(err).Msg("Failed to marshal and send heartbeat query") } + h.serverSockets.SendToAll(buf) } // createLao creates a new LAO using the data in the publish parameter. @@ -667,16 +665,3 @@ func generateKeys() (kyber.Point, kyber.Scalar) { return point, secret } - -// addMessageId adds a message ID to the map of messageIds by channel of the hub -func (h *Hub) addMessageId(channelId string, messageId string) { - messageIds, channelStored := h.messageIdsByChannel[channelId] - if !channelStored { - h.messageIdsByChannel[channelId] = append(h.messageIdsByChannel[channelId], messageId) - } else { - alreadyStored := slices.Contains(messageIds, messageId) - if !alreadyStored { - h.messageIdsByChannel[channelId] = append(h.messageIdsByChannel[channelId], messageId) - } - } -} From 66155dc50e53e6e2ed54da042dd678bdd5df8a56 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Tue, 3 Oct 2023 09:43:26 +0200 Subject: [PATCH 03/28] fix hub tests --- be1-go/hub/standard_hub/helpers/data_structures.go | 6 ++++++ be1-go/hub/standard_hub/mod_test.go | 12 ++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/be1-go/hub/standard_hub/helpers/data_structures.go b/be1-go/hub/standard_hub/helpers/data_structures.go index 142d6ddbf2..75ac8b6eea 100644 --- a/be1-go/hub/standard_hub/helpers/data_structures.go +++ b/be1-go/hub/standard_hub/helpers/data_structures.go @@ -33,6 +33,12 @@ func (i *IdsByChannel) Add(channel string, id string) { } } +func (i *IdsByChannel) AddAll(channel string, ids []string) { + i.Lock() + defer i.Unlock() + i.table[channel] = append(i.table[channel], ids...) +} + func (i *IdsByChannel) GetAll() map[string][]string { i.RLock() defer i.RUnlock() diff --git a/be1-go/hub/standard_hub/mod_test.go b/be1-go/hub/standard_hub/mod_test.go index d15cbc8d5c..0c1d50ec5b 100644 --- a/be1-go/hub/standard_hub/mod_test.go +++ b/be1-go/hub/standard_hub/mod_test.go @@ -1615,8 +1615,8 @@ func Test_Send_Heartbeat_Message(t *testing.T) { hub.hubInbox.StoreMessage(msg2) hub.hubInbox.StoreMessage(msg3) - hub.messageIdsByChannel["/root"] = idsRoot - hub.messageIdsByChannel["/root/channel1"] = idsChannel1 + hub.messageIdsByChannel.AddAll("/root", idsRoot) + hub.messageIdsByChannel.AddAll("/root/channel1", idsChannel1) hub.sendHeartbeatToServers() @@ -1630,7 +1630,7 @@ func Test_Send_Heartbeat_Message(t *testing.T) { messageIdsSent := heartbeat.Params //Check that all the stored messages where sent - for storedChannel, storedIds := range hub.messageIdsByChannel { + for storedChannel, storedIds := range hub.messageIdsByChannel.GetAll() { sentIds, exists := messageIdsSent[storedChannel] require.True(t, exists) for _, storedId := range storedIds { @@ -1649,7 +1649,7 @@ func Test_Handle_Heartbeat(t *testing.T) { hub.hubInbox.StoreMessage(msg1) - hub.messageIdsByChannel["/root"] = []string{msg1.MessageID} + hub.messageIdsByChannel.Add("/root", msg1.MessageID) sock := &fakeSocket{} @@ -1715,8 +1715,8 @@ func Test_Handle_GetMessagesById(t *testing.T) { hub.hubInbox.StoreMessage(msg2) hub.hubInbox.StoreMessage(msg3) - hub.messageIdsByChannel["/root"] = idsRoot - hub.messageIdsByChannel["/root/channel1"] = idsChannel1 + hub.messageIdsByChannel.AddAll("/root", idsRoot) + hub.messageIdsByChannel.AddAll("/root/channel1", idsChannel1) //The missing Ids requested by the server missingIds := make(map[string][]string) From 0e11844b61c3dec2ae93786c038bdabb9ab71aa5 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Wed, 4 Oct 2023 09:16:25 +0200 Subject: [PATCH 04/28] Add peers struct --- .../standard_hub/helpers/data_structures.go | 44 +++++++++++++++++++ be1-go/hub/standard_hub/message_handling.go | 8 +--- be1-go/hub/standard_hub/mod.go | 22 +++------- be1-go/hub/standard_hub/mod_test.go | 2 +- 4 files changed, 52 insertions(+), 24 deletions(-) diff --git a/be1-go/hub/standard_hub/helpers/data_structures.go b/be1-go/hub/standard_hub/helpers/data_structures.go index 75ac8b6eea..42472c39c4 100644 --- a/be1-go/hub/standard_hub/helpers/data_structures.go +++ b/be1-go/hub/standard_hub/helpers/data_structures.go @@ -3,10 +3,54 @@ package helpers import ( "maps" "popstellar/channel" + "popstellar/message/query/method" "slices" "sync" ) +type Peers struct { + sync.RWMutex + // peersInfo stores the info of the peers: public key, client and server endpoints associated with the socket ID + peersInfo map[string]method.ServerInfo + // peersGreeted stores the peers that were greeted by the socket ID + peersGreeted []string +} + +func NewPeers() Peers { + return Peers{ + peersInfo: make(map[string]method.ServerInfo), + peersGreeted: make([]string, 0), + } +} + +func (p *Peers) AddPeerInfo(socketId string, info method.ServerInfo) { + p.Lock() + defer p.Unlock() + p.peersInfo[socketId] = info +} + +func (p *Peers) AddPeerGreeted(socketId string) { + p.Lock() + defer p.Unlock() + p.peersGreeted = append(p.peersGreeted, socketId) +} + +func (p *Peers) GetAllPeersInfo() []method.ServerInfo { + p.RLock() + defer p.RUnlock() + peersInfo := make([]method.ServerInfo, 0) + for _, info := range p.peersInfo { + peersInfo = append(peersInfo, info) + } + return peersInfo +} + +func (p *Peers) IsPeerGreeted(socketId string) bool { + p.RLock() + defer p.RUnlock() + return slices.Contains(p.peersGreeted, socketId) +} + // MessageIds provides a thread-safe structure that stores a channel id with its corresponding message ids type IdsByChannel struct { sync.RWMutex diff --git a/be1-go/hub/standard_hub/message_handling.go b/be1-go/hub/standard_hub/message_handling.go index 86bcef3cd4..928fb06a43 100644 --- a/be1-go/hub/standard_hub/message_handling.go +++ b/be1-go/hub/standard_hub/message_handling.go @@ -473,16 +473,12 @@ func (h *Hub) handleGreetServer(socket socket.Socket, byteMessage []byte) error return xerrors.Errorf("failed to unmarshal greetServer message: %v", err) } - h.Lock() // store information about the server - h.peersInfo[socket.ID()] = greetServer.Params + h.peers.AddPeerInfo(socket.ID(), greetServer.Params) - // check if the server is already greeted - if slices.Contains(h.peersGreeted, socket.ID()) { - h.Unlock() + if h.peers.IsPeerGreeted(socket.ID()) { return nil } - h.Unlock() err = h.SendGreetServer(socket) if err != nil { diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index f26db06977..11fc7d768e 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -90,11 +90,8 @@ type Hub struct { // to help servers determine in which channel the message ids go messageIdsByChannel helpers.IdsByChannel - // peersInfo stores the info of the peers: public key, client and server endpoints associated with the socket ID - peersInfo map[string]method.ServerInfo - - // peersGreeted stores the peers that were greeted by the socket ID - peersGreeted []string + // peers stores information about the peers + peers helpers.Peers // blacklist stores the IDs of the messages that failed to be processed by the hub // the server will not ask for them again in the heartbeat @@ -155,8 +152,7 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd rootInbox: *inbox.NewInbox(rootChannel), queries: newQueries(), messageIdsByChannel: helpers.NewIdsByChannel(), - peersInfo: make(map[string]method.ServerInfo), - peersGreeted: make([]string, 0), + peers: helpers.NewPeers(), blacklist: make([]string, 0), } @@ -295,7 +291,7 @@ func (h *Hub) SendGreetServer(socket socket.Socket) error { socket.Send(buf) - h.peersGreeted = append(h.peersGreeted, socket.ID()) + h.peers.AddPeerGreeted(socket.ID()) return nil } @@ -648,15 +644,7 @@ func (h *Hub) NotifyWitnessMessage(messageId string, publicKey string, signature } func (h *Hub) GetPeersInfo() []method.ServerInfo { - h.Lock() - defer h.Unlock() - - var peersInfo []method.ServerInfo - for _, info := range h.peersInfo { - peersInfo = append(peersInfo, info) - } - - return peersInfo + return h.peers.GetAllPeersInfo() } func generateKeys() (kyber.Point, kyber.Scalar) { diff --git a/be1-go/hub/standard_hub/mod_test.go b/be1-go/hub/standard_hub/mod_test.go index 0c1d50ec5b..11aadd67c2 100644 --- a/be1-go/hub/standard_hub/mod_test.go +++ b/be1-go/hub/standard_hub/mod_test.go @@ -1846,7 +1846,7 @@ func Test_Handle_GreetServer_Already_Greeted(t *testing.T) { err = hub.SendGreetServer(sock) require.NoError(t, err) - require.True(t, slices.Contains(hub.peersGreeted, sock.ID())) + require.True(t, hub.peers.IsPeerGreeted(sock.ID())) //reset socket message sock.msg = nil From 3205c9e1ecadb290843118defd01bdc24de3f718 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Wed, 4 Oct 2023 10:59:07 +0200 Subject: [PATCH 05/28] fix queries data structure --- .../standard_hub/helpers/data_structures.go | 32 +++++++++ be1-go/hub/standard_hub/message_handling.go | 18 +---- be1-go/hub/standard_hub/mod.go | 68 ++++++++++++------- be1-go/hub/standard_hub/mod_test.go | 50 +++++++------- be1-go/inbox/mod.go | 2 +- 5 files changed, 102 insertions(+), 68 deletions(-) diff --git a/be1-go/hub/standard_hub/helpers/data_structures.go b/be1-go/hub/standard_hub/helpers/data_structures.go index 42472c39c4..f8eebd9e06 100644 --- a/be1-go/hub/standard_hub/helpers/data_structures.go +++ b/be1-go/hub/standard_hub/helpers/data_structures.go @@ -51,6 +51,38 @@ func (p *Peers) IsPeerGreeted(socketId string) bool { return slices.Contains(p.peersGreeted, socketId) } +type ChannelsById struct { + sync.RWMutex + table map[string]channel.Channel +} + +func NewChannelsById() ChannelsById { + return ChannelsById{ + table: make(map[string]channel.Channel), + } +} + +func (c *ChannelsById) Add(channelId string, channel channel.Channel) { + c.Lock() + defer c.Unlock() + c.table[channelId] = channel +} + +func (c *ChannelsById) GetChannel(channelId string) (channel.Channel, bool) { + c.RLock() + defer c.RUnlock() + channel, ok := c.table[channelId] + return channel, ok +} + +func (c *ChannelsById) GetAll() map[string]channel.Channel { + c.RLock() + defer c.RUnlock() + channelsCopy := make(map[string]channel.Channel) + maps.Copy(channelsCopy, c.table) + return channelsCopy +} + // MessageIds provides a thread-safe structure that stores a channel id with its corresponding message ids type IdsByChannel struct { sync.RWMutex diff --git a/be1-go/hub/standard_hub/message_handling.go b/be1-go/hub/standard_hub/message_handling.go index 928fb06a43..7ea6be3a3e 100644 --- a/be1-go/hub/standard_hub/message_handling.go +++ b/be1-go/hub/standard_hub/message_handling.go @@ -189,21 +189,16 @@ func (h *Hub) handleAnswer(senderSocket socket.Socket, byteMessage []byte) error return nil } - h.queries.Lock() - - val := h.queries.state[*answerMsg.ID] + val := h.queries.getQueryState(*answerMsg.ID) if val == nil { - h.queries.Unlock() return xerrors.Errorf("no query sent with id %v", answerMsg.ID) } if *val { - h.queries.Unlock() return xerrors.Errorf("query %v already got an answer", answerMsg.ID) } - *h.queries.state[*answerMsg.ID] = true - h.queries.Unlock() + h.queries.setQueryState(*answerMsg.ID, true) err = h.handleGetMessagesByIdAnswer(senderSocket, answerMsg) if err != nil { @@ -318,7 +313,6 @@ func (h *Hub) handleBroadcast(socket socket.Socket, byteMessage []byte) error { expectedMessageID, messageID) } - h.Lock() _, ok := h.hubInbox.GetMessage(broadcast.Params.Message.MessageID) if ok { h.log.Info().Msg("message was already received") @@ -327,8 +321,6 @@ func (h *Hub) handleBroadcast(socket socket.Socket, byteMessage []byte) error { h.hubInbox.StoreMessage(broadcast.Params.Message) h.messageIdsByChannel.Add(broadcast.Params.Channel, broadcast.Params.Message.MessageID) - h.Unlock() - if err != nil { return xerrors.Errorf("failed to broadcast message: %v", err) } @@ -531,7 +523,6 @@ func (h *Hub) getMissingMessages(missingIds map[string][]string) (map[string][]m // handleReceivedMessage handle a message obtained by the server receiving a // getMessagesById result func (h *Hub) handleReceivedMessage(socket socket.Socket, messageData message.Message, targetChannel string) error { - h.Lock() signature := messageData.Signature messageID := messageData.MessageID data := messageData.Data @@ -539,7 +530,6 @@ func (h *Hub) handleReceivedMessage(socket socket.Socket, messageData message.Me expectedMessageID := messagedata.Hash(data, signature) if expectedMessageID != messageID { - h.Unlock() return xerrors.Errorf(wrongMessageIdError, expectedMessageID, messageID) } @@ -563,10 +553,8 @@ func (h *Hub) handleReceivedMessage(socket socket.Socket, messageData message.Me _, stored := h.hubInbox.GetMessage(publish.Params.Message.MessageID) if stored { h.log.Info().Msgf("Already stored message %s", publish.Params.Message.MessageID) - h.Unlock() return nil } - h.Unlock() if publish.Params.Channel == rootChannel { err := h.handleRootChannelPublishMessage(socket, publish) @@ -586,10 +574,8 @@ func (h *Hub) handleReceivedMessage(socket socket.Socket, messageData message.Me return xerrors.Errorf(publishError, err) } - h.Lock() h.hubInbox.StoreMessage(publish.Params.Message) h.messageIdsByChannel.Add(publish.Params.Channel, publish.Params.Message.MessageID) - h.Unlock() return nil } diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index 11fc7d768e..2f6f1c4e81 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -58,7 +58,7 @@ type Hub struct { messageChan chan socket.IncomingMessage sync.RWMutex - channelByID map[string]channel.Channel + channelByID helpers.ChannelsById closedSockets chan string @@ -120,6 +120,39 @@ type queries struct { nextID int } +func (q *queries) getQueryState(id int) *bool { + q.Lock() + defer q.Unlock() + + return q.state[id] +} + +// getNextID returns the next query ID +func (q *queries) getNextID() int { + q.Lock() + defer q.Unlock() + + id := q.nextID + q.nextID++ + return id +} + +// setQueryState sets the state of the query with the given ID +func (q *queries) setQueryState(id int, state bool) { + q.Lock() + defer q.Unlock() + + q.state[id] = &state +} + +// addQuery adds the given query to the table +func (q *queries) addQuery(id int, query method.GetMessagesById) { + q.Lock() + defer q.Unlock() + + q.getMessagesByIdQueries[id] = query +} + // NewHub returns a new Hub. func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAddress string, log zerolog.Logger, laoFac channel.LaoFactory) (*Hub, error) { @@ -137,7 +170,7 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd clientServerAddress: clientServerAddress, serverServerAddress: serverServerAddress, messageChan: make(chan socket.IncomingMessage), - channelByID: make(map[string]channel.Channel), + channelByID: helpers.NewChannelsById(), closedSockets: make(chan string), pubKeyOwner: pubKeyOwner, pubKeyServ: pubServ, @@ -194,12 +227,10 @@ func (h *Hub) Start() { } }() case id := <-h.closedSockets: - h.RLock() - for _, channel := range h.channelByID { + for _, channel := range h.channelByID.GetAll() { // dummy Unsubscribe message because it's only used for logging... channel.Unsubscribe(id, method.Unsubscribe{}) } - h.RUnlock() case <-h.stop: h.log.Info().Msg("stopping the hub") return @@ -260,9 +291,6 @@ func (h *Hub) OnSocketClose() chan<- string { // SendGreetServer implements hub.Hub func (h *Hub) SendGreetServer(socket socket.Socket) error { - h.Lock() - defer h.Unlock() - pk, err := h.pubKeyServ.MarshalBinary() if err != nil { return xerrors.Errorf("failed to marshal server public key: %v", err) @@ -300,10 +328,7 @@ func (h *Hub) getChan(channelPath string) (channel.Channel, error) { return nil, xerrors.Errorf("channel not prefixed with '%s': %q", rootPrefix, channelPath) } - h.RLock() - defer h.RUnlock() - - channel, ok := h.channelByID[channelPath] + channel, ok := h.channelByID.GetChannel(channelPath) if !ok { return nil, xerrors.Errorf("channel %s does not exist", channelPath) } @@ -491,12 +516,8 @@ func (h *Hub) handleIncomingMessage(incomingMessage *socket.IncomingMessage) err // sendGetMessagesByIdToServer sends a getMessagesById message to a server func (h *Hub) sendGetMessagesByIdToServer(socket socket.Socket, missingIds map[string][]string) error { - h.Lock() - defer h.Unlock() - - queryId := h.queries.nextID - baseValue := false - h.queries.state[queryId] = &baseValue + queryId := h.queries.getNextID() + h.queries.setQueryState(queryId, false) getMessagesById := method.GetMessagesById{ Base: query.Base{ @@ -514,8 +535,7 @@ func (h *Hub) sendGetMessagesByIdToServer(socket socket.Socket, missingIds map[s return xerrors.Errorf("failed to marshal getMessagesById query: %v", err) } - h.queries.getMessagesByIdQueries[queryId] = getMessagesById - h.queries.nextID++ + h.queries.addQuery(queryId, getMessagesById) socket.Send(buf) @@ -550,7 +570,7 @@ func (h *Hub) createLao(msg message.Message, laoCreate messagedata.LaoCreate, laoChannelPath := rootPrefix + laoCreate.ID - if _, ok := h.channelByID[laoChannelPath]; ok { + if _, ok := h.channelByID.GetChannel(laoChannelPath); ok { return answer.NewDuplicateResourceError("failed to create lao: duplicate lao path: %q", laoChannelPath) } @@ -631,16 +651,12 @@ func (h *Hub) GetSchemaValidator() validation.SchemaValidator { // NotifyNewChannel implements channel.HubFunctionalities func (h *Hub) NotifyNewChannel(channelID string, channel channel.Channel, sock socket.Socket) { - h.Lock() - h.channelByID[channelID] = channel - h.Unlock() + h.channelByID.Add(channelID, channel) } // NotifyWitnessMessage implements channel.HubFunctionalities func (h *Hub) NotifyWitnessMessage(messageId string, publicKey string, signature string) { - h.Lock() h.hubInbox.AddWitnessSignature(messageId, publicKey, signature) - h.Unlock() } func (h *Hub) GetPeersInfo() []method.ServerInfo { diff --git a/be1-go/hub/standard_hub/mod_test.go b/be1-go/hub/standard_hub/mod_test.go index 11aadd67c2..f0165979e4 100644 --- a/be1-go/hub/standard_hub/mod_test.go +++ b/be1-go/hub/standard_hub/mod_test.go @@ -729,8 +729,10 @@ func Test_Create_LAO(t *testing.T) { // the server should have saved the channel locally - require.Contains(t, hub.channelByID, rootPrefix+data.ID) - require.Equal(t, fakeChannelFac.c, hub.channelByID[rootPrefix+data.ID]) + require.Contains(t, hub.channelByID.GetAll(), rootPrefix+data.ID) + + channel, _ := hub.channelByID.GetChannel(rootPrefix + data.ID) + require.Equal(t, fakeChannelFac.c, channel) } func Test_Wrong_Root_Publish(t *testing.T) { @@ -743,7 +745,7 @@ func Test_Wrong_Root_Publish(t *testing.T) { laoID := "/root" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Add(rootPrefix+laoID, c) data := messagedata.LaoState{ Object: messagedata.LAOObject, @@ -874,14 +876,13 @@ func Test_Handle_Answer(t *testing.T) { answerBisBuf, err := json.Marshal(serverAnswerBis) require.NoError(t, err) - queryState := false - hub.queries.state[1] = &queryState - hub.queries.getMessagesByIdQueries[1] = method.GetMessagesById{ + hub.queries.setQueryState(1, false) + query := method.GetMessagesById{ Base: query.Base{}, ID: 1, Params: nil, } - + hub.queries.addQuery(1, query) sock := &fakeSocket{} hub.handleMessageFromClient(&socket.IncomingMessage{ @@ -890,21 +891,21 @@ func Test_Handle_Answer(t *testing.T) { }) require.Error(t, sock.err, "rpc message sent by a client should be a query") sock.err = nil - require.False(t, queryState) + require.False(t, *hub.queries.getQueryState(1)) hub.handleMessageFromServer(&socket.IncomingMessage{ Socket: sock, Message: resultBuf, }) require.NoError(t, sock.err) - require.False(t, queryState) + require.False(t, *hub.queries.getQueryState(1)) hub.handleMessageFromServer(&socket.IncomingMessage{ Socket: sock, Message: answerBuf, }) require.NoError(t, sock.err) - require.True(t, queryState) + require.True(t, *hub.queries.getQueryState(1)) hub.handleMessageFromServer(&socket.IncomingMessage{ Socket: sock, @@ -932,7 +933,7 @@ func Test_Handle_Publish_From_Client(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Add(rootPrefix+laoID, c) signature, err := schnorr.Sign(suite, keypair.private, []byte("XXX")) require.NoError(t, err) @@ -999,7 +1000,7 @@ func Test_Handle_Publish_From_Server(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Add(rootPrefix+laoID, c) signature, err := schnorr.Sign(suite, keypair.private, []byte("XXX")) require.NoError(t, err) @@ -1066,7 +1067,7 @@ func Test_Receive_Publish_Twice(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Add(rootPrefix+laoID, c) signature, err := schnorr.Sign(suite, keypair.private, []byte("XXX")) require.NoError(t, err) @@ -1191,9 +1192,8 @@ func Test_Create_LAO_GetMessagesById_Result(t *testing.T) { Params: missingMessages, } - queryState := false - hub.queries.state[1] = &queryState - hub.queries.getMessagesByIdQueries[1] = getMessagesByIdQuery + hub.queries.setQueryState(1, false) + hub.queries.addQuery(1, getMessagesByIdQuery) ans := struct { JSONRPC string `json:"jsonrpc"` @@ -1228,8 +1228,9 @@ func Test_Create_LAO_GetMessagesById_Result(t *testing.T) { // the server should have saved the channel locally - require.Contains(t, hub.channelByID, rootPrefix+data.ID) - require.Equal(t, fakeChannelFac.c, hub.channelByID[rootPrefix+data.ID]) + require.Contains(t, hub.channelByID.GetAll(), rootPrefix+data.ID) + channel, _ := hub.channelByID.GetChannel(rootPrefix + laoID) + require.Equal(t, fakeChannelFac.c, channel) } // Tests that an answer to a getMessagesById without a valid message id returns an error @@ -1294,9 +1295,8 @@ func Test_Create_LAO_GetMessagesById_Wrong_MessageID(t *testing.T) { Params: missingMessages, } - queryState := false - hub.queries.state[1] = &queryState - hub.queries.getMessagesByIdQueries[1] = getMessagesByIdQuery + hub.queries.setQueryState(1, false) + hub.queries.addQuery(1, getMessagesByIdQuery) ans := struct { JSONRPC string `json:"jsonrpc"` @@ -1334,7 +1334,7 @@ func Test_Handle_Subscribe(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Add(rootPrefix+laoID, c) subscribe := method.Subscribe{ Base: query.Base{ @@ -1397,7 +1397,7 @@ func TestServer_Handle_Unsubscribe(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Add(rootPrefix+laoID, c) unsubscribe := method.Unsubscribe{ Base: query.Base{ @@ -1471,7 +1471,7 @@ func TestServer_Handle_Catchup(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Add(rootPrefix+laoID, c) catchup := method.Catchup{ Base: query.Base{ @@ -1543,7 +1543,7 @@ func Test_Send_And_Handle_Message(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Add(rootPrefix+laoID, c) signature, err := schnorr.Sign(suite, keypair.private, []byte("XXX")) require.NoError(t, err) diff --git a/be1-go/inbox/mod.go b/be1-go/inbox/mod.go index 0639e9c4de..4079671be6 100644 --- a/be1-go/inbox/mod.go +++ b/be1-go/inbox/mod.go @@ -38,9 +38,9 @@ func NewInbox(channelID string) *Inbox { // `messageID`. If the message was not yet received, the signature is added // to the pending signatures map. func (i *Inbox) AddWitnessSignature(messageID string, public string, signature string) { - msg, ok := i.GetMessage(messageID) i.mutex.Lock() defer i.mutex.Unlock() + msg, ok := i.GetMessage(messageID) if !ok { // Add the signature to the pending signatures i.pendingSignatures[messageID] = append(i.pendingSignatures[messageID], message.WitnessSignature{ From 5c912bb7e51a6b285ab902f8191171ac95db0495 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Tue, 10 Oct 2023 20:46:20 +0200 Subject: [PATCH 06/28] fix inbox tests --- be1-go/inbox/mod.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be1-go/inbox/mod.go b/be1-go/inbox/mod.go index 4079671be6..e51cd43e44 100644 --- a/be1-go/inbox/mod.go +++ b/be1-go/inbox/mod.go @@ -40,7 +40,7 @@ func NewInbox(channelID string) *Inbox { func (i *Inbox) AddWitnessSignature(messageID string, public string, signature string) { i.mutex.Lock() defer i.mutex.Unlock() - msg, ok := i.GetMessage(messageID) + msg, ok := i.msgsMap[messageID] if !ok { // Add the signature to the pending signatures i.pendingSignatures[messageID] = append(i.pendingSignatures[messageID], message.WitnessSignature{ @@ -48,7 +48,7 @@ func (i *Inbox) AddWitnessSignature(messageID string, public string, signature s Signature: signature, }) } else { - msg.WitnessSignatures = append(msg.WitnessSignatures, message.WitnessSignature{ + msg.message.WitnessSignatures = append(msg.message.WitnessSignatures, message.WitnessSignature{ Witness: public, Signature: signature, }) From f15fd575dcd8b303a68400ea77b891b99de6c025 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Tue, 10 Oct 2023 21:38:10 +0200 Subject: [PATCH 07/28] debugging --- be1-go/configServer1.json | 2 +- be1-go/configServer2.json | 4 +--- be1-go/configServer3.json | 12 ++++++++++++ be1-go/configServer4.json | 12 ++++++++++++ be1-go/hub/standard_hub/mod.go | 2 +- 5 files changed, 27 insertions(+), 5 deletions(-) create mode 100644 be1-go/configServer3.json create mode 100644 be1-go/configServer4.json diff --git a/be1-go/configServer1.json b/be1-go/configServer1.json index fb8796572b..ff84e0749b 100644 --- a/be1-go/configServer1.json +++ b/be1-go/configServer1.json @@ -8,5 +8,5 @@ "client-port" : 9000, "server-port" : 9001, "auth-port" : 9100, - "other-servers": [] + "other-servers": ["localhost:9003"] } \ No newline at end of file diff --git a/be1-go/configServer2.json b/be1-go/configServer2.json index 5238986b74..ab1e0434a4 100644 --- a/be1-go/configServer2.json +++ b/be1-go/configServer2.json @@ -8,7 +8,5 @@ "client-port" : 9002, "server-port" : 9003, "auth-port" : 9101, - "other-servers": [ - "localhost:9001" - ] + "other-servers": ["localhost:9001"] } \ No newline at end of file diff --git a/be1-go/configServer3.json b/be1-go/configServer3.json new file mode 100644 index 0000000000..68e2e68be3 --- /dev/null +++ b/be1-go/configServer3.json @@ -0,0 +1,12 @@ +{ + "public-key" : "", + "server-address" : "ws://127.0.0.1:9003/server", + "client-address" : "ws://127.0.0.1:9002/client", + "server-public-address" : "localhost", + "server-listen-address" : "localhost", + "auth-server-address" : "localhost", + "client-port" : 9004, + "server-port" : 9005, + "auth-port" : 9102, + "other-servers": ["localhost:9001", "localhost:9003", "localhost:9007"] +} \ No newline at end of file diff --git a/be1-go/configServer4.json b/be1-go/configServer4.json new file mode 100644 index 0000000000..a9c2a2b7f6 --- /dev/null +++ b/be1-go/configServer4.json @@ -0,0 +1,12 @@ +{ + "public-key" : "", + "server-address" : "ws://127.0.0.1:9003/server", + "client-address" : "ws://127.0.0.1:9002/client", + "server-public-address" : "localhost", + "server-listen-address" : "localhost", + "auth-server-address" : "localhost", + "client-port" : 9006, + "server-port" : 9007, + "auth-port" : 9103, + "other-servers": ["localhost:9001", "localhost:9005", "localhost:9003"] +} \ No newline at end of file diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index 2f6f1c4e81..b199a292d5 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -501,7 +501,7 @@ func (h *Hub) handleMessageFromServer(incomingMessage *socket.IncomingMessage) e func (h *Hub) handleIncomingMessage(incomingMessage *socket.IncomingMessage) error { defer h.workers.Release(1) - h.log.Info().Str("msg", string(incomingMessage.Message)).Msg("handle incoming message") + h.log.Info().Str("msg", string(incomingMessage.Message)).Msgf("handle incoming message") switch incomingMessage.Socket.Type() { case socket.ClientSocketType: From 864ec8cfb4c851945c9ccdfcd4e7472920855e55 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Tue, 10 Oct 2023 22:49:37 +0200 Subject: [PATCH 08/28] Add hub_state --- be1-go/.gitignore | 4 +- be1-go/channel/lao/mod.go | 8 +- .../standard_hub/helpers/data_structures.go | 142 ------------------ .../standard_hub/hub_state/channels_by_id.go | 40 +++++ .../standard_hub/hub_state/ids_by_channel.go | 54 +++++++ be1-go/hub/standard_hub/hub_state/peers.go | 54 +++++++ be1-go/hub/standard_hub/hub_state/queries.go | 60 ++++++++ be1-go/hub/standard_hub/mod.go | 77 ++-------- 8 files changed, 229 insertions(+), 210 deletions(-) delete mode 100644 be1-go/hub/standard_hub/helpers/data_structures.go create mode 100644 be1-go/hub/standard_hub/hub_state/channels_by_id.go create mode 100644 be1-go/hub/standard_hub/hub_state/ids_by_channel.go create mode 100644 be1-go/hub/standard_hub/hub_state/peers.go create mode 100644 be1-go/hub/standard_hub/hub_state/queries.go diff --git a/be1-go/.gitignore b/be1-go/.gitignore index 96c0aa4fe9..c99d6a4e40 100644 --- a/be1-go/.gitignore +++ b/be1-go/.gitignore @@ -4,4 +4,6 @@ report.json coverage.out coverage.html configServer1.json -configServer2.json \ No newline at end of file +configServer2.json +configServer3.json +configServer4.json \ No newline at end of file diff --git a/be1-go/channel/lao/mod.go b/be1-go/channel/lao/mod.go index 58e73bde74..53004a3ad5 100644 --- a/be1-go/channel/lao/mod.go +++ b/be1-go/channel/lao/mod.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "golang.org/x/exp/slices" popstellar "popstellar" "popstellar/channel" "popstellar/channel/authentication" @@ -705,9 +706,12 @@ func (c *Channel) createAndSendLAOGreet() error { peers := []messagedata.Peer{} for _, info := range c.hub.GetPeersInfo() { - peers = append(peers, messagedata.Peer{ + peer := messagedata.Peer{ Address: info.ClientAddress, - }) + } + if !slices.Contains(peers, peer) { + peers = append(peers, peer) + } } msgData := messagedata.LaoGreet{ diff --git a/be1-go/hub/standard_hub/helpers/data_structures.go b/be1-go/hub/standard_hub/helpers/data_structures.go deleted file mode 100644 index f8eebd9e06..0000000000 --- a/be1-go/hub/standard_hub/helpers/data_structures.go +++ /dev/null @@ -1,142 +0,0 @@ -package helpers - -import ( - "maps" - "popstellar/channel" - "popstellar/message/query/method" - "slices" - "sync" -) - -type Peers struct { - sync.RWMutex - // peersInfo stores the info of the peers: public key, client and server endpoints associated with the socket ID - peersInfo map[string]method.ServerInfo - // peersGreeted stores the peers that were greeted by the socket ID - peersGreeted []string -} - -func NewPeers() Peers { - return Peers{ - peersInfo: make(map[string]method.ServerInfo), - peersGreeted: make([]string, 0), - } -} - -func (p *Peers) AddPeerInfo(socketId string, info method.ServerInfo) { - p.Lock() - defer p.Unlock() - p.peersInfo[socketId] = info -} - -func (p *Peers) AddPeerGreeted(socketId string) { - p.Lock() - defer p.Unlock() - p.peersGreeted = append(p.peersGreeted, socketId) -} - -func (p *Peers) GetAllPeersInfo() []method.ServerInfo { - p.RLock() - defer p.RUnlock() - peersInfo := make([]method.ServerInfo, 0) - for _, info := range p.peersInfo { - peersInfo = append(peersInfo, info) - } - return peersInfo -} - -func (p *Peers) IsPeerGreeted(socketId string) bool { - p.RLock() - defer p.RUnlock() - return slices.Contains(p.peersGreeted, socketId) -} - -type ChannelsById struct { - sync.RWMutex - table map[string]channel.Channel -} - -func NewChannelsById() ChannelsById { - return ChannelsById{ - table: make(map[string]channel.Channel), - } -} - -func (c *ChannelsById) Add(channelId string, channel channel.Channel) { - c.Lock() - defer c.Unlock() - c.table[channelId] = channel -} - -func (c *ChannelsById) GetChannel(channelId string) (channel.Channel, bool) { - c.RLock() - defer c.RUnlock() - channel, ok := c.table[channelId] - return channel, ok -} - -func (c *ChannelsById) GetAll() map[string]channel.Channel { - c.RLock() - defer c.RUnlock() - channelsCopy := make(map[string]channel.Channel) - maps.Copy(channelsCopy, c.table) - return channelsCopy -} - -// MessageIds provides a thread-safe structure that stores a channel id with its corresponding message ids -type IdsByChannel struct { - sync.RWMutex - table map[string][]string -} - -func NewIdsByChannel() IdsByChannel { - return IdsByChannel{ - table: make(map[string][]string), - } -} - -func (i *IdsByChannel) Add(channel string, id string) { - i.Lock() - defer i.Unlock() - messageIds, channelStored := i.table[channel] - if !channelStored { - i.table[channel] = append(i.table[channel], id) - return - } - alreadyStoredId := slices.Contains(messageIds, id) - if !alreadyStoredId { - i.table[channel] = append(i.table[channel], id) - } -} - -func (i *IdsByChannel) AddAll(channel string, ids []string) { - i.Lock() - defer i.Unlock() - i.table[channel] = append(i.table[channel], ids...) -} - -func (i *IdsByChannel) GetAll() map[string][]string { - i.RLock() - defer i.RUnlock() - tableCopy := make(map[string][]string) - maps.Copy(tableCopy, i.table) - return tableCopy -} - -func (i *IdsByChannel) IsEmpty() bool { - i.RLock() - defer i.RUnlock() - - return len(i.table) == 0 -} - -type ChannelByID struct { - sync.RWMutex - table map[string]channel.Channel -} - -func NewChannelById() ChannelByID { - return ChannelByID{ - table: make(map[string]channel.Channel), - } -} diff --git a/be1-go/hub/standard_hub/hub_state/channels_by_id.go b/be1-go/hub/standard_hub/hub_state/channels_by_id.go new file mode 100644 index 0000000000..67ba6b379d --- /dev/null +++ b/be1-go/hub/standard_hub/hub_state/channels_by_id.go @@ -0,0 +1,40 @@ +package hub_state + +import ( + "maps" + "popstellar/channel" + "sync" +) + +// ChannelsById provides a thread-safe structure that stores channel ids with their corresponding channels +type ChannelsById struct { + sync.RWMutex + table map[string]channel.Channel +} + +func NewChannelsById() ChannelsById { + return ChannelsById{ + table: make(map[string]channel.Channel), + } +} + +func (c *ChannelsById) Add(channelId string, channel channel.Channel) { + c.Lock() + defer c.Unlock() + c.table[channelId] = channel +} + +func (c *ChannelsById) GetChannel(channelId string) (channel.Channel, bool) { + c.RLock() + defer c.RUnlock() + channel, ok := c.table[channelId] + return channel, ok +} + +func (c *ChannelsById) GetAll() map[string]channel.Channel { + c.RLock() + defer c.RUnlock() + channelsCopy := make(map[string]channel.Channel) + maps.Copy(channelsCopy, c.table) + return channelsCopy +} diff --git a/be1-go/hub/standard_hub/hub_state/ids_by_channel.go b/be1-go/hub/standard_hub/hub_state/ids_by_channel.go new file mode 100644 index 0000000000..f208374424 --- /dev/null +++ b/be1-go/hub/standard_hub/hub_state/ids_by_channel.go @@ -0,0 +1,54 @@ +package hub_state + +import ( + "golang.org/x/exp/slices" + "maps" + "sync" +) + +// IdsByChannel provides a thread-safe structure that stores a channel id with its corresponding message ids +type IdsByChannel struct { + sync.RWMutex + table map[string][]string +} + +func NewIdsByChannel() IdsByChannel { + return IdsByChannel{ + table: make(map[string][]string), + } +} + +func (i *IdsByChannel) Add(channel string, id string) { + i.Lock() + defer i.Unlock() + messageIds, channelStored := i.table[channel] + if !channelStored { + i.table[channel] = append(i.table[channel], id) + return + } + alreadyStoredId := slices.Contains(messageIds, id) + if !alreadyStoredId { + i.table[channel] = append(i.table[channel], id) + } +} + +func (i *IdsByChannel) AddAll(channel string, ids []string) { + i.Lock() + defer i.Unlock() + i.table[channel] = append(i.table[channel], ids...) +} + +func (i *IdsByChannel) GetAll() map[string][]string { + i.RLock() + defer i.RUnlock() + tableCopy := make(map[string][]string) + maps.Copy(tableCopy, i.table) + return tableCopy +} + +func (i *IdsByChannel) IsEmpty() bool { + i.RLock() + defer i.RUnlock() + + return len(i.table) == 0 +} diff --git a/be1-go/hub/standard_hub/hub_state/peers.go b/be1-go/hub/standard_hub/hub_state/peers.go new file mode 100644 index 0000000000..c76d1941eb --- /dev/null +++ b/be1-go/hub/standard_hub/hub_state/peers.go @@ -0,0 +1,54 @@ +package hub_state + +import ( + "golang.org/x/exp/slices" + "popstellar/message/query/method" + "sync" +) + +// Peers provides a thread-safe structure that stores the peers' information +type Peers struct { + sync.RWMutex + // peersInfo stores the info of the peers: public key, client and server endpoints associated with the socket ID + peersInfo map[string]method.ServerInfo + // peersGreeted stores the peers that were greeted by the socket ID + peersGreeted []string +} + +func NewPeers() Peers { + return Peers{ + peersInfo: make(map[string]method.ServerInfo), + peersGreeted: make([]string, 0), + } +} + +func (p *Peers) AddPeerInfo(socketId string, info method.ServerInfo) { + p.Lock() + defer p.Unlock() + p.peersInfo[socketId] = info +} + +func (p *Peers) AddPeerGreeted(socketId string) { + p.Lock() + defer p.Unlock() + if slices.Contains(p.peersGreeted, socketId) { + return + } + p.peersGreeted = append(p.peersGreeted, socketId) +} + +func (p *Peers) GetAllPeersInfo() []method.ServerInfo { + p.RLock() + defer p.RUnlock() + peersInfo := make([]method.ServerInfo, 0) + for _, info := range p.peersInfo { + peersInfo = append(peersInfo, info) + } + return peersInfo +} + +func (p *Peers) IsPeerGreeted(socketId string) bool { + p.RLock() + defer p.RUnlock() + return slices.Contains(p.peersGreeted, socketId) +} diff --git a/be1-go/hub/standard_hub/hub_state/queries.go b/be1-go/hub/standard_hub/hub_state/queries.go new file mode 100644 index 0000000000..7f7de69496 --- /dev/null +++ b/be1-go/hub/standard_hub/hub_state/queries.go @@ -0,0 +1,60 @@ +package hub_state + +import ( + "popstellar/message/query/method" + "sync" +) + +// Queries let the hub remember all queries that it sent to other servers +type Queries struct { + sync.Mutex + // state stores the ID of the server's queries and their state. False for a + // query not yet answered, else true. + state map[int]*bool + // getMessagesByIdQueries stores the server's getMessagesByIds queries by their ID. + getMessagesByIdQueries map[int]method.GetMessagesById + // nextID store the ID of the next query + nextID int +} + +// NewQueries creates a new queries struct +func NewQueries() Queries { + return Queries{ + state: make(map[int]*bool), + getMessagesByIdQueries: make(map[int]method.GetMessagesById), + } +} + +// GetQueryState returns a given query's state +func (q *Queries) GetQueryState(id int) *bool { + q.Lock() + defer q.Unlock() + + return q.state[id] +} + +// GetNextID returns the next query ID +func (q *Queries) GetNextID() int { + q.Lock() + defer q.Unlock() + + id := q.nextID + q.nextID++ + return id +} + +// SetQueryState sets the state of the query with the given ID +func (q *Queries) SetQueryState(id int, state bool) { + q.Lock() + defer q.Unlock() + + q.state[id] = &state +} + +// AddQuery adds the given query to the table +func (q *Queries) AddQuery(id int, query method.GetMessagesById) { + q.Lock() + defer q.Unlock() + + q.getMessagesByIdQueries[id] = query +} diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index b199a292d5..97fcd956df 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -6,7 +6,7 @@ import ( "encoding/json" "popstellar/channel" "popstellar/crypto" - "popstellar/hub/standard_hub/helpers" + state "popstellar/hub/standard_hub/hub_state" "popstellar/inbox" jsonrpc "popstellar/message" "popstellar/message/answer" @@ -58,7 +58,7 @@ type Hub struct { messageChan chan socket.IncomingMessage sync.RWMutex - channelByID helpers.ChannelsById + channelByID state.ChannelsById closedSockets chan string @@ -84,14 +84,14 @@ type Hub struct { // rootInbox and queries are used to help servers catchup to each other rootInbox inbox.Inbox - queries queries + queries state.Queries // messageIdsByChannel stores all the message ids and the corresponding channel ids // to help servers determine in which channel the message ids go - messageIdsByChannel helpers.IdsByChannel + messageIdsByChannel state.IdsByChannel // peers stores information about the peers - peers helpers.Peers + peers state.Peers // blacklist stores the IDs of the messages that failed to be processed by the hub // the server will not ask for them again in the heartbeat @@ -100,59 +100,6 @@ type Hub struct { blacklist []string } -// newQueries creates a new queries struct -func newQueries() queries { - return queries{ - state: make(map[int]*bool), - getMessagesByIdQueries: make(map[int]method.GetMessagesById), - } -} - -// queries let the hub remember all queries that it sent to other servers -type queries struct { - sync.Mutex - // state stores the ID of the server's queries and their state. False for a - // query not yet answered, else true. - state map[int]*bool - // getMessagesByIdQueries stores the server's getMessagesByIds queries by their ID. - getMessagesByIdQueries map[int]method.GetMessagesById - // nextID store the ID of the next query - nextID int -} - -func (q *queries) getQueryState(id int) *bool { - q.Lock() - defer q.Unlock() - - return q.state[id] -} - -// getNextID returns the next query ID -func (q *queries) getNextID() int { - q.Lock() - defer q.Unlock() - - id := q.nextID - q.nextID++ - return id -} - -// setQueryState sets the state of the query with the given ID -func (q *queries) setQueryState(id int, state bool) { - q.Lock() - defer q.Unlock() - - q.state[id] = &state -} - -// addQuery adds the given query to the table -func (q *queries) addQuery(id int, query method.GetMessagesById) { - q.Lock() - defer q.Unlock() - - q.getMessagesByIdQueries[id] = query -} - // NewHub returns a new Hub. func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAddress string, log zerolog.Logger, laoFac channel.LaoFactory) (*Hub, error) { @@ -170,7 +117,7 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd clientServerAddress: clientServerAddress, serverServerAddress: serverServerAddress, messageChan: make(chan socket.IncomingMessage), - channelByID: helpers.NewChannelsById(), + channelByID: state.NewChannelsById(), closedSockets: make(chan string), pubKeyOwner: pubKeyOwner, pubKeyServ: pubServ, @@ -183,9 +130,9 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd serverSockets: channel.NewSockets(), hubInbox: *inbox.NewInbox(rootChannel), rootInbox: *inbox.NewInbox(rootChannel), - queries: newQueries(), - messageIdsByChannel: helpers.NewIdsByChannel(), - peers: helpers.NewPeers(), + queries: state.NewQueries(), + messageIdsByChannel: state.NewIdsByChannel(), + peers: state.NewPeers(), blacklist: make([]string, 0), } @@ -516,8 +463,8 @@ func (h *Hub) handleIncomingMessage(incomingMessage *socket.IncomingMessage) err // sendGetMessagesByIdToServer sends a getMessagesById message to a server func (h *Hub) sendGetMessagesByIdToServer(socket socket.Socket, missingIds map[string][]string) error { - queryId := h.queries.getNextID() - h.queries.setQueryState(queryId, false) + queryId := h.queries.GetNextID() + h.queries.SetQueryState(queryId, false) getMessagesById := method.GetMessagesById{ Base: query.Base{ @@ -535,7 +482,7 @@ func (h *Hub) sendGetMessagesByIdToServer(socket socket.Socket, missingIds map[s return xerrors.Errorf("failed to marshal getMessagesById query: %v", err) } - h.queries.addQuery(queryId, getMessagesById) + h.queries.AddQuery(queryId, getMessagesById) socket.Send(buf) From a45c2d9296b8db05b17d533cfff900a46e5bafb3 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Tue, 10 Oct 2023 22:51:13 +0200 Subject: [PATCH 09/28] remove additional config files --- be1-go/.gitignore | 4 +--- be1-go/configServer3.json | 12 ------------ be1-go/configServer4.json | 12 ------------ 3 files changed, 1 insertion(+), 27 deletions(-) delete mode 100644 be1-go/configServer3.json delete mode 100644 be1-go/configServer4.json diff --git a/be1-go/.gitignore b/be1-go/.gitignore index c99d6a4e40..96c0aa4fe9 100644 --- a/be1-go/.gitignore +++ b/be1-go/.gitignore @@ -4,6 +4,4 @@ report.json coverage.out coverage.html configServer1.json -configServer2.json -configServer3.json -configServer4.json \ No newline at end of file +configServer2.json \ No newline at end of file diff --git a/be1-go/configServer3.json b/be1-go/configServer3.json deleted file mode 100644 index 68e2e68be3..0000000000 --- a/be1-go/configServer3.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "public-key" : "", - "server-address" : "ws://127.0.0.1:9003/server", - "client-address" : "ws://127.0.0.1:9002/client", - "server-public-address" : "localhost", - "server-listen-address" : "localhost", - "auth-server-address" : "localhost", - "client-port" : 9004, - "server-port" : 9005, - "auth-port" : 9102, - "other-servers": ["localhost:9001", "localhost:9003", "localhost:9007"] -} \ No newline at end of file diff --git a/be1-go/configServer4.json b/be1-go/configServer4.json deleted file mode 100644 index a9c2a2b7f6..0000000000 --- a/be1-go/configServer4.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "public-key" : "", - "server-address" : "ws://127.0.0.1:9003/server", - "client-address" : "ws://127.0.0.1:9002/client", - "server-public-address" : "localhost", - "server-listen-address" : "localhost", - "auth-server-address" : "localhost", - "client-port" : 9006, - "server-port" : 9007, - "auth-port" : 9103, - "other-servers": ["localhost:9001", "localhost:9005", "localhost:9003"] -} \ No newline at end of file From 25977d0440e754753ac4c3b2f764e6ef7b53edb9 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Tue, 10 Oct 2023 22:59:29 +0200 Subject: [PATCH 10/28] fix typo --- be1-go/hub/standard_hub/message_handling.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be1-go/hub/standard_hub/message_handling.go b/be1-go/hub/standard_hub/message_handling.go index 7ea6be3a3e..880f71b25a 100644 --- a/be1-go/hub/standard_hub/message_handling.go +++ b/be1-go/hub/standard_hub/message_handling.go @@ -189,7 +189,7 @@ func (h *Hub) handleAnswer(senderSocket socket.Socket, byteMessage []byte) error return nil } - val := h.queries.getQueryState(*answerMsg.ID) + val := h.queries.GetQueryState(*answerMsg.ID) if val == nil { return xerrors.Errorf("no query sent with id %v", answerMsg.ID) } @@ -198,7 +198,7 @@ func (h *Hub) handleAnswer(senderSocket socket.Socket, byteMessage []byte) error return xerrors.Errorf("query %v already got an answer", answerMsg.ID) } - h.queries.setQueryState(*answerMsg.ID, true) + h.queries.SetQueryState(*answerMsg.ID, true) err = h.handleGetMessagesByIdAnswer(senderSocket, answerMsg) if err != nil { From f853897e2717202c4ee748d7a25050bfc1761fd7 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Tue, 10 Oct 2023 23:23:23 +0200 Subject: [PATCH 11/28] Add documentation and fix CI --- be1-go/channel/lao/mod.go | 5 +++-- be1-go/configServer1.json | 2 +- be1-go/configServer2.json | 4 +++- .../standard_hub/hub_state/channels_by_id.go | 4 ++++ .../standard_hub/hub_state/ids_by_channel.go | 5 +++++ be1-go/hub/standard_hub/hub_state/peers.go | 5 +++++ be1-go/hub/standard_hub/mod_test.go | 18 +++++++++--------- 7 files changed, 30 insertions(+), 13 deletions(-) diff --git a/be1-go/channel/lao/mod.go b/be1-go/channel/lao/mod.go index 53004a3ad5..8ae31330ac 100644 --- a/be1-go/channel/lao/mod.go +++ b/be1-go/channel/lao/mod.go @@ -709,9 +709,10 @@ func (c *Channel) createAndSendLAOGreet() error { peer := messagedata.Peer{ Address: info.ClientAddress, } - if !slices.Contains(peers, peer) { - peers = append(peers, peer) + if slices.Contains(peers, peer) { + continue } + peers = append(peers, peer) } msgData := messagedata.LaoGreet{ diff --git a/be1-go/configServer1.json b/be1-go/configServer1.json index ff84e0749b..fb8796572b 100644 --- a/be1-go/configServer1.json +++ b/be1-go/configServer1.json @@ -8,5 +8,5 @@ "client-port" : 9000, "server-port" : 9001, "auth-port" : 9100, - "other-servers": ["localhost:9003"] + "other-servers": [] } \ No newline at end of file diff --git a/be1-go/configServer2.json b/be1-go/configServer2.json index ab1e0434a4..5238986b74 100644 --- a/be1-go/configServer2.json +++ b/be1-go/configServer2.json @@ -8,5 +8,7 @@ "client-port" : 9002, "server-port" : 9003, "auth-port" : 9101, - "other-servers": ["localhost:9001"] + "other-servers": [ + "localhost:9001" + ] } \ No newline at end of file diff --git a/be1-go/hub/standard_hub/hub_state/channels_by_id.go b/be1-go/hub/standard_hub/hub_state/channels_by_id.go index 67ba6b379d..e0f5d40a4c 100644 --- a/be1-go/hub/standard_hub/hub_state/channels_by_id.go +++ b/be1-go/hub/standard_hub/hub_state/channels_by_id.go @@ -12,18 +12,21 @@ type ChannelsById struct { table map[string]channel.Channel } +// NewChannelsById creates a new ChannelsById structure func NewChannelsById() ChannelsById { return ChannelsById{ table: make(map[string]channel.Channel), } } +// Add adds a channel to the table func (c *ChannelsById) Add(channelId string, channel channel.Channel) { c.Lock() defer c.Unlock() c.table[channelId] = channel } +// GetChannel returns a channel from the table if it exists, otherwise it returns false and nil func (c *ChannelsById) GetChannel(channelId string) (channel.Channel, bool) { c.RLock() defer c.RUnlock() @@ -31,6 +34,7 @@ func (c *ChannelsById) GetChannel(channelId string) (channel.Channel, bool) { return channel, ok } +// GetAll returns a copy of the table func (c *ChannelsById) GetAll() map[string]channel.Channel { c.RLock() defer c.RUnlock() diff --git a/be1-go/hub/standard_hub/hub_state/ids_by_channel.go b/be1-go/hub/standard_hub/hub_state/ids_by_channel.go index f208374424..18fca96af9 100644 --- a/be1-go/hub/standard_hub/hub_state/ids_by_channel.go +++ b/be1-go/hub/standard_hub/hub_state/ids_by_channel.go @@ -12,12 +12,14 @@ type IdsByChannel struct { table map[string][]string } +// NewIdsByChannel creates a new IdsByChannel structure func NewIdsByChannel() IdsByChannel { return IdsByChannel{ table: make(map[string][]string), } } +// Add adds a message id to the table func (i *IdsByChannel) Add(channel string, id string) { i.Lock() defer i.Unlock() @@ -32,12 +34,14 @@ func (i *IdsByChannel) Add(channel string, id string) { } } +// AddAll adds a slice of message ids to the table func (i *IdsByChannel) AddAll(channel string, ids []string) { i.Lock() defer i.Unlock() i.table[channel] = append(i.table[channel], ids...) } +// GetAll returns a copy of the table func (i *IdsByChannel) GetAll() map[string][]string { i.RLock() defer i.RUnlock() @@ -46,6 +50,7 @@ func (i *IdsByChannel) GetAll() map[string][]string { return tableCopy } +// IsEmpty returns true if the table is empty, otherwise it returns false func (i *IdsByChannel) IsEmpty() bool { i.RLock() defer i.RUnlock() diff --git a/be1-go/hub/standard_hub/hub_state/peers.go b/be1-go/hub/standard_hub/hub_state/peers.go index c76d1941eb..24127ddca8 100644 --- a/be1-go/hub/standard_hub/hub_state/peers.go +++ b/be1-go/hub/standard_hub/hub_state/peers.go @@ -15,6 +15,7 @@ type Peers struct { peersGreeted []string } +// NewPeers creates a new Peers structure func NewPeers() Peers { return Peers{ peersInfo: make(map[string]method.ServerInfo), @@ -22,12 +23,14 @@ func NewPeers() Peers { } } +// AddPeerInfo adds a peer's info to the table func (p *Peers) AddPeerInfo(socketId string, info method.ServerInfo) { p.Lock() defer p.Unlock() p.peersInfo[socketId] = info } +// AddPeerGreeted adds a peer's socket ID to the slice of peers greeted func (p *Peers) AddPeerGreeted(socketId string) { p.Lock() defer p.Unlock() @@ -37,6 +40,7 @@ func (p *Peers) AddPeerGreeted(socketId string) { p.peersGreeted = append(p.peersGreeted, socketId) } +// GetAllPeersInfo returns a copy of the peers' info slice func (p *Peers) GetAllPeersInfo() []method.ServerInfo { p.RLock() defer p.RUnlock() @@ -47,6 +51,7 @@ func (p *Peers) GetAllPeersInfo() []method.ServerInfo { return peersInfo } +// IsPeerGreeted returns true if the peer was greeted, otherwise it returns false func (p *Peers) IsPeerGreeted(socketId string) bool { p.RLock() defer p.RUnlock() diff --git a/be1-go/hub/standard_hub/mod_test.go b/be1-go/hub/standard_hub/mod_test.go index f0165979e4..a38387b0bd 100644 --- a/be1-go/hub/standard_hub/mod_test.go +++ b/be1-go/hub/standard_hub/mod_test.go @@ -876,13 +876,13 @@ func Test_Handle_Answer(t *testing.T) { answerBisBuf, err := json.Marshal(serverAnswerBis) require.NoError(t, err) - hub.queries.setQueryState(1, false) + hub.queries.SetQueryState(1, false) query := method.GetMessagesById{ Base: query.Base{}, ID: 1, Params: nil, } - hub.queries.addQuery(1, query) + hub.queries.AddQuery(1, query) sock := &fakeSocket{} hub.handleMessageFromClient(&socket.IncomingMessage{ @@ -891,21 +891,21 @@ func Test_Handle_Answer(t *testing.T) { }) require.Error(t, sock.err, "rpc message sent by a client should be a query") sock.err = nil - require.False(t, *hub.queries.getQueryState(1)) + require.False(t, *hub.queries.GetQueryState(1)) hub.handleMessageFromServer(&socket.IncomingMessage{ Socket: sock, Message: resultBuf, }) require.NoError(t, sock.err) - require.False(t, *hub.queries.getQueryState(1)) + require.False(t, *hub.queries.GetQueryState(1)) hub.handleMessageFromServer(&socket.IncomingMessage{ Socket: sock, Message: answerBuf, }) require.NoError(t, sock.err) - require.True(t, *hub.queries.getQueryState(1)) + require.True(t, *hub.queries.GetQueryState(1)) hub.handleMessageFromServer(&socket.IncomingMessage{ Socket: sock, @@ -1192,8 +1192,8 @@ func Test_Create_LAO_GetMessagesById_Result(t *testing.T) { Params: missingMessages, } - hub.queries.setQueryState(1, false) - hub.queries.addQuery(1, getMessagesByIdQuery) + hub.queries.SetQueryState(1, false) + hub.queries.AddQuery(1, getMessagesByIdQuery) ans := struct { JSONRPC string `json:"jsonrpc"` @@ -1295,8 +1295,8 @@ func Test_Create_LAO_GetMessagesById_Wrong_MessageID(t *testing.T) { Params: missingMessages, } - hub.queries.setQueryState(1, false) - hub.queries.addQuery(1, getMessagesByIdQuery) + hub.queries.SetQueryState(1, false) + hub.queries.AddQuery(1, getMessagesByIdQuery) ans := struct { JSONRPC string `json:"jsonrpc"` From 2c6af8e7c29a36575e30712f8a99246b991b3222 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Wed, 11 Oct 2023 11:33:33 +0200 Subject: [PATCH 12/28] fix hub log --- be1-go/hub/standard_hub/mod.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index 97fcd956df..8660e3be06 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -448,7 +448,7 @@ func (h *Hub) handleMessageFromServer(incomingMessage *socket.IncomingMessage) e func (h *Hub) handleIncomingMessage(incomingMessage *socket.IncomingMessage) error { defer h.workers.Release(1) - h.log.Info().Str("msg", string(incomingMessage.Message)).Msgf("handle incoming message") + h.log.Info().Str("msg", string(incomingMessage.Message)).Msg("handle incoming message") switch incomingMessage.Socket.Type() { case socket.ClientSocketType: From feecb9897e829e3a1eda2d50102594dac3c8459e Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Sat, 14 Oct 2023 14:29:38 +0200 Subject: [PATCH 13/28] Add thread safe map --- .../standard_hub/hub_state/ThreadSafeMap.go | 43 ++++++++++++++ .../standard_hub/hub_state/channels_by_id.go | 44 -------------- be1-go/hub/standard_hub/hub_state/content.go | 32 ++++++++++ .../standard_hub/hub_state/ids_by_channel.go | 59 ------------------- 4 files changed, 75 insertions(+), 103 deletions(-) create mode 100644 be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go delete mode 100644 be1-go/hub/standard_hub/hub_state/channels_by_id.go create mode 100644 be1-go/hub/standard_hub/hub_state/content.go delete mode 100644 be1-go/hub/standard_hub/hub_state/ids_by_channel.go diff --git a/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go b/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go new file mode 100644 index 0000000000..8826a249e0 --- /dev/null +++ b/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go @@ -0,0 +1,43 @@ +package hub_state + +import "sync" + +type ThreadSafeMap[K comparable, V any] struct { + sync.RWMutex + table map[K]V +} + +func NewThreadSafeMap[K comparable, V any]() ThreadSafeMap[K, V] { + return ThreadSafeMap[K, V]{ + table: make(map[K]V), + } +} + +func (t *ThreadSafeMap[K, V]) Get(key K) (V, bool) { + t.RLock() + defer t.RUnlock() + val, ok := t.table[key] + return val, ok +} + +func (t *ThreadSafeMap[K, V]) Set(key K, val V) { + t.Lock() + defer t.Unlock() + t.table[key] = val +} + +func (t *ThreadSafeMap[K, V]) GetTable() map[K]V { + t.Lock() + defer t.Unlock() + tableCopy := make(map[K]V) + for key, val := range t.table { + tableCopy[key] = val + } + return tableCopy +} + +func (t *ThreadSafeMap[K, V]) IsEmpty() bool { + t.Lock() + defer t.Unlock() + return len(t.table) == 0 +} diff --git a/be1-go/hub/standard_hub/hub_state/channels_by_id.go b/be1-go/hub/standard_hub/hub_state/channels_by_id.go deleted file mode 100644 index e0f5d40a4c..0000000000 --- a/be1-go/hub/standard_hub/hub_state/channels_by_id.go +++ /dev/null @@ -1,44 +0,0 @@ -package hub_state - -import ( - "maps" - "popstellar/channel" - "sync" -) - -// ChannelsById provides a thread-safe structure that stores channel ids with their corresponding channels -type ChannelsById struct { - sync.RWMutex - table map[string]channel.Channel -} - -// NewChannelsById creates a new ChannelsById structure -func NewChannelsById() ChannelsById { - return ChannelsById{ - table: make(map[string]channel.Channel), - } -} - -// Add adds a channel to the table -func (c *ChannelsById) Add(channelId string, channel channel.Channel) { - c.Lock() - defer c.Unlock() - c.table[channelId] = channel -} - -// GetChannel returns a channel from the table if it exists, otherwise it returns false and nil -func (c *ChannelsById) GetChannel(channelId string) (channel.Channel, bool) { - c.RLock() - defer c.RUnlock() - channel, ok := c.table[channelId] - return channel, ok -} - -// GetAll returns a copy of the table -func (c *ChannelsById) GetAll() map[string]channel.Channel { - c.RLock() - defer c.RUnlock() - channelsCopy := make(map[string]channel.Channel) - maps.Copy(channelsCopy, c.table) - return channelsCopy -} diff --git a/be1-go/hub/standard_hub/hub_state/content.go b/be1-go/hub/standard_hub/hub_state/content.go new file mode 100644 index 0000000000..679034e4b4 --- /dev/null +++ b/be1-go/hub/standard_hub/hub_state/content.go @@ -0,0 +1,32 @@ +package hub_state + +import ( + "golang.org/x/exp/slices" + "popstellar/channel" +) + +// Channels provides a thread-safe structure that stores channel ids with their corresponding channels +type Channels ThreadSafeMap[string, channel.Channel] + +// MessageIds provides a thread-safe structure that stores a channel id with its corresponding message ids +type MessageIds ThreadSafeMap[string, []string] + +func (i *MessageIds) Add(channel string, id string) { + i.Lock() + defer i.Unlock() + messageIds, channelStored := i.table[channel] + if !channelStored { + i.table[channel] = append(i.table[channel], id) + return + } + alreadyStoredId := slices.Contains(messageIds, id) + if !alreadyStoredId { + i.table[channel] = append(i.table[channel], id) + } +} + +func (i *MessageIds) AddAll(channel string, ids []string) { + i.Lock() + defer i.Unlock() + i.table[channel] = append(i.table[channel], ids...) +} diff --git a/be1-go/hub/standard_hub/hub_state/ids_by_channel.go b/be1-go/hub/standard_hub/hub_state/ids_by_channel.go deleted file mode 100644 index 18fca96af9..0000000000 --- a/be1-go/hub/standard_hub/hub_state/ids_by_channel.go +++ /dev/null @@ -1,59 +0,0 @@ -package hub_state - -import ( - "golang.org/x/exp/slices" - "maps" - "sync" -) - -// IdsByChannel provides a thread-safe structure that stores a channel id with its corresponding message ids -type IdsByChannel struct { - sync.RWMutex - table map[string][]string -} - -// NewIdsByChannel creates a new IdsByChannel structure -func NewIdsByChannel() IdsByChannel { - return IdsByChannel{ - table: make(map[string][]string), - } -} - -// Add adds a message id to the table -func (i *IdsByChannel) Add(channel string, id string) { - i.Lock() - defer i.Unlock() - messageIds, channelStored := i.table[channel] - if !channelStored { - i.table[channel] = append(i.table[channel], id) - return - } - alreadyStoredId := slices.Contains(messageIds, id) - if !alreadyStoredId { - i.table[channel] = append(i.table[channel], id) - } -} - -// AddAll adds a slice of message ids to the table -func (i *IdsByChannel) AddAll(channel string, ids []string) { - i.Lock() - defer i.Unlock() - i.table[channel] = append(i.table[channel], ids...) -} - -// GetAll returns a copy of the table -func (i *IdsByChannel) GetAll() map[string][]string { - i.RLock() - defer i.RUnlock() - tableCopy := make(map[string][]string) - maps.Copy(tableCopy, i.table) - return tableCopy -} - -// IsEmpty returns true if the table is empty, otherwise it returns false -func (i *IdsByChannel) IsEmpty() bool { - i.RLock() - defer i.RUnlock() - - return len(i.table) == 0 -} From 55bf030375fd3cef9c138e25fbd197b77098a4d5 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Sat, 14 Oct 2023 15:28:19 +0200 Subject: [PATCH 14/28] Fix threadsafemap usage --- be1-go/go.mod | 10 +++--- be1-go/go.sum | 20 ++++++------ .../standard_hub/hub_state/ThreadSafeMap.go | 32 +++++++++---------- be1-go/hub/standard_hub/hub_state/content.go | 6 ++-- be1-go/hub/standard_hub/message_handling.go | 2 +- be1-go/hub/standard_hub/mod.go | 24 +++++++------- be1-go/hub/standard_hub/mod_test.go | 26 +++++++-------- 7 files changed, 62 insertions(+), 58 deletions(-) diff --git a/be1-go/go.mod b/be1-go/go.mod index f0f862f5c6..1d52d3f62e 100644 --- a/be1-go/go.mod +++ b/be1-go/go.mod @@ -20,7 +20,7 @@ require ( github.com/zitadel/oidc/v2 v2.1.2 go.dedis.ch/kyber/v3 v3.0.13 golang.org/x/exp v0.0.0-20230321023759-10a507213a29 - golang.org/x/sync v0.1.0 + golang.org/x/sync v0.4.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 gopkg.in/yaml.v2 v2.2.3 ) @@ -38,11 +38,11 @@ require ( github.com/russross/blackfriday/v2 v2.0.1 // indirect github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect go.dedis.ch/fixbuf v1.0.3 // indirect - golang.org/x/crypto v0.7.0 // indirect - golang.org/x/net v0.8.0 // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/net v0.16.0 // indirect golang.org/x/oauth2 v0.6.0 // indirect - golang.org/x/sys v0.6.0 // indirect - golang.org/x/text v0.8.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.29.1 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect diff --git a/be1-go/go.sum b/be1-go/go.sum index 153d8a0139..30c5019d86 100644 --- a/be1-go/go.sum +++ b/be1-go/go.sum @@ -94,8 +94,8 @@ golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= -golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -105,15 +105,15 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos= +golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw= golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20190124100055-b90733256f2e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -123,14 +123,14 @@ golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= diff --git a/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go b/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go index 8826a249e0..9deedadc34 100644 --- a/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go +++ b/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go @@ -13,31 +13,31 @@ func NewThreadSafeMap[K comparable, V any]() ThreadSafeMap[K, V] { } } -func (t *ThreadSafeMap[K, V]) Get(key K) (V, bool) { - t.RLock() - defer t.RUnlock() - val, ok := t.table[key] +func (i *ThreadSafeMap[K, V]) Get(key K) (V, bool) { + i.RLock() + defer i.RUnlock() + val, ok := i.table[key] return val, ok } -func (t *ThreadSafeMap[K, V]) Set(key K, val V) { - t.Lock() - defer t.Unlock() - t.table[key] = val +func (i *ThreadSafeMap[K, V]) Set(key K, val V) { + i.Lock() + defer i.Unlock() + i.table[key] = val } -func (t *ThreadSafeMap[K, V]) GetTable() map[K]V { - t.Lock() - defer t.Unlock() +func (i *ThreadSafeMap[K, V]) GetTable() map[K]V { + i.Lock() + defer i.Unlock() tableCopy := make(map[K]V) - for key, val := range t.table { + for key, val := range i.table { tableCopy[key] = val } return tableCopy } -func (t *ThreadSafeMap[K, V]) IsEmpty() bool { - t.Lock() - defer t.Unlock() - return len(t.table) == 0 +func (i *ThreadSafeMap[K, V]) IsEmpty() bool { + i.Lock() + defer i.Unlock() + return len(i.table) == 0 } diff --git a/be1-go/hub/standard_hub/hub_state/content.go b/be1-go/hub/standard_hub/hub_state/content.go index 679034e4b4..68f266915f 100644 --- a/be1-go/hub/standard_hub/hub_state/content.go +++ b/be1-go/hub/standard_hub/hub_state/content.go @@ -6,10 +6,12 @@ import ( ) // Channels provides a thread-safe structure that stores channel ids with their corresponding channels -type Channels ThreadSafeMap[string, channel.Channel] +type Channels = ThreadSafeMap[string, channel.Channel] // MessageIds provides a thread-safe structure that stores a channel id with its corresponding message ids -type MessageIds ThreadSafeMap[string, []string] +type MessageIds struct { + ThreadSafeMap[string, []string] +} func (i *MessageIds) Add(channel string, id string) { i.Lock() diff --git a/be1-go/hub/standard_hub/message_handling.go b/be1-go/hub/standard_hub/message_handling.go index 880f71b25a..f7950ccb95 100644 --- a/be1-go/hub/standard_hub/message_handling.go +++ b/be1-go/hub/standard_hub/message_handling.go @@ -427,7 +427,7 @@ func (h *Hub) handleHeartbeat(socket socket.Socket, receivedIds := heartbeat.Params - missingIds := getMissingIds(receivedIds, h.messageIdsByChannel.GetAll(), h.blacklist) + missingIds := getMissingIds(receivedIds, h.messageIdsByChannel.GetTable(), h.blacklist) if len(missingIds) > 0 { err = h.sendGetMessagesByIdToServer(socket, missingIds) diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index 8660e3be06..dd88f92fdf 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -58,7 +58,7 @@ type Hub struct { messageChan chan socket.IncomingMessage sync.RWMutex - channelByID state.ChannelsById + channelByID state.Channels closedSockets chan string @@ -88,7 +88,7 @@ type Hub struct { // messageIdsByChannel stores all the message ids and the corresponding channel ids // to help servers determine in which channel the message ids go - messageIdsByChannel state.IdsByChannel + messageIdsByChannel state.MessageIds // peers stores information about the peers peers state.Peers @@ -117,7 +117,7 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd clientServerAddress: clientServerAddress, serverServerAddress: serverServerAddress, messageChan: make(chan socket.IncomingMessage), - channelByID: state.NewChannelsById(), + channelByID: state.NewThreadSafeMap[string, channel.Channel](), closedSockets: make(chan string), pubKeyOwner: pubKeyOwner, pubKeyServ: pubServ, @@ -131,9 +131,11 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd hubInbox: *inbox.NewInbox(rootChannel), rootInbox: *inbox.NewInbox(rootChannel), queries: state.NewQueries(), - messageIdsByChannel: state.NewIdsByChannel(), - peers: state.NewPeers(), - blacklist: make([]string, 0), + messageIdsByChannel: state.MessageIds{ + state.NewThreadSafeMap[string, []string](), + }, + peers: state.NewPeers(), + blacklist: make([]string, 0), } return &hub, nil @@ -174,7 +176,7 @@ func (h *Hub) Start() { } }() case id := <-h.closedSockets: - for _, channel := range h.channelByID.GetAll() { + for _, channel := range h.channelByID.GetTable() { // dummy Unsubscribe message because it's only used for logging... channel.Unsubscribe(id, method.Unsubscribe{}) } @@ -275,7 +277,7 @@ func (h *Hub) getChan(channelPath string) (channel.Channel, error) { return nil, xerrors.Errorf("channel not prefixed with '%s': %q", rootPrefix, channelPath) } - channel, ok := h.channelByID.GetChannel(channelPath) + channel, ok := h.channelByID.Get(channelPath) if !ok { return nil, xerrors.Errorf("channel %s does not exist", channelPath) } @@ -501,7 +503,7 @@ func (h *Hub) sendHeartbeatToServers() { }, Method: "heartbeat", }, - Params: h.messageIdsByChannel.GetAll(), + Params: h.messageIdsByChannel.GetTable(), } buf, err := json.Marshal(heartbeatMessage) @@ -517,7 +519,7 @@ func (h *Hub) createLao(msg message.Message, laoCreate messagedata.LaoCreate, laoChannelPath := rootPrefix + laoCreate.ID - if _, ok := h.channelByID.GetChannel(laoChannelPath); ok { + if _, ok := h.channelByID.Get(laoChannelPath); ok { return answer.NewDuplicateResourceError("failed to create lao: duplicate lao path: %q", laoChannelPath) } @@ -598,7 +600,7 @@ func (h *Hub) GetSchemaValidator() validation.SchemaValidator { // NotifyNewChannel implements channel.HubFunctionalities func (h *Hub) NotifyNewChannel(channelID string, channel channel.Channel, sock socket.Socket) { - h.channelByID.Add(channelID, channel) + h.channelByID.Set(channelID, channel) } // NotifyWitnessMessage implements channel.HubFunctionalities diff --git a/be1-go/hub/standard_hub/mod_test.go b/be1-go/hub/standard_hub/mod_test.go index a38387b0bd..25a6934145 100644 --- a/be1-go/hub/standard_hub/mod_test.go +++ b/be1-go/hub/standard_hub/mod_test.go @@ -729,9 +729,9 @@ func Test_Create_LAO(t *testing.T) { // the server should have saved the channel locally - require.Contains(t, hub.channelByID.GetAll(), rootPrefix+data.ID) + require.Contains(t, hub.channelByID.GetTable(), rootPrefix+data.ID) - channel, _ := hub.channelByID.GetChannel(rootPrefix + data.ID) + channel, _ := hub.channelByID.Get(rootPrefix + data.ID) require.Equal(t, fakeChannelFac.c, channel) } @@ -745,7 +745,7 @@ func Test_Wrong_Root_Publish(t *testing.T) { laoID := "/root" - hub.channelByID.Add(rootPrefix+laoID, c) + hub.channelByID.Set(rootPrefix+laoID, c) data := messagedata.LaoState{ Object: messagedata.LAOObject, @@ -933,7 +933,7 @@ func Test_Handle_Publish_From_Client(t *testing.T) { laoID := "XXX" - hub.channelByID.Add(rootPrefix+laoID, c) + hub.channelByID.Set(rootPrefix+laoID, c) signature, err := schnorr.Sign(suite, keypair.private, []byte("XXX")) require.NoError(t, err) @@ -1000,7 +1000,7 @@ func Test_Handle_Publish_From_Server(t *testing.T) { laoID := "XXX" - hub.channelByID.Add(rootPrefix+laoID, c) + hub.channelByID.Set(rootPrefix+laoID, c) signature, err := schnorr.Sign(suite, keypair.private, []byte("XXX")) require.NoError(t, err) @@ -1067,7 +1067,7 @@ func Test_Receive_Publish_Twice(t *testing.T) { laoID := "XXX" - hub.channelByID.Add(rootPrefix+laoID, c) + hub.channelByID.Set(rootPrefix+laoID, c) signature, err := schnorr.Sign(suite, keypair.private, []byte("XXX")) require.NoError(t, err) @@ -1228,8 +1228,8 @@ func Test_Create_LAO_GetMessagesById_Result(t *testing.T) { // the server should have saved the channel locally - require.Contains(t, hub.channelByID.GetAll(), rootPrefix+data.ID) - channel, _ := hub.channelByID.GetChannel(rootPrefix + laoID) + require.Contains(t, hub.channelByID.GetTable(), rootPrefix+data.ID) + channel, _ := hub.channelByID.Get(rootPrefix + laoID) require.Equal(t, fakeChannelFac.c, channel) } @@ -1334,7 +1334,7 @@ func Test_Handle_Subscribe(t *testing.T) { laoID := "XXX" - hub.channelByID.Add(rootPrefix+laoID, c) + hub.channelByID.Set(rootPrefix+laoID, c) subscribe := method.Subscribe{ Base: query.Base{ @@ -1397,7 +1397,7 @@ func TestServer_Handle_Unsubscribe(t *testing.T) { laoID := "XXX" - hub.channelByID.Add(rootPrefix+laoID, c) + hub.channelByID.Set(rootPrefix+laoID, c) unsubscribe := method.Unsubscribe{ Base: query.Base{ @@ -1471,7 +1471,7 @@ func TestServer_Handle_Catchup(t *testing.T) { laoID := "XXX" - hub.channelByID.Add(rootPrefix+laoID, c) + hub.channelByID.Set(rootPrefix+laoID, c) catchup := method.Catchup{ Base: query.Base{ @@ -1543,7 +1543,7 @@ func Test_Send_And_Handle_Message(t *testing.T) { laoID := "XXX" - hub.channelByID.Add(rootPrefix+laoID, c) + hub.channelByID.Set(rootPrefix+laoID, c) signature, err := schnorr.Sign(suite, keypair.private, []byte("XXX")) require.NoError(t, err) @@ -1630,7 +1630,7 @@ func Test_Send_Heartbeat_Message(t *testing.T) { messageIdsSent := heartbeat.Params //Check that all the stored messages where sent - for storedChannel, storedIds := range hub.messageIdsByChannel.GetAll() { + for storedChannel, storedIds := range hub.messageIdsByChannel.GetTable() { sentIds, exists := messageIdsSent[storedChannel] require.True(t, exists) for _, storedId := range storedIds { From 47a176c043741b8925e329c238f4855e7f9fe368 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Sat, 14 Oct 2023 15:46:01 +0200 Subject: [PATCH 15/28] Add usage of sets --- be1-go/channel/lao/mod.go | 18 +++++++----------- be1-go/hub/standard_hub/hub_state/peers.go | 11 ++++++----- be1-go/hub/standard_hub/mod.go | 2 +- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/be1-go/channel/lao/mod.go b/be1-go/channel/lao/mod.go index 8ae31330ac..69feabf537 100644 --- a/be1-go/channel/lao/mod.go +++ b/be1-go/channel/lao/mod.go @@ -4,7 +4,7 @@ import ( "encoding/base64" "encoding/json" "fmt" - "golang.org/x/exp/slices" + "golang.org/x/exp/maps" popstellar "popstellar" "popstellar/channel" "popstellar/channel/authentication" @@ -703,16 +703,12 @@ func (c *Channel) createAndSendLAOGreet() error { return xerrors.Errorf("failed to marshal the organizer key: %v", err) } - peers := []messagedata.Peer{} + peersInfo := c.hub.GetPeersInfo() - for _, info := range c.hub.GetPeersInfo() { - peer := messagedata.Peer{ - Address: info.ClientAddress, - } - if slices.Contains(peers, peer) { - continue - } - peers = append(peers, peer) + peers := make(map[messagedata.Peer]struct{}, len(peersInfo)) + + for _, info := range peersInfo { + peers[messagedata.Peer{Address: info.ClientAddress}] = struct{}{} } msgData := messagedata.LaoGreet{ @@ -721,7 +717,7 @@ func (c *Channel) createAndSendLAOGreet() error { LaoID: c.extractLaoID(), Frontend: base64.URLEncoding.EncodeToString(orgPkBuf), Address: c.hub.GetClientServerAddress(), - Peers: peers, + Peers: maps.Keys(peers), } // Marshalls the message data diff --git a/be1-go/hub/standard_hub/hub_state/peers.go b/be1-go/hub/standard_hub/hub_state/peers.go index 24127ddca8..c6fc0a8e75 100644 --- a/be1-go/hub/standard_hub/hub_state/peers.go +++ b/be1-go/hub/standard_hub/hub_state/peers.go @@ -1,6 +1,7 @@ package hub_state import ( + "golang.org/x/exp/maps" "golang.org/x/exp/slices" "popstellar/message/query/method" "sync" @@ -12,14 +13,14 @@ type Peers struct { // peersInfo stores the info of the peers: public key, client and server endpoints associated with the socket ID peersInfo map[string]method.ServerInfo // peersGreeted stores the peers that were greeted by the socket ID - peersGreeted []string + peersGreeted map[string]struct{} } // NewPeers creates a new Peers structure func NewPeers() Peers { return Peers{ peersInfo: make(map[string]method.ServerInfo), - peersGreeted: make([]string, 0), + peersGreeted: make(map[string]struct{}), } } @@ -34,10 +35,10 @@ func (p *Peers) AddPeerInfo(socketId string, info method.ServerInfo) { func (p *Peers) AddPeerGreeted(socketId string) { p.Lock() defer p.Unlock() - if slices.Contains(p.peersGreeted, socketId) { + if slices.Contains(maps.Keys(p.peersGreeted), socketId) { return } - p.peersGreeted = append(p.peersGreeted, socketId) + p.peersGreeted[socketId] = struct{}{} } // GetAllPeersInfo returns a copy of the peers' info slice @@ -55,5 +56,5 @@ func (p *Peers) GetAllPeersInfo() []method.ServerInfo { func (p *Peers) IsPeerGreeted(socketId string) bool { p.RLock() defer p.RUnlock() - return slices.Contains(p.peersGreeted, socketId) + return slices.Contains(maps.Keys(p.peersGreeted), socketId) } diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index dd88f92fdf..616a7a2478 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -132,7 +132,7 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd rootInbox: *inbox.NewInbox(rootChannel), queries: state.NewQueries(), messageIdsByChannel: state.MessageIds{ - state.NewThreadSafeMap[string, []string](), + ThreadSafeMap: state.NewThreadSafeMap[string, []string](), }, peers: state.NewPeers(), blacklist: make([]string, 0), From 87e6a8896f95fe0fba67951189772d16211271af Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Wed, 18 Oct 2023 15:05:25 +0200 Subject: [PATCH 16/28] Address Pierluca's comments --- be1-go/hub/standard_hub/hub_state/queries.go | 31 +++++++++++++++----- be1-go/hub/standard_hub/message_handling.go | 12 ++------ be1-go/hub/standard_hub/mod.go | 5 ++-- be1-go/hub/standard_hub/mod_test.go | 15 ++++++---- 4 files changed, 38 insertions(+), 25 deletions(-) diff --git a/be1-go/hub/standard_hub/hub_state/queries.go b/be1-go/hub/standard_hub/hub_state/queries.go index 7f7de69496..7a4afc11fd 100644 --- a/be1-go/hub/standard_hub/hub_state/queries.go +++ b/be1-go/hub/standard_hub/hub_state/queries.go @@ -1,6 +1,7 @@ package hub_state import ( + "golang.org/x/xerrors" "popstellar/message/query/method" "sync" ) @@ -10,7 +11,7 @@ type Queries struct { sync.Mutex // state stores the ID of the server's queries and their state. False for a // query not yet answered, else true. - state map[int]*bool + state map[int]bool // getMessagesByIdQueries stores the server's getMessagesByIds queries by their ID. getMessagesByIdQueries map[int]method.GetMessagesById // nextID store the ID of the next query @@ -20,17 +21,21 @@ type Queries struct { // NewQueries creates a new queries struct func NewQueries() Queries { return Queries{ - state: make(map[int]*bool), + state: make(map[int]bool), getMessagesByIdQueries: make(map[int]method.GetMessagesById), } } // GetQueryState returns a given query's state -func (q *Queries) GetQueryState(id int) *bool { +func (q *Queries) GetQueryState(id int) (bool, error) { q.Lock() defer q.Unlock() - return q.state[id] + state, ok := q.state[id] + if !ok { + return false, xerrors.Errorf("query with id %d not found", id) + } + return state, nil } // GetNextID returns the next query ID @@ -43,12 +48,23 @@ func (q *Queries) GetNextID() int { return id } -// SetQueryState sets the state of the query with the given ID -func (q *Queries) SetQueryState(id int, state bool) { +// SetQueryReceived sets the state of the query with the given ID as received +func (q *Queries) SetQueryReceived(id int) error { q.Lock() defer q.Unlock() - q.state[id] = &state + currentState, ok := q.state[id] + + if !ok { + return xerrors.Errorf("query with id %d not found", id) + } + + if currentState { + return xerrors.Errorf("query with id %d already answered", id) + } + + q.state[id] = true + return nil } // AddQuery adds the given query to the table @@ -57,4 +73,5 @@ func (q *Queries) AddQuery(id int, query method.GetMessagesById) { defer q.Unlock() q.getMessagesByIdQueries[id] = query + q.state[id] = false } diff --git a/be1-go/hub/standard_hub/message_handling.go b/be1-go/hub/standard_hub/message_handling.go index f7950ccb95..341092ef85 100644 --- a/be1-go/hub/standard_hub/message_handling.go +++ b/be1-go/hub/standard_hub/message_handling.go @@ -189,17 +189,11 @@ func (h *Hub) handleAnswer(senderSocket socket.Socket, byteMessage []byte) error return nil } - val := h.queries.GetQueryState(*answerMsg.ID) - if val == nil { - return xerrors.Errorf("no query sent with id %v", answerMsg.ID) - } - - if *val { - return xerrors.Errorf("query %v already got an answer", answerMsg.ID) + err = h.queries.SetQueryReceived(*answerMsg.ID) + if err != nil { + return xerrors.Errorf("failed to set query state: %v", err) } - h.queries.SetQueryState(*answerMsg.ID, true) - err = h.handleGetMessagesByIdAnswer(senderSocket, answerMsg) if err != nil { return err diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index 616a7a2478..cd609cbc0b 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -466,7 +466,6 @@ func (h *Hub) handleIncomingMessage(incomingMessage *socket.IncomingMessage) err // sendGetMessagesByIdToServer sends a getMessagesById message to a server func (h *Hub) sendGetMessagesByIdToServer(socket socket.Socket, missingIds map[string][]string) error { queryId := h.queries.GetNextID() - h.queries.SetQueryState(queryId, false) getMessagesById := method.GetMessagesById{ Base: query.Base{ @@ -484,10 +483,10 @@ func (h *Hub) sendGetMessagesByIdToServer(socket socket.Socket, missingIds map[s return xerrors.Errorf("failed to marshal getMessagesById query: %v", err) } - h.queries.AddQuery(queryId, getMessagesById) - socket.Send(buf) + h.queries.AddQuery(queryId, getMessagesById) + return nil } diff --git a/be1-go/hub/standard_hub/mod_test.go b/be1-go/hub/standard_hub/mod_test.go index 25a6934145..e5faa86513 100644 --- a/be1-go/hub/standard_hub/mod_test.go +++ b/be1-go/hub/standard_hub/mod_test.go @@ -876,7 +876,6 @@ func Test_Handle_Answer(t *testing.T) { answerBisBuf, err := json.Marshal(serverAnswerBis) require.NoError(t, err) - hub.queries.SetQueryState(1, false) query := method.GetMessagesById{ Base: query.Base{}, ID: 1, @@ -891,21 +890,27 @@ func Test_Handle_Answer(t *testing.T) { }) require.Error(t, sock.err, "rpc message sent by a client should be a query") sock.err = nil - require.False(t, *hub.queries.GetQueryState(1)) + queryState, err := hub.queries.GetQueryState(1) + require.NoError(t, err) + require.False(t, queryState) hub.handleMessageFromServer(&socket.IncomingMessage{ Socket: sock, Message: resultBuf, }) require.NoError(t, sock.err) - require.False(t, *hub.queries.GetQueryState(1)) + queryState, _ = hub.queries.GetQueryState(1) + require.NoError(t, err) + require.False(t, queryState) hub.handleMessageFromServer(&socket.IncomingMessage{ Socket: sock, Message: answerBuf, }) require.NoError(t, sock.err) - require.True(t, *hub.queries.GetQueryState(1)) + queryState, _ = hub.queries.GetQueryState(1) + require.NoError(t, err) + require.True(t, queryState) hub.handleMessageFromServer(&socket.IncomingMessage{ Socket: sock, @@ -1192,7 +1197,6 @@ func Test_Create_LAO_GetMessagesById_Result(t *testing.T) { Params: missingMessages, } - hub.queries.SetQueryState(1, false) hub.queries.AddQuery(1, getMessagesByIdQuery) ans := struct { @@ -1295,7 +1299,6 @@ func Test_Create_LAO_GetMessagesById_Wrong_MessageID(t *testing.T) { Params: missingMessages, } - hub.queries.SetQueryState(1, false) hub.queries.AddQuery(1, getMessagesByIdQuery) ans := struct { From 70eb68a1743e96d4764aaeca4915dace76f0c0a7 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Wed, 18 Oct 2023 15:48:29 +0200 Subject: [PATCH 17/28] Add imports and foreach channels --- be1-go/hub/standard_hub/hub_state/content.go | 15 ++++++-- be1-go/hub/standard_hub/hub_state/peers.go | 5 +-- be1-go/hub/standard_hub/hub_state/queries.go | 3 +- be1-go/hub/standard_hub/message_handling.go | 3 +- be1-go/hub/standard_hub/mod.go | 37 ++++++++++---------- be1-go/hub/standard_hub/mod_test.go | 3 +- be1-go/make.bat | 2 ++ 7 files changed, 43 insertions(+), 25 deletions(-) diff --git a/be1-go/hub/standard_hub/hub_state/content.go b/be1-go/hub/standard_hub/hub_state/content.go index 68f266915f..698e2285b1 100644 --- a/be1-go/hub/standard_hub/hub_state/content.go +++ b/be1-go/hub/standard_hub/hub_state/content.go @@ -1,12 +1,23 @@ package hub_state import ( - "golang.org/x/exp/slices" "popstellar/channel" + + "golang.org/x/exp/slices" ) // Channels provides a thread-safe structure that stores channel ids with their corresponding channels -type Channels = ThreadSafeMap[string, channel.Channel] +type Channels struct { + ThreadSafeMap[string, channel.Channel] +} + +func (c *Channels) ForEach(f func(channel.Channel)) { + c.Lock() + defer c.Unlock() + for _, channel := range c.table { + f(channel) + } +} // MessageIds provides a thread-safe structure that stores a channel id with its corresponding message ids type MessageIds struct { diff --git a/be1-go/hub/standard_hub/hub_state/peers.go b/be1-go/hub/standard_hub/hub_state/peers.go index c6fc0a8e75..d8a188066f 100644 --- a/be1-go/hub/standard_hub/hub_state/peers.go +++ b/be1-go/hub/standard_hub/hub_state/peers.go @@ -1,10 +1,11 @@ package hub_state import ( - "golang.org/x/exp/maps" - "golang.org/x/exp/slices" "popstellar/message/query/method" "sync" + + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" ) // Peers provides a thread-safe structure that stores the peers' information diff --git a/be1-go/hub/standard_hub/hub_state/queries.go b/be1-go/hub/standard_hub/hub_state/queries.go index 7a4afc11fd..48aaed1707 100644 --- a/be1-go/hub/standard_hub/hub_state/queries.go +++ b/be1-go/hub/standard_hub/hub_state/queries.go @@ -1,9 +1,10 @@ package hub_state import ( - "golang.org/x/xerrors" "popstellar/message/query/method" "sync" + + "golang.org/x/xerrors" ) // Queries let the hub remember all queries that it sent to other servers diff --git a/be1-go/hub/standard_hub/message_handling.go b/be1-go/hub/standard_hub/message_handling.go index 341092ef85..94fcbb681d 100644 --- a/be1-go/hub/standard_hub/message_handling.go +++ b/be1-go/hub/standard_hub/message_handling.go @@ -3,7 +3,6 @@ package standard_hub import ( "encoding/base64" "encoding/json" - "github.com/rs/zerolog/log" "popstellar/crypto" jsonrpc "popstellar/message" "popstellar/message/answer" @@ -14,6 +13,8 @@ import ( "popstellar/network/socket" "popstellar/validation" + "github.com/rs/zerolog/log" + "go.dedis.ch/kyber/v3/sign/schnorr" "golang.org/x/exp/slices" diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index cd609cbc0b..abcb3c5eb2 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -117,20 +117,22 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd clientServerAddress: clientServerAddress, serverServerAddress: serverServerAddress, messageChan: make(chan socket.IncomingMessage), - channelByID: state.NewThreadSafeMap[string, channel.Channel](), - closedSockets: make(chan string), - pubKeyOwner: pubKeyOwner, - pubKeyServ: pubServ, - secKeyServ: secServ, - schemaValidator: schemaValidator, - stop: make(chan struct{}), - workers: semaphore.NewWeighted(numWorkers), - log: log, - laoFac: laoFac, - serverSockets: channel.NewSockets(), - hubInbox: *inbox.NewInbox(rootChannel), - rootInbox: *inbox.NewInbox(rootChannel), - queries: state.NewQueries(), + channelByID: state.Channels{ + ThreadSafeMap: state.NewThreadSafeMap[string, channel.Channel](), + }, + closedSockets: make(chan string), + pubKeyOwner: pubKeyOwner, + pubKeyServ: pubServ, + secKeyServ: secServ, + schemaValidator: schemaValidator, + stop: make(chan struct{}), + workers: semaphore.NewWeighted(numWorkers), + log: log, + laoFac: laoFac, + serverSockets: channel.NewSockets(), + hubInbox: *inbox.NewInbox(rootChannel), + rootInbox: *inbox.NewInbox(rootChannel), + queries: state.NewQueries(), messageIdsByChannel: state.MessageIds{ ThreadSafeMap: state.NewThreadSafeMap[string, []string](), }, @@ -176,10 +178,9 @@ func (h *Hub) Start() { } }() case id := <-h.closedSockets: - for _, channel := range h.channelByID.GetTable() { - // dummy Unsubscribe message because it's only used for logging... - channel.Unsubscribe(id, method.Unsubscribe{}) - } + h.channelByID.ForEach(func(c channel.Channel) { + c.Unsubscribe(id, method.Unsubscribe{}) + }) case <-h.stop: h.log.Info().Msg("stopping the hub") return diff --git a/be1-go/hub/standard_hub/mod_test.go b/be1-go/hub/standard_hub/mod_test.go index e5faa86513..7c756497d1 100644 --- a/be1-go/hub/standard_hub/mod_test.go +++ b/be1-go/hub/standard_hub/mod_test.go @@ -4,7 +4,6 @@ import ( "encoding/base64" "encoding/json" "fmt" - "golang.org/x/exp/slices" "io" "os" "path/filepath" @@ -19,6 +18,8 @@ import ( "testing" "time" + "golang.org/x/exp/slices" + "github.com/stretchr/testify/assert" "github.com/rs/zerolog" diff --git a/be1-go/make.bat b/be1-go/make.bat index 620a7ae791..c8350a0156 100644 --- a/be1-go/make.bat +++ b/be1-go/make.bat @@ -15,8 +15,10 @@ GOTO error :lint go get -v honnef.co/go/tools/cmd/staticcheck + go get golang.org/x/tools/cmd/goimports@latest go mod tidy staticcheck ./... + goimports -w . GOTO :EOF :check From e0ae6a92406e85c1975ddbff69392e29a811bbe3 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Wed, 18 Oct 2023 15:58:14 +0200 Subject: [PATCH 18/28] Change go version in workflows --- .github/workflows/ci.yaml | 4 ++-- .github/workflows/deploy.yaml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7b906f66d1..e0e1475566 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -115,10 +115,10 @@ jobs: working-directory: ./be1-go steps: - - name: Use go >= 1.19 + - name: Use go >= 1.21 uses: actions/setup-go@v3 with: - go-version: ">=1.19" + go-version: ">=1.21" - name: Setup repo uses: actions/checkout@v3 diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml index affccafc7c..232effb1a6 100644 --- a/.github/workflows/deploy.yaml +++ b/.github/workflows/deploy.yaml @@ -91,10 +91,10 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Use go >= 1.19 + - name: Use go >= 1.21 uses: actions/setup-go@v3 with: - go-version: ">=1.19" + go-version: ">=1.21" - name: build run: | From de9f7e41141dd7f1aa3d0ff035f582ac5348f528 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Wed, 18 Oct 2023 16:26:50 +0200 Subject: [PATCH 19/28] change go version on karate workflow --- .github/workflows/karate_be1-go.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/karate_be1-go.yaml b/.github/workflows/karate_be1-go.yaml index 18337a7b71..42cf6e7572 100644 --- a/.github/workflows/karate_be1-go.yaml +++ b/.github/workflows/karate_be1-go.yaml @@ -11,10 +11,10 @@ jobs: runs-on: ubuntu-latest steps: - - name: Use go >= 1.19 + - name: Use go >= 1.21 uses: actions/setup-go@v3 with: - go-version: ">=1.19" + go-version: ">=1.21" - name: Setup repo uses: actions/checkout@v3 From 1314dbd6348bf04d9079bca9f9f73c06e5866a10 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Wed, 18 Oct 2023 16:30:02 +0200 Subject: [PATCH 20/28] experimenting with CI and goimports --- .github/workflows/ci.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e0e1475566..6767a197b2 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -144,8 +144,8 @@ jobs: - name: Format if: ${{ matrix.platform == 'ubuntu-latest' }} run: | - make check-fmt - if [ "$(gofmt -s -l ./ | wc -l)" -gt 0 ]; then exit 1; fi + make check-imports + if [ "$(goimports -s -l ./ | wc -l)" -gt 0 ]; then exit 1; fi - name: Run check target if: ${{ matrix.platform == 'ubuntu-latest' }} From 962663f2096dc4199ad79c4b125db4db919e5b4a Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Wed, 18 Oct 2023 16:34:48 +0200 Subject: [PATCH 21/28] experimenting with CI and goimports --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 6767a197b2..bfc684f690 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -144,7 +144,7 @@ jobs: - name: Format if: ${{ matrix.platform == 'ubuntu-latest' }} run: | - make check-imports + make check-fmt if [ "$(goimports -s -l ./ | wc -l)" -gt 0 ]; then exit 1; fi - name: Run check target From 87934ac9e1c4906985aaf3d5fe1623878eb017e8 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Wed, 18 Oct 2023 16:39:43 +0200 Subject: [PATCH 22/28] try to add goimports --- .github/workflows/ci.yaml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index bfc684f690..7594a952f7 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -145,7 +145,13 @@ jobs: if: ${{ matrix.platform == 'ubuntu-latest' }} run: | make check-fmt - if [ "$(goimports -s -l ./ | wc -l)" -gt 0 ]; then exit 1; fi + if [ "$(gofmt -s -l ./ | wc -l)" -gt 0 ]; then exit 1; fi + + - name: GoImports install + run: go get golang.org/x/tools/cmd/goimports@latest + + - name: GoImports run + run: goimports -w . - name: Run check target if: ${{ matrix.platform == 'ubuntu-latest' }} From 05a8a8b9ce398336ef59a45a9283ec2d3c0ca894 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Wed, 18 Oct 2023 16:48:08 +0200 Subject: [PATCH 23/28] remove goimports --- .github/workflows/ci.yaml | 6 ------ be1-go/make.bat | 2 -- 2 files changed, 8 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7594a952f7..e0e1475566 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -147,12 +147,6 @@ jobs: make check-fmt if [ "$(gofmt -s -l ./ | wc -l)" -gt 0 ]; then exit 1; fi - - name: GoImports install - run: go get golang.org/x/tools/cmd/goimports@latest - - - name: GoImports run - run: goimports -w . - - name: Run check target if: ${{ matrix.platform == 'ubuntu-latest' }} run: | diff --git a/be1-go/make.bat b/be1-go/make.bat index c8350a0156..620a7ae791 100644 --- a/be1-go/make.bat +++ b/be1-go/make.bat @@ -15,10 +15,8 @@ GOTO error :lint go get -v honnef.co/go/tools/cmd/staticcheck - go get golang.org/x/tools/cmd/goimports@latest go mod tidy staticcheck ./... - goimports -w . GOTO :EOF :check From 54d4b758ff39dcb60f0a290f78921e39dc7160eb Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Tue, 24 Oct 2023 13:57:30 +0200 Subject: [PATCH 24/28] fix documentation --- be1-go/configServer1.json | 5 +++-- be1-go/configServer2.json | 5 ++--- be1-go/hub/standard_hub/hub_state/Channels.go | 17 +++++++++++++++++ .../hub_state/{content.go => MessageIds.go} | 19 +++---------------- .../standard_hub/hub_state/ThreadSafeMap.go | 3 ++- be1-go/hub/standard_hub/hub_state/peers.go | 4 ++-- be1-go/hub/standard_hub/mod.go | 1 + 7 files changed, 30 insertions(+), 24 deletions(-) create mode 100644 be1-go/hub/standard_hub/hub_state/Channels.go rename be1-go/hub/standard_hub/hub_state/{content.go => MessageIds.go} (58%) diff --git a/be1-go/configServer1.json b/be1-go/configServer1.json index fb8796572b..7dfafec811 100644 --- a/be1-go/configServer1.json +++ b/be1-go/configServer1.json @@ -8,5 +8,6 @@ "client-port" : 9000, "server-port" : 9001, "auth-port" : 9100, - "other-servers": [] -} \ No newline at end of file + "other-servers": [ + "localhost:9003", "localhost:9005", "localhost:9007" + ]} \ No newline at end of file diff --git a/be1-go/configServer2.json b/be1-go/configServer2.json index 5238986b74..7cf8afcc0b 100644 --- a/be1-go/configServer2.json +++ b/be1-go/configServer2.json @@ -9,6 +9,5 @@ "server-port" : 9003, "auth-port" : 9101, "other-servers": [ - "localhost:9001" - ] -} \ No newline at end of file + "localhost:9001", "localhost:9005", "localhost:9007" + ]} diff --git a/be1-go/hub/standard_hub/hub_state/Channels.go b/be1-go/hub/standard_hub/hub_state/Channels.go new file mode 100644 index 0000000000..7cb9272d0d --- /dev/null +++ b/be1-go/hub/standard_hub/hub_state/Channels.go @@ -0,0 +1,17 @@ +package hub_state + +import "popstellar/channel" + +// Channels stores channel ids with their corresponding channels +type Channels struct { + ThreadSafeMap[string, channel.Channel] +} + +// ForEach iterates over all channels and applies the given function +func (c *Channels) ForEach(f func(channel.Channel)) { + c.Lock() + defer c.Unlock() + for _, channel := range c.table { + f(channel) + } +} diff --git a/be1-go/hub/standard_hub/hub_state/content.go b/be1-go/hub/standard_hub/hub_state/MessageIds.go similarity index 58% rename from be1-go/hub/standard_hub/hub_state/content.go rename to be1-go/hub/standard_hub/hub_state/MessageIds.go index 698e2285b1..d7ea94134f 100644 --- a/be1-go/hub/standard_hub/hub_state/content.go +++ b/be1-go/hub/standard_hub/hub_state/MessageIds.go @@ -1,29 +1,15 @@ package hub_state import ( - "popstellar/channel" - "golang.org/x/exp/slices" ) -// Channels provides a thread-safe structure that stores channel ids with their corresponding channels -type Channels struct { - ThreadSafeMap[string, channel.Channel] -} - -func (c *Channels) ForEach(f func(channel.Channel)) { - c.Lock() - defer c.Unlock() - for _, channel := range c.table { - f(channel) - } -} - -// MessageIds provides a thread-safe structure that stores a channel id with its corresponding message ids +// MessageIds stores a channel id with its corresponding message ids type MessageIds struct { ThreadSafeMap[string, []string] } +// Add adds a message id to the slice of message ids of the channel func (i *MessageIds) Add(channel string, id string) { i.Lock() defer i.Unlock() @@ -38,6 +24,7 @@ func (i *MessageIds) Add(channel string, id string) { } } +// AddAll adds a slice of message ids to the slice of message ids of the channel func (i *MessageIds) AddAll(channel string, ids []string) { i.Lock() defer i.Unlock() diff --git a/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go b/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go index 9deedadc34..19a88605f6 100644 --- a/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go +++ b/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go @@ -26,10 +26,11 @@ func (i *ThreadSafeMap[K, V]) Set(key K, val V) { i.table[key] = val } +// GetTable returns a shallow copy of the map func (i *ThreadSafeMap[K, V]) GetTable() map[K]V { i.Lock() defer i.Unlock() - tableCopy := make(map[K]V) + tableCopy := make(map[K]V, len(i.table)) for key, val := range i.table { tableCopy[key] = val } diff --git a/be1-go/hub/standard_hub/hub_state/peers.go b/be1-go/hub/standard_hub/hub_state/peers.go index d8a188066f..043cb049ac 100644 --- a/be1-go/hub/standard_hub/hub_state/peers.go +++ b/be1-go/hub/standard_hub/hub_state/peers.go @@ -8,7 +8,7 @@ import ( "golang.org/x/exp/slices" ) -// Peers provides a thread-safe structure that stores the peers' information +// Peers stores the peers' information type Peers struct { sync.RWMutex // peersInfo stores the info of the peers: public key, client and server endpoints associated with the socket ID @@ -46,7 +46,7 @@ func (p *Peers) AddPeerGreeted(socketId string) { func (p *Peers) GetAllPeersInfo() []method.ServerInfo { p.RLock() defer p.RUnlock() - peersInfo := make([]method.ServerInfo, 0) + peersInfo := make([]method.ServerInfo, 0, len(p.peersInfo)) for _, info := range p.peersInfo { peersInfo = append(peersInfo, info) } diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index abcb3c5eb2..849140ff8b 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -179,6 +179,7 @@ func (h *Hub) Start() { }() case id := <-h.closedSockets: h.channelByID.ForEach(func(c channel.Channel) { + // dummy Unsubscribe message because it's only used for logging... c.Unsubscribe(id, method.Unsubscribe{}) }) case <-h.stop: From dd78bc2bdbaeeabc070cf6e4f7782a7b0021ead7 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Tue, 24 Oct 2023 13:59:48 +0200 Subject: [PATCH 25/28] cancel config changes --- be1-go/configServer1.json | 5 ++--- be1-go/configServer2.json | 5 +++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/be1-go/configServer1.json b/be1-go/configServer1.json index 7dfafec811..fb8796572b 100644 --- a/be1-go/configServer1.json +++ b/be1-go/configServer1.json @@ -8,6 +8,5 @@ "client-port" : 9000, "server-port" : 9001, "auth-port" : 9100, - "other-servers": [ - "localhost:9003", "localhost:9005", "localhost:9007" - ]} \ No newline at end of file + "other-servers": [] +} \ No newline at end of file diff --git a/be1-go/configServer2.json b/be1-go/configServer2.json index 7cf8afcc0b..327ec28b77 100644 --- a/be1-go/configServer2.json +++ b/be1-go/configServer2.json @@ -9,5 +9,6 @@ "server-port" : 9003, "auth-port" : 9101, "other-servers": [ - "localhost:9001", "localhost:9005", "localhost:9007" - ]} + "localhost:9001" + ] +} From 2bc39dbf97ded0eacf162df515ba81af796b5e87 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Mon, 30 Oct 2023 15:53:11 +0100 Subject: [PATCH 26/28] adress pierluca's comments --- be1-go/.gitignore | 4 +- be1-go/channel/lao/mod.go | 7 ++-- be1-go/hub/standard_hub/hub_state/Channels.go | 7 ++++ .../hub/standard_hub/hub_state/MessageIds.go | 7 ++++ .../standard_hub/hub_state/ThreadSafeMap.go | 8 ++-- be1-go/hub/standard_hub/hub_state/peers.go | 7 ++-- be1-go/hub/standard_hub/mod.go | 38 +++++++++---------- 7 files changed, 44 insertions(+), 34 deletions(-) diff --git a/be1-go/.gitignore b/be1-go/.gitignore index 96c0aa4fe9..c99d6a4e40 100644 --- a/be1-go/.gitignore +++ b/be1-go/.gitignore @@ -4,4 +4,6 @@ report.json coverage.out coverage.html configServer1.json -configServer2.json \ No newline at end of file +configServer2.json +configServer3.json +configServer4.json \ No newline at end of file diff --git a/be1-go/channel/lao/mod.go b/be1-go/channel/lao/mod.go index 69feabf537..53585078d3 100644 --- a/be1-go/channel/lao/mod.go +++ b/be1-go/channel/lao/mod.go @@ -4,7 +4,6 @@ import ( "encoding/base64" "encoding/json" "fmt" - "golang.org/x/exp/maps" popstellar "popstellar" "popstellar/channel" "popstellar/channel/authentication" @@ -705,10 +704,10 @@ func (c *Channel) createAndSendLAOGreet() error { peersInfo := c.hub.GetPeersInfo() - peers := make(map[messagedata.Peer]struct{}, len(peersInfo)) + peers := make([]messagedata.Peer, len(peersInfo)) for _, info := range peersInfo { - peers[messagedata.Peer{Address: info.ClientAddress}] = struct{}{} + peers = append(peers, messagedata.Peer{Address: info.ClientAddress}) } msgData := messagedata.LaoGreet{ @@ -717,7 +716,7 @@ func (c *Channel) createAndSendLAOGreet() error { LaoID: c.extractLaoID(), Frontend: base64.URLEncoding.EncodeToString(orgPkBuf), Address: c.hub.GetClientServerAddress(), - Peers: maps.Keys(peers), + Peers: peers, } // Marshalls the message data diff --git a/be1-go/hub/standard_hub/hub_state/Channels.go b/be1-go/hub/standard_hub/hub_state/Channels.go index 7cb9272d0d..29c3a0c95d 100644 --- a/be1-go/hub/standard_hub/hub_state/Channels.go +++ b/be1-go/hub/standard_hub/hub_state/Channels.go @@ -7,6 +7,13 @@ type Channels struct { ThreadSafeMap[string, channel.Channel] } +// NewChannelsMap creates a new Channels structure +func NewChannelsMap() Channels { + return Channels{ + ThreadSafeMap: NewThreadSafeMap[string, channel.Channel](), + } +} + // ForEach iterates over all channels and applies the given function func (c *Channels) ForEach(f func(channel.Channel)) { c.Lock() diff --git a/be1-go/hub/standard_hub/hub_state/MessageIds.go b/be1-go/hub/standard_hub/hub_state/MessageIds.go index d7ea94134f..ebd495c03a 100644 --- a/be1-go/hub/standard_hub/hub_state/MessageIds.go +++ b/be1-go/hub/standard_hub/hub_state/MessageIds.go @@ -9,6 +9,13 @@ type MessageIds struct { ThreadSafeMap[string, []string] } +// NewMessageIdsMap creates a new MessageIds structure +func NewMessageIdsMap() MessageIds { + return MessageIds{ + ThreadSafeMap: NewThreadSafeMap[string, []string](), + } +} + // Add adds a message id to the slice of message ids of the channel func (i *MessageIds) Add(channel string, id string) { i.Lock() diff --git a/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go b/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go index 19a88605f6..46f1c070a8 100644 --- a/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go +++ b/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go @@ -28,8 +28,8 @@ func (i *ThreadSafeMap[K, V]) Set(key K, val V) { // GetTable returns a shallow copy of the map func (i *ThreadSafeMap[K, V]) GetTable() map[K]V { - i.Lock() - defer i.Unlock() + i.RLock() + defer i.RUnlock() tableCopy := make(map[K]V, len(i.table)) for key, val := range i.table { tableCopy[key] = val @@ -38,7 +38,7 @@ func (i *ThreadSafeMap[K, V]) GetTable() map[K]V { } func (i *ThreadSafeMap[K, V]) IsEmpty() bool { - i.Lock() - defer i.Unlock() + i.RLock() + defer i.RUnlock() return len(i.table) == 0 } diff --git a/be1-go/hub/standard_hub/hub_state/peers.go b/be1-go/hub/standard_hub/hub_state/peers.go index 043cb049ac..9950b50f33 100644 --- a/be1-go/hub/standard_hub/hub_state/peers.go +++ b/be1-go/hub/standard_hub/hub_state/peers.go @@ -36,9 +36,6 @@ func (p *Peers) AddPeerInfo(socketId string, info method.ServerInfo) { func (p *Peers) AddPeerGreeted(socketId string) { p.Lock() defer p.Unlock() - if slices.Contains(maps.Keys(p.peersGreeted), socketId) { - return - } p.peersGreeted[socketId] = struct{}{} } @@ -48,7 +45,9 @@ func (p *Peers) GetAllPeersInfo() []method.ServerInfo { defer p.RUnlock() peersInfo := make([]method.ServerInfo, 0, len(p.peersInfo)) for _, info := range p.peersInfo { - peersInfo = append(peersInfo, info) + if !slices.Contains(peersInfo, info) { + peersInfo = append(peersInfo, info) + } } return peersInfo } diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index 849140ff8b..116fec80c3 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -117,27 +117,23 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd clientServerAddress: clientServerAddress, serverServerAddress: serverServerAddress, messageChan: make(chan socket.IncomingMessage), - channelByID: state.Channels{ - ThreadSafeMap: state.NewThreadSafeMap[string, channel.Channel](), - }, - closedSockets: make(chan string), - pubKeyOwner: pubKeyOwner, - pubKeyServ: pubServ, - secKeyServ: secServ, - schemaValidator: schemaValidator, - stop: make(chan struct{}), - workers: semaphore.NewWeighted(numWorkers), - log: log, - laoFac: laoFac, - serverSockets: channel.NewSockets(), - hubInbox: *inbox.NewInbox(rootChannel), - rootInbox: *inbox.NewInbox(rootChannel), - queries: state.NewQueries(), - messageIdsByChannel: state.MessageIds{ - ThreadSafeMap: state.NewThreadSafeMap[string, []string](), - }, - peers: state.NewPeers(), - blacklist: make([]string, 0), + channelByID: state.NewChannelsMap(), + closedSockets: make(chan string), + pubKeyOwner: pubKeyOwner, + pubKeyServ: pubServ, + secKeyServ: secServ, + schemaValidator: schemaValidator, + stop: make(chan struct{}), + workers: semaphore.NewWeighted(numWorkers), + log: log, + laoFac: laoFac, + serverSockets: channel.NewSockets(), + hubInbox: *inbox.NewInbox(rootChannel), + rootInbox: *inbox.NewInbox(rootChannel), + queries: state.NewQueries(), + messageIdsByChannel: state.NewMessageIdsMap(), + peers: state.NewPeers(), + blacklist: make([]string, 0), } return &hub, nil From 8b7482d8d4c3078081f66a44d2389b100ba2e979 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Mon, 30 Oct 2023 16:10:39 +0100 Subject: [PATCH 27/28] fix small typo --- be1-go/channel/lao/mod.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be1-go/channel/lao/mod.go b/be1-go/channel/lao/mod.go index 53585078d3..6394c52d06 100644 --- a/be1-go/channel/lao/mod.go +++ b/be1-go/channel/lao/mod.go @@ -704,7 +704,7 @@ func (c *Channel) createAndSendLAOGreet() error { peersInfo := c.hub.GetPeersInfo() - peers := make([]messagedata.Peer, len(peersInfo)) + peers := make([]messagedata.Peer, 0, len(peersInfo)) for _, info := range peersInfo { peers = append(peers, messagedata.Peer{Address: info.ClientAddress}) From 94aba087fa6fa1aedbe0fcbea27b84776c2725a9 Mon Sep 17 00:00:00 2001 From: MariemBaccari Date: Sat, 4 Nov 2023 23:26:25 +0100 Subject: [PATCH 28/28] fix filenames --- be1-go/hub/standard_hub/hub_state/{peers.go => Peers.go} | 0 be1-go/hub/standard_hub/hub_state/{queries.go => Queries.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename be1-go/hub/standard_hub/hub_state/{peers.go => Peers.go} (100%) rename be1-go/hub/standard_hub/hub_state/{queries.go => Queries.go} (100%) diff --git a/be1-go/hub/standard_hub/hub_state/peers.go b/be1-go/hub/standard_hub/hub_state/Peers.go similarity index 100% rename from be1-go/hub/standard_hub/hub_state/peers.go rename to be1-go/hub/standard_hub/hub_state/Peers.go diff --git a/be1-go/hub/standard_hub/hub_state/queries.go b/be1-go/hub/standard_hub/hub_state/Queries.go similarity index 100% rename from be1-go/hub/standard_hub/hub_state/queries.go rename to be1-go/hub/standard_hub/hub_state/Queries.go