Skip to content

Commit

Permalink
Merge pull request #1946 from dedis/work-be1-arnauds5-federation-data…
Browse files Browse the repository at this point in the history
…-exchange

[BE1] Federation Data Exchange
  • Loading branch information
sgueissa authored Jun 27, 2024
2 parents 4945cad + f0b9e9c commit 4ed41a1
Show file tree
Hide file tree
Showing 11 changed files with 467 additions and 101 deletions.
1 change: 1 addition & 0 deletions be1-go/internal/handler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
FederationActionInit = "init"
FederationActionExpect = "expect"
FederationActionResult = "result"
FederationActionTokensExchange = "tokens_exchange"

ChirpObject = "chirp"
ChirpActionAdd = "add"
Expand Down
182 changes: 124 additions & 58 deletions be1-go/internal/handler/channel/federation/hfederation/federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"popstellar/internal/handler/jsonrpc/mjsonrpc"
"popstellar/internal/handler/message/mmessage"
"popstellar/internal/handler/method/publish/mpublish"
"popstellar/internal/handler/method/subscribe/msubscribe"
"popstellar/internal/handler/query/mquery"
"popstellar/internal/network/socket"
"popstellar/internal/validation"
Expand All @@ -44,7 +43,18 @@ type Subscribers interface {
SendToAll(buf []byte, channel string) error
}

type Sockets interface {
Upsert(socket socket.Socket)
}

type Config interface {
GetServerInfo() (string, string, string, error)
}

