From a6343b4d5a22095decca5831e0da77304b259624 Mon Sep 17 00:00:00 2001 From: Rueian Date: Tue, 18 Jun 2024 23:33:49 +0800 Subject: [PATCH 1/5] fix: remove unnecessary busy loop during shutdown Signed-off-by: Rueian --- pipe.go | 56 +++++++++++++++++++++++++------------------------------- 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/pipe.go b/pipe.go index 2aa7677c..3bea1be4 100644 --- a/pipe.go +++ b/pipe.go @@ -343,7 +343,7 @@ func (p *pipe) _background() { atomic.CompareAndSwapInt32(&p.state, 2, 3) // make write goroutine to exit atomic.AddInt32(&p.waits, 1) go func() { - <-p.queue.PutOne(cmds.PingCmd) + <-p.queue.PutOne(cmds.PingCmd) // avoid _backgroundWrite hanging at p.queue.WaitForWrite() atomic.AddInt32(&p.waits, -1) }() } @@ -373,7 +373,7 @@ func (p *pipe) _background() { resp := newErrResult(err) for atomic.LoadInt32(&p.waits) != 0 { select { - case <-p.close: + case <-p.close: // p.queue.NextWriteCmd() can only be called after _backgroundWrite _, _, _ = p.queue.NextWriteCmd() default: } @@ -404,34 +404,27 @@ func (p *pipe) _backgroundWrite() (err error) { flushStart = time.Time{} ) - for atomic.LoadInt32(&p.state) < 3 { + for err == nil { if ones[0], multi, ch = p.queue.NextWriteCmd(); ch == nil { if flushDelay != 0 { flushStart = time.Now() } - if p.w.Buffered() == 0 { - err = p.Error() - } else { - err = p.w.Flush() + if p.w.Buffered() != 0 { + if err = p.w.Flush(); err != nil { + break + } } - if err == nil { - if atomic.LoadInt32(&p.state) == 1 { - ones[0], multi, ch = p.queue.WaitForWrite() - } else { - runtime.Gosched() - continue + ones[0], multi, ch = p.queue.WaitForWrite() + if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage + // Blocking commands are executed in dedicated client which is acquired from pool. + // So, there is no sense to wait other commands to be written. + // https://github.com/redis/rueidis/issues/379 + var blocked bool + for i := 0; i < len(multi) && !blocked; i++ { + blocked = multi[i].IsBlock() } - if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage - // Blocking commands are executed in dedicated client which is acquired from pool. - // So, there is no sense to wait other commands to be written. - // https://github.com/redis/rueidis/issues/379 - var blocked bool - for i := 0; i < len(multi) && !blocked; i++ { - blocked = blocked || multi[i].IsBlock() - } - if !blocked { - time.Sleep(flushDelay - time.Since(flushStart)) // ref: https://github.com/redis/rueidis/issues/156 - } + if !blocked { + time.Sleep(flushDelay - time.Since(flushStart)) // ref: https://github.com/redis/rueidis/issues/156 } } } @@ -441,12 +434,6 @@ func (p *pipe) _backgroundWrite() (err error) { for _, cmd := range multi { err = writeCmd(p.w, cmd.Commands()) } - if err != nil { - if err != ErrClosing { // ignore ErrClosing to allow the final PING command to be sent - return - } - runtime.Gosched() - } } return } @@ -1480,9 +1467,16 @@ func (p *pipe) Close() { p.background() } if block == 1 && (stopping1 || stopping2) { // make sure there is no block cmd + atomic.AddInt32(&p.waits, 1) + ch := p.queue.PutOne(cmds.PingCmd) select { - case <-p.queue.PutOne(cmds.PingCmd): + case <-ch: + atomic.AddInt32(&p.waits, -1) case <-time.After(time.Second): + go func(ch chan RedisResult) { + <-ch + atomic.AddInt32(&p.waits, -1) + }(ch) } } } From 077fc01f677099b917b042a120d25e69f48d6558 Mon Sep 17 00:00:00 2001 From: Rueian Date: Wed, 19 Jun 2024 22:18:19 +0800 Subject: [PATCH 2/5] test: perf: set 1s limit for graceful shutdown --- pipe_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pipe_test.go b/pipe_test.go index da3590cd..5ac4079d 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -3433,6 +3433,16 @@ func TestCloseAndWaitPendingCMDs(t *testing.T) { wg.Wait() } +func TestCloseWithGracefulPeriodExceeded(t *testing.T) { + defer ShouldNotLeaked(SetupLeakDetection()) + p, mock, _, _ := setup(t, ClientOption{}) + go func() { + p.Close() + }() + mock.Expect("PING") + <-p.close +} + func TestAlreadyCanceledContext(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) p, _, close, closeConn := setup(t, ClientOption{}) From b5d343915c12efecd09bd4cca55c4ac100a7dda1 Mon Sep 17 00:00:00 2001 From: Rueian Date: Wed, 19 Jun 2024 22:23:05 +0800 Subject: [PATCH 3/5] feat: remove the dead rngPool Signed-off-by: Rueian --- mux.go | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/mux.go b/mux.go index 74e4e2cf..da2c6fdd 100644 --- a/mux.go +++ b/mux.go @@ -2,7 +2,6 @@ package rueidis import ( "context" - "math/rand" "net" "runtime" "sync" @@ -394,22 +393,11 @@ func isBroken(err error, w wire) bool { return err != nil && err != ErrClosing && w.Error() != nil } -var rngPool = sync.Pool{ - New: func() any { - return rand.New(rand.NewSource(time.Now().UnixNano())) - }, -} - -func fastrand(n int) (r int) { - r = util.FastRand(n) - return -} - func slotfn(n int, ks uint16, noreply bool) uint16 { if n == 1 || ks == cmds.NoSlot || noreply { return 0 } - return uint16(fastrand(n)) + return uint16(util.FastRand(n)) } type muxslots struct { From 878d55efad193a182b0c665d776cc43f7a3175fb Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 20 Jun 2024 00:06:51 +0800 Subject: [PATCH 4/5] perf: remove unnecessary goroutine when we know the _backgroundWrite has exited Signed-off-by: Rueian --- pipe.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pipe.go b/pipe.go index 3bea1be4..8dace0a3 100644 --- a/pipe.go +++ b/pipe.go @@ -340,12 +340,15 @@ func (p *pipe) _background() { }() { p._exit(p._backgroundRead()) - atomic.CompareAndSwapInt32(&p.state, 2, 3) // make write goroutine to exit - atomic.AddInt32(&p.waits, 1) - go func() { - <-p.queue.PutOne(cmds.PingCmd) // avoid _backgroundWrite hanging at p.queue.WaitForWrite() - atomic.AddInt32(&p.waits, -1) - }() + select { + case <-p.close: + default: + atomic.AddInt32(&p.waits, 1) + go func() { + <-p.queue.PutOne(cmds.PingCmd) // avoid _backgroundWrite hanging at p.queue.WaitForWrite() + atomic.AddInt32(&p.waits, -1) + }() + } } err := p.Error() p.nsubs.Close() From 9fe24e512c19b93eeb71ee476fd6b344ce4ee331 Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 20 Jun 2024 19:20:12 +0800 Subject: [PATCH 5/5] test: perf: set 1s limit for graceful shutdown Signed-off-by: Rueian --- pipe_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pipe_test.go b/pipe_test.go index 5ac4079d..85c4597d 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -3443,6 +3443,16 @@ func TestCloseWithGracefulPeriodExceeded(t *testing.T) { <-p.close } +func TestCloseWithPipeliningAndGracefulPeriodExceeded(t *testing.T) { + defer ShouldNotLeaked(SetupLeakDetection()) + p, mock, _, _ := setup(t, ClientOption{AlwaysPipelining: true}) + go func() { + p.Close() + }() + mock.Expect("PING") + <-p.close +} + func TestAlreadyCanceledContext(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) p, _, close, closeConn := setup(t, ClientOption{})