diff --git a/mux.go b/mux.go index 74e4e2cf..da2c6fdd 100644 --- a/mux.go +++ b/mux.go @@ -2,7 +2,6 @@ package rueidis import ( "context" - "math/rand" "net" "runtime" "sync" @@ -394,22 +393,11 @@ func isBroken(err error, w wire) bool { return err != nil && err != ErrClosing && w.Error() != nil } -var rngPool = sync.Pool{ - New: func() any { - return rand.New(rand.NewSource(time.Now().UnixNano())) - }, -} - -func fastrand(n int) (r int) { - r = util.FastRand(n) - return -} - func slotfn(n int, ks uint16, noreply bool) uint16 { if n == 1 || ks == cmds.NoSlot || noreply { return 0 } - return uint16(fastrand(n)) + return uint16(util.FastRand(n)) } type muxslots struct { diff --git a/pipe.go b/pipe.go index 2aa7677c..8dace0a3 100644 --- a/pipe.go +++ b/pipe.go @@ -340,12 +340,15 @@ func (p *pipe) _background() { }() { p._exit(p._backgroundRead()) - atomic.CompareAndSwapInt32(&p.state, 2, 3) // make write goroutine to exit - atomic.AddInt32(&p.waits, 1) - go func() { - <-p.queue.PutOne(cmds.PingCmd) - atomic.AddInt32(&p.waits, -1) - }() + select { + case <-p.close: + default: + atomic.AddInt32(&p.waits, 1) + go func() { + <-p.queue.PutOne(cmds.PingCmd) // avoid _backgroundWrite hanging at p.queue.WaitForWrite() + atomic.AddInt32(&p.waits, -1) + }() + } } err := p.Error() p.nsubs.Close() @@ -373,7 +376,7 @@ func (p *pipe) _background() { resp := newErrResult(err) for atomic.LoadInt32(&p.waits) != 0 { select { - case <-p.close: + case <-p.close: // p.queue.NextWriteCmd() can only be called after _backgroundWrite _, _, _ = p.queue.NextWriteCmd() default: } @@ -404,34 +407,27 @@ func (p *pipe) _backgroundWrite() (err error) { flushStart = time.Time{} ) - for atomic.LoadInt32(&p.state) < 3 { + for err == nil { if ones[0], multi, ch = p.queue.NextWriteCmd(); ch == nil { if flushDelay != 0 { flushStart = time.Now() } - if p.w.Buffered() == 0 { - err = p.Error() - } else { - err = p.w.Flush() + if p.w.Buffered() != 0 { + if err = p.w.Flush(); err != nil { + break + } } - if err == nil { - if atomic.LoadInt32(&p.state) == 1 { - ones[0], multi, ch = p.queue.WaitForWrite() - } else { - runtime.Gosched() - continue + ones[0], multi, ch = p.queue.WaitForWrite() + if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage + // Blocking commands are executed in dedicated client which is acquired from pool. + // So, there is no sense to wait other commands to be written. + // https://github.com/redis/rueidis/issues/379 + var blocked bool + for i := 0; i < len(multi) && !blocked; i++ { + blocked = multi[i].IsBlock() } - if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage - // Blocking commands are executed in dedicated client which is acquired from pool. - // So, there is no sense to wait other commands to be written. - // https://github.com/redis/rueidis/issues/379 - var blocked bool - for i := 0; i < len(multi) && !blocked; i++ { - blocked = blocked || multi[i].IsBlock() - } - if !blocked { - time.Sleep(flushDelay - time.Since(flushStart)) // ref: https://github.com/redis/rueidis/issues/156 - } + if !blocked { + time.Sleep(flushDelay - time.Since(flushStart)) // ref: https://github.com/redis/rueidis/issues/156 } } } @@ -441,12 +437,6 @@ func (p *pipe) _backgroundWrite() (err error) { for _, cmd := range multi { err = writeCmd(p.w, cmd.Commands()) } - if err != nil { - if err != ErrClosing { // ignore ErrClosing to allow the final PING command to be sent - return - } - runtime.Gosched() - } } return } @@ -1480,9 +1470,16 @@ func (p *pipe) Close() { p.background() } if block == 1 && (stopping1 || stopping2) { // make sure there is no block cmd + atomic.AddInt32(&p.waits, 1) + ch := p.queue.PutOne(cmds.PingCmd) select { - case <-p.queue.PutOne(cmds.PingCmd): + case <-ch: + atomic.AddInt32(&p.waits, -1) case <-time.After(time.Second): + go func(ch chan RedisResult) { + <-ch + atomic.AddInt32(&p.waits, -1) + }(ch) } } } diff --git a/pipe_test.go b/pipe_test.go index da3590cd..85c4597d 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -3433,6 +3433,26 @@ func TestCloseAndWaitPendingCMDs(t *testing.T) { wg.Wait() } +func TestCloseWithGracefulPeriodExceeded(t *testing.T) { + defer ShouldNotLeaked(SetupLeakDetection()) + p, mock, _, _ := setup(t, ClientOption{}) + go func() { + p.Close() + }() + mock.Expect("PING") + <-p.close +} + +func TestCloseWithPipeliningAndGracefulPeriodExceeded(t *testing.T) { + defer ShouldNotLeaked(SetupLeakDetection()) + p, mock, _, _ := setup(t, ClientOption{AlwaysPipelining: true}) + go func() { + p.Close() + }() + mock.Expect("PING") + <-p.close +} + func TestAlreadyCanceledContext(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) p, _, close, closeConn := setup(t, ClientOption{})