diff --git a/Makefile b/Makefile index 8239470..c178dd6 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ build: go build ./cmd/xconn run: - go run ./cmd/xconn + go run ./cmd/xconn start build-docs: mkdir -p site/xconn/ diff --git a/go.mod b/go.mod index 99dbcd4..1b5d2b0 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/gammazero/workerpool v1.1.3 github.com/gobwas/ws v1.4.0 github.com/stretchr/testify v1.8.4 - github.com/xconnio/wampproto-go v0.0.0-20240612115507-11680983472a + github.com/xconnio/wampproto-go v0.0.0-20240630150305-316020c30fb7 github.com/xconnio/wampproto-protobuf/go v0.0.0-20240611092706-1e859744b5a2 golang.org/x/exp v0.0.0-20240525044651-4c93da0ed11d gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index 37efa6e..ac5fad1 100644 --- a/go.sum +++ b/go.sum @@ -36,8 +36,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= -github.com/xconnio/wampproto-go v0.0.0-20240612115507-11680983472a h1:/JukUnOTvgfduNO9qh8A4Wl+pqJvsy57/Crh0Xc/kVo= -github.com/xconnio/wampproto-go v0.0.0-20240612115507-11680983472a/go.mod h1:/b7EyR1X9EkOHPQBJGz1KvdjClo1GsalBGIzjQU5+i4= +github.com/xconnio/wampproto-go v0.0.0-20240630150305-316020c30fb7 h1:DrQEqUq14XFZCHXpErCeuOxmCRpFW9Z1hnIb/8Avpk8= +github.com/xconnio/wampproto-go v0.0.0-20240630150305-316020c30fb7/go.mod h1:/b7EyR1X9EkOHPQBJGz1KvdjClo1GsalBGIzjQU5+i4= github.com/xconnio/wampproto-protobuf/go v0.0.0-20240611092706-1e859744b5a2 h1:1WkQ68ICoin0wTerMn5FrWl+ZbK4XS3/W2Wr86jS9K8= github.com/xconnio/wampproto-protobuf/go v0.0.0-20240611092706-1e859744b5a2/go.mod h1:SZAkbKSDucIUBrPgcKlT0SzVmktfrsD7OdP1chFR+vw= github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc= diff --git a/realm.go b/realm.go index 8396868..9ea0584 100644 --- a/realm.go +++ b/realm.go @@ -9,12 +9,14 @@ import ( ) type Realm struct { + broker *wampproto.Broker dealer *wampproto.Dealer clients internal.Map[int64, BaseSession] } func NewRealm() *Realm { return &Realm{ + broker: wampproto.NewBroker(), dealer: wampproto.NewDealer(), clients: internal.Map[int64, BaseSession]{}, } @@ -25,12 +27,30 @@ func (r *Realm) AttachClient(base BaseSession) error { details := wampproto.NewSessionDetails(base.ID(), base.Realm(), base.AuthID(), base.AuthRole(), base.Serializer().Static()) - return r.dealer.AddSession(details) + + if err := r.broker.AddSession(details); err != nil { + return err + } + + if err := r.dealer.AddSession(details); err != nil { + return err + } + + return nil } func (r *Realm) DetachClient(base BaseSession) error { r.clients.Delete(base.ID()) - return r.dealer.RemoveSession(base.ID()) + + if err := r.broker.RemoveSession(base.ID()); err != nil { + return err + } + + if err := r.dealer.RemoveSession(base.ID()); err != nil { + return err + } + + return nil } func (r *Realm) ReceiveMessage(sessionID int64, msg messages.Message) error { @@ -44,6 +64,32 @@ func (r *Realm) ReceiveMessage(sessionID int64, msg messages.Message) error { client, _ := r.clients.Load(msgWithRecipient.Recipient) return client.WriteMessage(msgWithRecipient.Message) + case messages.MessageTypeSubscribe, messages.MessageTypeUnSubscribe: + msgWithRecipient, err := r.broker.ReceiveMessage(sessionID, msg) + if err != nil { + return err + } + + client, _ := r.clients.Load(msgWithRecipient.Recipient) + return client.WriteMessage(msgWithRecipient.Message) + case messages.MessageTypePublish: + publish := msg.(*messages.Publish) + publication, err := r.broker.ReceivePublish(sessionID, publish) + if err != nil { + return err + } + + for _, recipientID := range publication.Recipients { + client, _ := r.clients.Load(recipientID) + _ = client.WriteMessage(publication.Event) + } + + if publication.Ack != nil { + client, _ := r.clients.Load(publication.Ack.Recipient) + _ = client.WriteMessage(publication.Ack.Message) + } + + return nil case messages.MessageTypeGoodbye: if err := r.dealer.RemoveSession(sessionID); err != nil { return err