-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1694 from dedis/work-be1-mariembaccari-fine-grain…
…-locking-hub Improve hub locking
- Loading branch information
Showing
17 changed files
with
366 additions
and
199 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,4 +11,4 @@ | |
"other-servers": [ | ||
"localhost:9001" | ||
] | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package hub_state | ||
|
||
import "popstellar/channel" | ||
|
||
// Channels stores channel ids with their corresponding channels | ||
type Channels struct { | ||
ThreadSafeMap[string, channel.Channel] | ||
} | ||
|
||
// NewChannelsMap creates a new Channels structure | ||
func NewChannelsMap() Channels { | ||
return Channels{ | ||
ThreadSafeMap: NewThreadSafeMap[string, channel.Channel](), | ||
} | ||
} | ||
|
||
// ForEach iterates over all channels and applies the given function | ||
func (c *Channels) ForEach(f func(channel.Channel)) { | ||
c.Lock() | ||
defer c.Unlock() | ||
for _, channel := range c.table { | ||
f(channel) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package hub_state | ||
|
||
import ( | ||
"golang.org/x/exp/slices" | ||
) | ||
|
||
// MessageIds stores a channel id with its corresponding message ids | ||
type MessageIds struct { | ||
ThreadSafeMap[string, []string] | ||
} | ||
|
||
// NewMessageIdsMap creates a new MessageIds structure | ||
func NewMessageIdsMap() MessageIds { | ||
return MessageIds{ | ||
ThreadSafeMap: NewThreadSafeMap[string, []string](), | ||
} | ||
} | ||
|
||
// Add adds a message id to the slice of message ids of the channel | ||
func (i *MessageIds) Add(channel string, id string) { | ||
i.Lock() | ||
defer i.Unlock() | ||
messageIds, channelStored := i.table[channel] | ||
if !channelStored { | ||
i.table[channel] = append(i.table[channel], id) | ||
return | ||
} | ||
alreadyStoredId := slices.Contains(messageIds, id) | ||
if !alreadyStoredId { | ||
i.table[channel] = append(i.table[channel], id) | ||
} | ||
} | ||
|
||
// AddAll adds a slice of message ids to the slice of message ids of the channel | ||
func (i *MessageIds) AddAll(channel string, ids []string) { | ||
i.Lock() | ||
defer i.Unlock() | ||
i.table[channel] = append(i.table[channel], ids...) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package hub_state | ||
|
||
import ( | ||
"popstellar/message/query/method" | ||
"sync" | ||
|
||
"golang.org/x/exp/maps" | ||
"golang.org/x/exp/slices" | ||
) | ||
|
||
// Peers stores the peers' information | ||
type Peers struct { | ||
sync.RWMutex | ||
// peersInfo stores the info of the peers: public key, client and server endpoints associated with the socket ID | ||
peersInfo map[string]method.ServerInfo | ||
// peersGreeted stores the peers that were greeted by the socket ID | ||
peersGreeted map[string]struct{} | ||
} | ||
|
||
// NewPeers creates a new Peers structure | ||
func NewPeers() Peers { | ||
return Peers{ | ||
peersInfo: make(map[string]method.ServerInfo), | ||
peersGreeted: make(map[string]struct{}), | ||
} | ||
} | ||
|
||
// AddPeerInfo adds a peer's info to the table | ||
func (p *Peers) AddPeerInfo(socketId string, info method.ServerInfo) { | ||
p.Lock() | ||
defer p.Unlock() | ||
p.peersInfo[socketId] = info | ||
} | ||
|
||
// AddPeerGreeted adds a peer's socket ID to the slice of peers greeted | ||
func (p *Peers) AddPeerGreeted(socketId string) { | ||
p.Lock() | ||
defer p.Unlock() | ||
p.peersGreeted[socketId] = struct{}{} | ||
} | ||
|
||
// GetAllPeersInfo returns a copy of the peers' info slice | ||
func (p *Peers) GetAllPeersInfo() []method.ServerInfo { | ||
p.RLock() | ||
defer p.RUnlock() | ||
peersInfo := make([]method.ServerInfo, 0, len(p.peersInfo)) | ||
for _, info := range p.peersInfo { | ||
if !slices.Contains(peersInfo, info) { | ||
peersInfo = append(peersInfo, info) | ||
} | ||
} | ||
return peersInfo | ||
} | ||
|
||
// IsPeerGreeted returns true if the peer was greeted, otherwise it returns false | ||
func (p *Peers) IsPeerGreeted(socketId string) bool { | ||
p.RLock() | ||
defer p.RUnlock() | ||
return slices.Contains(maps.Keys(p.peersGreeted), socketId) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package hub_state | ||
|
||
import ( | ||
"popstellar/message/query/method" | ||
"sync" | ||
|
||
"golang.org/x/xerrors" | ||
) | ||
|
||
// Queries let the hub remember all queries that it sent to other servers | ||
type Queries struct { | ||
sync.Mutex | ||
// state stores the ID of the server's queries and their state. False for a | ||
// query not yet answered, else true. | ||
state map[int]bool | ||
// getMessagesByIdQueries stores the server's getMessagesByIds queries by their ID. | ||
getMessagesByIdQueries map[int]method.GetMessagesById | ||
// nextID store the ID of the next query | ||
nextID int | ||
} | ||
|
||
// NewQueries creates a new queries struct | ||
func NewQueries() Queries { | ||
return Queries{ | ||
state: make(map[int]bool), | ||
getMessagesByIdQueries: make(map[int]method.GetMessagesById), | ||
} | ||
} | ||
|
||
// GetQueryState returns a given query's state | ||
func (q *Queries) GetQueryState(id int) (bool, error) { | ||
q.Lock() | ||
defer q.Unlock() | ||
|
||
state, ok := q.state[id] | ||
if !ok { | ||
return false, xerrors.Errorf("query with id %d not found", id) | ||
} | ||
return state, nil | ||
} | ||
|
||
// GetNextID returns the next query ID | ||
func (q *Queries) GetNextID() int { | ||
q.Lock() | ||
defer q.Unlock() | ||
|
||
id := q.nextID | ||
q.nextID++ | ||
return id | ||
} | ||
|
||
// SetQueryReceived sets the state of the query with the given ID as received | ||
func (q *Queries) SetQueryReceived(id int) error { | ||
q.Lock() | ||
defer q.Unlock() | ||
|
||
currentState, ok := q.state[id] | ||
|
||
if !ok { | ||
return xerrors.Errorf("query with id %d not found", id) | ||
} | ||
|
||
if currentState { | ||
return xerrors.Errorf("query with id %d already answered", id) | ||
} | ||
|
||
q.state[id] = true | ||
return nil | ||
} | ||
|
||
// AddQuery adds the given query to the table | ||
func (q *Queries) AddQuery(id int, query method.GetMessagesById) { | ||
q.Lock() | ||
defer q.Unlock() | ||
|
||
q.getMessagesByIdQueries[id] = query | ||
q.state[id] = false | ||
} |
Oops, something went wrong.