diff --git a/go.mod b/go.mod index 2ef2f04..5090180 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/gammazero/workerpool v1.1.3 github.com/gobwas/ws v1.4.0 github.com/stretchr/testify v1.8.4 - github.com/xconnio/wampproto-go v0.0.0-20240630150305-316020c30fb7 + github.com/xconnio/wampproto-go v0.0.0-20240703222643-1c76cc0481d2 github.com/xconnio/wampproto-protobuf/go v0.0.0-20240701141955-e9a1025ec125 golang.org/x/exp v0.0.0-20240525044651-4c93da0ed11d gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index 5cfd0e4..5a636ec 100644 --- a/go.sum +++ b/go.sum @@ -36,8 +36,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= -github.com/xconnio/wampproto-go v0.0.0-20240630150305-316020c30fb7 h1:DrQEqUq14XFZCHXpErCeuOxmCRpFW9Z1hnIb/8Avpk8= -github.com/xconnio/wampproto-go v0.0.0-20240630150305-316020c30fb7/go.mod h1:/b7EyR1X9EkOHPQBJGz1KvdjClo1GsalBGIzjQU5+i4= +github.com/xconnio/wampproto-go v0.0.0-20240703222643-1c76cc0481d2 h1:J2M+qVzdKVFVuOeKghQwSVHdxwX/5m3tlF5XS0kD9Sk= +github.com/xconnio/wampproto-go v0.0.0-20240703222643-1c76cc0481d2/go.mod h1:/b7EyR1X9EkOHPQBJGz1KvdjClo1GsalBGIzjQU5+i4= github.com/xconnio/wampproto-protobuf/go v0.0.0-20240701141955-e9a1025ec125 h1:JorTUNYHzmMslyT03aJhHFQ+KnnvFaMGT5XB6K1oLLk= github.com/xconnio/wampproto-protobuf/go v0.0.0-20240701141955-e9a1025ec125/go.mod h1:SZAkbKSDucIUBrPgcKlT0SzVmktfrsD7OdP1chFR+vw= github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc= diff --git a/session.go b/session.go index 2188a93..609b5f9 100644 --- a/session.go +++ b/session.go @@ -2,9 +2,12 @@ package xconn import ( "context" + "errors" "fmt" + "io" "log" "sync" + "time" "github.com/xconnio/wampproto-go" "github.com/xconnio/wampproto-go/messages" @@ -32,6 +35,8 @@ type Session struct { unsubscribeRequests map[int64]chan *UnSubscribeResponse subscriptions map[int64]EventHandler publishRequests map[int64]chan *PublishResponse + + goodbyeChan chan struct{} } func NewSession(base BaseSession, serializer serializers.Serializer) *Session { @@ -49,6 +54,8 @@ func NewSession(base BaseSession, serializer serializers.Serializer) *Session { unsubscribeRequests: map[int64]chan *UnSubscribeResponse{}, subscriptions: map[int64]EventHandler{}, publishRequests: map[int64]chan *PublishResponse{}, + + goodbyeChan: make(chan struct{}, 1), } go session.waitForRouterMessages() @@ -59,7 +66,10 @@ func (s *Session) waitForRouterMessages() { for { payload, err := s.base.Read() if err != nil { - log.Println("failed to read message: ", err) + if !errors.Is(err, io.EOF) { + log.Println("failed to read message: ", err) + } + _ = s.base.Close() return } @@ -225,6 +235,8 @@ func (s *Session) processIncomingMessage(msg messages.Message) error { default: return fmt.Errorf("unknown error message type %T", msg) } + case messages.MessageTypeGoodbye: + s.goodbyeChan <- struct{}{} default: return fmt.Errorf("SESSION: received unexpected message %T", msg) } @@ -381,7 +393,7 @@ func (s *Session) UnSubscribe(ctx context.Context, subscription *Subscription) e delete(s.subscriptions, subscription.ID) return nil case <-ctx.Done(): - return fmt.Errorf("subscribe request timed") + return fmt.Errorf("unsubscribe request timed") } } @@ -421,3 +433,22 @@ func (s *Session) Publish(ctx context.Context, topic string, args []any, kwArgs return fmt.Errorf("publish request timed") } } + +func (s *Session) Leave() error { + goodbye := messages.NewGoodBye("wamp.close.close_realm", map[string]any{}) + toSend, err := s.proto.SendMessage(goodbye) + if err != nil { + return err + } + + if err = s.base.Write(toSend); err != nil { + return err + } + + select { + case <-s.goodbyeChan: + return nil + case <-time.After(time.Second * 10): + return errors.New("leave timeout") + } +}