Skip to content

Commit

Permalink
Merge pull request #20 from xconnio/pubsub
Browse files Browse the repository at this point in the history
Implement Publish/Subscribe
  • Loading branch information
om26er authored Jun 30, 2024
2 parents f035f68 + 59b45a9 commit 260f672
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ build:
go build ./cmd/xconn

run:
go run ./cmd/xconn
go run ./cmd/xconn start

build-docs:
mkdir -p site/xconn/
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/gammazero/workerpool v1.1.3
github.com/gobwas/ws v1.4.0
github.com/stretchr/testify v1.8.4
github.com/xconnio/wampproto-go v0.0.0-20240612115507-11680983472a
github.com/xconnio/wampproto-go v0.0.0-20240630150305-316020c30fb7
github.com/xconnio/wampproto-protobuf/go v0.0.0-20240611092706-1e859744b5a2
golang.org/x/exp v0.0.0-20240525044651-4c93da0ed11d
gopkg.in/yaml.v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xconnio/wampproto-go v0.0.0-20240612115507-11680983472a h1:/JukUnOTvgfduNO9qh8A4Wl+pqJvsy57/Crh0Xc/kVo=
github.com/xconnio/wampproto-go v0.0.0-20240612115507-11680983472a/go.mod h1:/b7EyR1X9EkOHPQBJGz1KvdjClo1GsalBGIzjQU5+i4=
github.com/xconnio/wampproto-go v0.0.0-20240630150305-316020c30fb7 h1:DrQEqUq14XFZCHXpErCeuOxmCRpFW9Z1hnIb/8Avpk8=
github.com/xconnio/wampproto-go v0.0.0-20240630150305-316020c30fb7/go.mod h1:/b7EyR1X9EkOHPQBJGz1KvdjClo1GsalBGIzjQU5+i4=
github.com/xconnio/wampproto-protobuf/go v0.0.0-20240611092706-1e859744b5a2 h1:1WkQ68ICoin0wTerMn5FrWl+ZbK4XS3/W2Wr86jS9K8=
github.com/xconnio/wampproto-protobuf/go v0.0.0-20240611092706-1e859744b5a2/go.mod h1:SZAkbKSDucIUBrPgcKlT0SzVmktfrsD7OdP1chFR+vw=
github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc=
Expand Down
50 changes: 48 additions & 2 deletions realm.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
)

type Realm struct {
broker *wampproto.Broker
dealer *wampproto.Dealer
clients internal.Map[int64, BaseSession]
}

func NewRealm() *Realm {
return &Realm{
broker: wampproto.NewBroker(),
dealer: wampproto.NewDealer(),
clients: internal.Map[int64, BaseSession]{},
}
Expand All @@ -25,12 +27,30 @@ func (r *Realm) AttachClient(base BaseSession) error {

details := wampproto.NewSessionDetails(base.ID(), base.Realm(), base.AuthID(), base.AuthRole(),
base.Serializer().Static())
return r.dealer.AddSession(details)

if err := r.broker.AddSession(details); err != nil {
return err
}

if err := r.dealer.AddSession(details); err != nil {
return err
}

return nil
}

func (r *Realm) DetachClient(base BaseSession) error {
r.clients.Delete(base.ID())
return r.dealer.RemoveSession(base.ID())

if err := r.broker.RemoveSession(base.ID()); err != nil {
return err
}

if err := r.dealer.RemoveSession(base.ID()); err != nil {
return err
}

return nil
}

func (r *Realm) ReceiveMessage(sessionID int64, msg messages.Message) error {
Expand All @@ -44,6 +64,32 @@ func (r *Realm) ReceiveMessage(sessionID int64, msg messages.Message) error {

client, _ := r.clients.Load(msgWithRecipient.Recipient)
return client.WriteMessage(msgWithRecipient.Message)
case messages.MessageTypeSubscribe, messages.MessageTypeUnSubscribe:
msgWithRecipient, err := r.broker.ReceiveMessage(sessionID, msg)
if err != nil {
return err
}

client, _ := r.clients.Load(msgWithRecipient.Recipient)
return client.WriteMessage(msgWithRecipient.Message)
case messages.MessageTypePublish:
publish := msg.(*messages.Publish)
publication, err := r.broker.ReceivePublish(sessionID, publish)
if err != nil {
return err
}

for _, recipientID := range publication.Recipients {
client, _ := r.clients.Load(recipientID)
_ = client.WriteMessage(publication.Event)
}

if publication.Ack != nil {
client, _ := r.clients.Load(publication.Ack.Recipient)
_ = client.WriteMessage(publication.Ack.Message)
}

return nil
case messages.MessageTypeGoodbye:
if err := r.dealer.RemoveSession(sessionID); err != nil {
return err
Expand Down

0 comments on commit 260f672

Please sign in to comment.