type Repository interface {
// HasMessage returns true if the message already exists.
HasMessage(messageID string) (bool, error)

// GetOrganizerPubKey returns the organizer public key of a LAO.
GetOrganizerPubKey(laoID string) (kyber.Point, error)

Expand All @@ -70,24 +80,45 @@ type Repository interface {
}

type Handler struct {
hub Hub
subs Subscribers
db Repository
schema *validation.SchemaValidator
log zerolog.Logger
hub Hub
subs Subscribers
sockets Sockets
conf Config
db Repository
schema *validation.SchemaValidator
log zerolog.Logger
}

func New(hub Hub, subs Subscribers, db Repository, schema *validation.SchemaValidator, log zerolog.Logger) *Handler {
func New(hub Hub, subs Subscribers, sockets Sockets, conf Config,
db Repository, schema *validation.SchemaValidator,
log zerolog.Logger) *Handler {
return &Handler{
hub: hub,
subs: subs,
db: db,
schema: schema,
log: log.With().Str("module", "federation").Logger(),
hub: hub,
subs: subs,
sockets: sockets,
conf: conf,
db: db,
schema: schema,
log: log.With().Str("module", "federation").Logger(),
}
}

func (h *Handler) Handle(channelPath string, msg mmessage.Message) error {
func (h *Handler) Handle(channelPath string, msg mmessage.Message,
socket socket.Socket) error {
err := msg.VerifyMessage()
if err != nil {
return err
}

alreadyExist, err := h.db.HasMessage(msg.MessageID)
if err != nil {
return err
}

if alreadyExist {
return nil
}

jsonData, err := base64.URLEncoding.DecodeString(msg.Data)
if err != nil {
return errors.NewInvalidMessageFieldError("failed to decode message data: %v", err)
Expand Down Expand Up @@ -115,9 +146,11 @@ func (h *Handler) Handle(channelPath string, msg mmessage.Message) error {
case channel.FederationActionExpect:
err = h.handleExpect(msg, channelPath)
case channel.FederationActionChallenge:
err = h.handleChallenge(msg, channelPath)
err = h.handleChallenge(msg, channelPath, socket)
case channel.FederationActionResult:
err = h.handleResult(msg, channelPath)
case channel.FederationActionTokensExchange:
err = h.handleTokensExchange(msg, channelPath)
default:
err = errors.NewInvalidMessageFieldError("failed to Handle %s#%s, invalid object#action", object, action)
}
Expand Down Expand Up @@ -192,6 +225,11 @@ func (h *Handler) handleExpect(msg mmessage.Message, channelPath string) error {
return err
}

err = federationExpect.ChallengeMsg.VerifyMessage()
if err != nil {
return err
}

var challenge mfederation.FederationChallenge
err = federationExpect.ChallengeMsg.UnmarshalData(&challenge)
if err != nil {
Expand All @@ -213,9 +251,6 @@ func (h *Handler) handleExpect(msg mmessage.Message, channelPath string) error {
return err
}

remoteChannel := fmt.Sprintf(channelPattern, federationExpect.LaoId)
_ = h.subs.AddChannel(remoteChannel)

return h.db.StoreMessageAndData(channelPath, msg)
}

Expand Down Expand Up @@ -257,45 +292,29 @@ func (h *Handler) handleInit(msg mmessage.Message, channelPath string) error {
return err
}

remote, err := h.connectTo(federationInit.ServerAddress)
if err != nil {
return err
}

//Force the remote server to be subscribed to /root/<remote_lao>/federation
remoteChannel := fmt.Sprintf(channelPattern, federationInit.LaoId)
_ = h.subs.AddChannel(remoteChannel)
err = h.subs.Subscribe(remoteChannel, remote)
if err != nil {
return err
if h.isOnSameServer(federationInit.ServerAddress) {
// In the edge case where the two LAOs are on the same server,
// there is no need to create a websocket connection to the other
// server and message from one "server" to the "other" could be
// directly handled.
// In that case the ack result of the federation_init will be sent
// only after any federation_result sent when handling the challenge.
_ = h.handleChallenge(federationInit.ChallengeMsg, remoteChannel, nil)
return nil
}

subscribeMsg := msubscribe.Subscribe{
Base: mquery.Base{
JSONRPCBase: mjsonrpc.JSONRPCBase{
JSONRPC: "2.0",
},
Method: "subscribe",
},
Params: msubscribe.SubscribeParams{Channel: channelPath},
}

subscribeBytes, err := json.Marshal(subscribeMsg)
if err != nil {
return errors.NewJsonMarshalError(err.Error())
}

// Subscribe to /root/<local_lao>/federation on the remote server
err = h.subs.SendToAll(subscribeBytes, remoteChannel)
remote, err := h.connectTo(federationInit.ServerAddress)
if err != nil {
return err
}

// send the challenge to a channelPath where the remote server is subscribed to
return h.publishTo(federationInit.ChallengeMsg, remoteChannel)
// send the challenge to the remote channelPath on the remote socket
return h.publishTo(federationInit.ChallengeMsg, remoteChannel, remote)
}

func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string) error {
func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string,
socket socket.Socket) error {
var federationChallenge mfederation.FederationChallenge
err := msg.UnmarshalData(&federationChallenge)
if err != nil {
Expand Down Expand Up @@ -340,13 +359,26 @@ func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string) erro
return err
}

// publish the FederationResult to the other server
remoteChannel := fmt.Sprintf(channelPattern, federationExpect.LaoId)
err = h.publishTo(resultMsg, remoteChannel)
if err != nil {
return err
if h.isOnSameServer(federationExpect.ServerAddress) || socket == nil {
// In the edge case where the two LAOs are on the same server, the
// result message would already be stored and handleResult will not be
// called => broadcast the result to both federation channels directly.
_ = h.db.StoreMessageAndData(remoteChannel, resultMsg)
_ = h.subs.BroadcastToAllClients(resultMsg, remoteChannel)
} else {
// publish the FederationResult to the other server
err = h.publishTo(resultMsg, remoteChannel, socket)
if err != nil {
return err
}

// Add the socket to the list of server sockets
h.sockets.Upsert(socket)
}

h.log.Info().Msgf("A federation was successfully")

// broadcast the FederationResult to the local organizer
return h.subs.BroadcastToAllClients(resultMsg, channelPath)
}
Expand Down Expand Up @@ -405,6 +437,26 @@ func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error {
return h.subs.BroadcastToAllClients(msg, channelPath)
}

func (h *Handler) handleTokensExchange(msg mmessage.Message, channelPath string) error {
var tokensExchange mfederation.FederationTokensExchange
err := msg.UnmarshalData(&tokensExchange)
if err != nil {
return err
}

err = h.verifyLocalOrganizer(msg, channelPath)
if err != nil {
return err
}

err = h.db.StoreMessageAndData(channelPath, msg)
if err != nil {
return err
}

return h.subs.BroadcastToAllClients(msg, channelPath)
}

func (h *Handler) getOrganizerPk(federationChannel string) (string, error) {
laoChannel := strings.TrimSuffix(federationChannel, "/federation")

Expand Down Expand Up @@ -448,6 +500,17 @@ func (h *Handler) verifyLocalOrganizer(msg mmessage.Message, channelPath string)
return nil
}

func (h *Handler) isOnSameServer(address string) bool {
_, clientServerAddress, serverServerAddress, _ := h.conf.GetServerInfo()

isSameAddress := address == clientServerAddress || address == serverServerAddress

h.log.Info().Msgf("isOnSameServer=%v, remote=%s, client=%s, server=%s",
isSameAddress, address, clientServerAddress, serverServerAddress)

return isSameAddress
}

func (h *Handler) connectTo(serverAddress string) (socket.Socket, error) {
ws, _, err := websocket.DefaultDialer.Dial(serverAddress, nil)
if err != nil {
Expand All @@ -459,14 +522,15 @@ func (h *Handler) connectTo(serverAddress string) (socket.Socket, error) {
wg := h.hub.GetWaitGroup()
stopChan := h.hub.GetStopChan()

client := socket.NewClientSocket(messageChan, closedSockets, ws, wg, stopChan, h.log)
server := socket.NewServerSocket(messageChan, closedSockets, ws, wg, stopChan, h.log)
h.sockets.Upsert(server)

wg.Add(2)

go client.WritePump()
go client.ReadPump()
go server.WritePump()
go server.ReadPump()

return client, nil
return server, nil
}

func (h *Handler) createMessage(data channel.MessageData) (mmessage.Message, error) {
Expand Down Expand Up @@ -505,13 +569,14 @@ func (h *Handler) createMessage(data channel.MessageData) (mmessage.Message, err
return msg, nil
}

func (h *Handler) publishTo(msg mmessage.Message, channelPath string) error {
func (h *Handler) publishTo(msg mmessage.Message, channelPath string,
socket socket.Socket) error {
publishMsg := mpublish.Publish{
Base: mquery.Base{
JSONRPCBase: mjsonrpc.JSONRPCBase{
JSONRPC: "2.0",
},
Method: "publish",
Method: mquery.MethodPublish,
},
Params: mpublish.PublishParams{
Channel: channelPath,
Expand All @@ -524,5 +589,6 @@ func (h *Handler) publishTo(msg mmessage.Message, channelPath string) error {
return errors.NewJsonMarshalError(err.Error())
}

return h.subs.SendToAll(publishBytes, channelPath)
socket.Send(publishBytes)
return nil
}
Loading

0 comments on commit 4ed41a1

Please sign in to comment.