diff --git a/vendor/github.com/xtaci/smux/session.go b/vendor/github.com/xtaci/smux/session.go index 8c21ae4fe..be4a31868 100644 --- a/vendor/github.com/xtaci/smux/session.go +++ b/vendor/github.com/xtaci/smux/session.go @@ -178,7 +178,7 @@ func (s *Session) OpenStream() (*Stream, error) { stream := newStream(sid, s.config.MaxFrameSize, s) - if _, err := s.writeFrame(newFrame(byte(s.config.Version), cmdSYN, sid)); err != nil { + if _, err := s.writeControlFrame(newFrame(byte(s.config.Version), cmdSYN, sid)); err != nil { return nil, err } @@ -401,8 +401,12 @@ func (s *Session) recvLoop() { s.streamLock.Lock() if stream, ok := s.streams[sid]; ok { stream.pushBytes(newbuf) + // a stream used some token atomic.AddInt32(&s.bucket, -int32(written)) stream.notifyReadEvent() + } else { + // data directed to a missing/closed stream, recycle the buffer immediately. + defaultAllocator.Put(newbuf) } s.streamLock.Unlock() } else { @@ -559,9 +563,9 @@ func (s *Session) sendLoop() { } } -// writeFrame writes the frame to the underlying connection +// writeControlFrame writes the control frame to the underlying connection // and returns the number of bytes written if successful -func (s *Session) writeFrame(f Frame) (n int, err error) { +func (s *Session) writeControlFrame(f Frame) (n int, err error) { return s.writeFrameInternal(f, time.After(openCloseTimeout), CLSCTRL) } diff --git a/vendor/github.com/xtaci/smux/stream.go b/vendor/github.com/xtaci/smux/stream.go index 8b07ec9e7..1e7764171 100644 --- a/vendor/github.com/xtaci/smux/stream.go +++ b/vendor/github.com/xtaci/smux/stream.go @@ -219,6 +219,7 @@ func (s *Stream) WriteTo(w io.Writer) (n int64, err error) { if buf != nil { nw, ew := w.Write(buf) + // NOTE: WriteTo is a reader, so we need to return tokens here s.sess.returnTokens(len(buf)) defaultAllocator.Put(buf) if nw > 0 { @@ -255,6 +256,7 @@ func (s *Stream) writeTov2(w io.Writer) (n int64, err error) { if buf != nil { nw, ew := w.Write(buf) + // NOTE: WriteTo is a reader, so we need to return tokens here s.sess.returnTokens(len(buf)) defaultAllocator.Put(buf) if nw > 0 { @@ -484,7 +486,9 @@ func (s *Stream) Close() error { }) if once { - _, err = s.sess.writeFrame(newFrame(byte(s.sess.config.Version), cmdFIN, s.id)) + // send FIN in order + f := newFrame(byte(s.sess.config.Version), cmdFIN, s.id) + _, err = s.sess.writeFrameInternal(f, time.After(openCloseTimeout), CLSDATA) s.sess.streamClosed(s.id) return err } else { diff --git a/vendor/modules.txt b/vendor/modules.txt index fbcb334a1..1c446281d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -50,7 +50,7 @@ github.com/xtaci/kcp-go/v5 # github.com/xtaci/qpp v1.1.17 ## explicit; go 1.22.3 github.com/xtaci/qpp -# github.com/xtaci/smux v1.5.28 +# github.com/xtaci/smux v1.5.29 ## explicit; go 1.13 github.com/xtaci/smux # github.com/xtaci/tcpraw v1.2.31