Skip to content

Commit

Permalink
Merge pull request #24 from xconnio/leave
Browse files Browse the repository at this point in the history
session: implement leave
  • Loading branch information
om26er authored Jul 3, 2024
2 parents e3c3d18 + 7340b96 commit 788b546
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/gammazero/workerpool v1.1.3
github.com/gobwas/ws v1.4.0
github.com/stretchr/testify v1.8.4
github.com/xconnio/wampproto-go v0.0.0-20240630150305-316020c30fb7
github.com/xconnio/wampproto-go v0.0.0-20240703222643-1c76cc0481d2
github.com/xconnio/wampproto-protobuf/go v0.0.0-20240701141955-e9a1025ec125
golang.org/x/exp v0.0.0-20240525044651-4c93da0ed11d
gopkg.in/yaml.v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xconnio/wampproto-go v0.0.0-20240630150305-316020c30fb7 h1:DrQEqUq14XFZCHXpErCeuOxmCRpFW9Z1hnIb/8Avpk8=
github.com/xconnio/wampproto-go v0.0.0-20240630150305-316020c30fb7/go.mod h1:/b7EyR1X9EkOHPQBJGz1KvdjClo1GsalBGIzjQU5+i4=
github.com/xconnio/wampproto-go v0.0.0-20240703222643-1c76cc0481d2 h1:J2M+qVzdKVFVuOeKghQwSVHdxwX/5m3tlF5XS0kD9Sk=
github.com/xconnio/wampproto-go v0.0.0-20240703222643-1c76cc0481d2/go.mod h1:/b7EyR1X9EkOHPQBJGz1KvdjClo1GsalBGIzjQU5+i4=
github.com/xconnio/wampproto-protobuf/go v0.0.0-20240701141955-e9a1025ec125 h1:JorTUNYHzmMslyT03aJhHFQ+KnnvFaMGT5XB6K1oLLk=
github.com/xconnio/wampproto-protobuf/go v0.0.0-20240701141955-e9a1025ec125/go.mod h1:SZAkbKSDucIUBrPgcKlT0SzVmktfrsD7OdP1chFR+vw=
github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc=
Expand Down
35 changes: 33 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package xconn

import (
"context"
"errors"
"fmt"
"io"
"log"
"sync"
"time"

"github.com/xconnio/wampproto-go"
"github.com/xconnio/wampproto-go/messages"
Expand Down Expand Up @@ -32,6 +35,8 @@ type Session struct {
unsubscribeRequests map[int64]chan *UnSubscribeResponse
subscriptions map[int64]EventHandler
publishRequests map[int64]chan *PublishResponse

goodbyeChan chan struct{}
}

func NewSession(base BaseSession, serializer serializers.Serializer) *Session {
Expand All @@ -49,6 +54,8 @@ func NewSession(base BaseSession, serializer serializers.Serializer) *Session {
unsubscribeRequests: map[int64]chan *UnSubscribeResponse{},
subscriptions: map[int64]EventHandler{},
publishRequests: map[int64]chan *PublishResponse{},

goodbyeChan: make(chan struct{}, 1),
}

go session.waitForRouterMessages()
Expand All @@ -59,7 +66,10 @@ func (s *Session) waitForRouterMessages() {
for {
payload, err := s.base.Read()
if err != nil {
log.Println("failed to read message: ", err)
if !errors.Is(err, io.EOF) {
log.Println("failed to read message: ", err)
}

_ = s.base.Close()
return
}
Expand Down Expand Up @@ -225,6 +235,8 @@ func (s *Session) processIncomingMessage(msg messages.Message) error {
default:
return fmt.Errorf("unknown error message type %T", msg)
}
case messages.MessageTypeGoodbye:
s.goodbyeChan <- struct{}{}
default:
return fmt.Errorf("SESSION: received unexpected message %T", msg)
}
Expand Down Expand Up @@ -381,7 +393,7 @@ func (s *Session) UnSubscribe(ctx context.Context, subscription *Subscription) e
delete(s.subscriptions, subscription.ID)
return nil
case <-ctx.Done():
return fmt.Errorf("subscribe request timed")
return fmt.Errorf("unsubscribe request timed")
}
}

Expand Down Expand Up @@ -421,3 +433,22 @@ func (s *Session) Publish(ctx context.Context, topic string, args []any, kwArgs
return fmt.Errorf("publish request timed")
}
}

func (s *Session) Leave() error {
goodbye := messages.NewGoodBye("wamp.close.close_realm", map[string]any{})
toSend, err := s.proto.SendMessage(goodbye)
if err != nil {
return err
}

if err = s.base.Write(toSend); err != nil {
return err
}

select {
case <-s.goodbyeChan:
return nil
case <-time.After(time.Second * 10):
return errors.New("leave timeout")
}
}

0 comments on commit 788b546

Please sign in to comment.