Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove unnecessary busy loop during shutdown #575

Merged
merged 5 commits on Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rueidis

import (
"context"
"math/rand"
"net"
"runtime"
"sync"
Expand Down Expand Up @@ -394,22 +393,11 @@ func isBroken(err error, w wire) bool {
return err != nil && err != ErrClosing && w.Error() != nil
}

var rngPool = sync.Pool{
New: func() any {
return rand.New(rand.NewSource(time.Now().UnixNano()))
},
}

func fastrand(n int) (r int) {
r = util.FastRand(n)
return
}

func slotfn(n int, ks uint16, noreply bool) uint16 {
if n == 1 || ks == cmds.NoSlot || noreply {
return 0
}
return uint16(fastrand(n))
return uint16(util.FastRand(n))
}

type muxslots struct {
Expand Down
69 changes: 33 additions & 36 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,15 @@ func (p *pipe) _background() {
}()
{
p._exit(p._backgroundRead())
atomic.CompareAndSwapInt32(&p.state, 2, 3) // make write goroutine to exit
atomic.AddInt32(&p.waits, 1)
go func() {
<-p.queue.PutOne(cmds.PingCmd)
atomic.AddInt32(&p.waits, -1)
}()
select {
case <-p.close:
default:
atomic.AddInt32(&p.waits, 1)
go func() {
<-p.queue.PutOne(cmds.PingCmd) // avoid _backgroundWrite hanging at p.queue.WaitForWrite()
atomic.AddInt32(&p.waits, -1)
}()
}
}
err := p.Error()
p.nsubs.Close()
Expand Down Expand Up @@ -373,7 +376,7 @@ func (p *pipe) _background() {
resp := newErrResult(err)
for atomic.LoadInt32(&p.waits) != 0 {
select {
case <-p.close:
case <-p.close: // p.queue.NextWriteCmd() can only be called after _backgroundWrite
_, _, _ = p.queue.NextWriteCmd()
default:
}
Expand Down Expand Up @@ -404,34 +407,27 @@ func (p *pipe) _backgroundWrite() (err error) {
flushStart = time.Time{}
)

for atomic.LoadInt32(&p.state) < 3 {
for err == nil {
if ones[0], multi, ch = p.queue.NextWriteCmd(); ch == nil {
if flushDelay != 0 {
flushStart = time.Now()
}
if p.w.Buffered() == 0 {
err = p.Error()
} else {
err = p.w.Flush()
if p.w.Buffered() != 0 {
if err = p.w.Flush(); err != nil {
break
}
}
if err == nil {
if atomic.LoadInt32(&p.state) == 1 {
ones[0], multi, ch = p.queue.WaitForWrite()
} else {
runtime.Gosched()
continue
ones[0], multi, ch = p.queue.WaitForWrite()
if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage
// Blocking commands are executed in dedicated client which is acquired from pool.
// So, there is no sense to wait other commands to be written.
// https://github.com/redis/rueidis/issues/379
var blocked bool
for i := 0; i < len(multi) && !blocked; i++ {
blocked = multi[i].IsBlock()
}
if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage
// Blocking commands are executed in dedicated client which is acquired from pool.
// So, there is no sense to wait other commands to be written.
// https://github.com/redis/rueidis/issues/379
var blocked bool
for i := 0; i < len(multi) && !blocked; i++ {
blocked = blocked || multi[i].IsBlock()
}
if !blocked {
time.Sleep(flushDelay - time.Since(flushStart)) // ref: https://github.com/redis/rueidis/issues/156
}
if !blocked {
time.Sleep(flushDelay - time.Since(flushStart)) // ref: https://github.com/redis/rueidis/issues/156
}
}
}
Expand All @@ -441,12 +437,6 @@ func (p *pipe) _backgroundWrite() (err error) {
for _, cmd := range multi {
err = writeCmd(p.w, cmd.Commands())
}
if err != nil {
if err != ErrClosing { // ignore ErrClosing to allow the final PING command to be sent
return
}
runtime.Gosched()
}
}
return
}
Expand Down Expand Up @@ -1480,9 +1470,16 @@ func (p *pipe) Close() {
p.background()
}
if block == 1 && (stopping1 || stopping2) { // make sure there is no block cmd
atomic.AddInt32(&p.waits, 1)
ch := p.queue.PutOne(cmds.PingCmd)
select {
case <-p.queue.PutOne(cmds.PingCmd):
case <-ch:
atomic.AddInt32(&p.waits, -1)
case <-time.After(time.Second):
go func(ch chan RedisResult) {
<-ch
atomic.AddInt32(&p.waits, -1)
}(ch)
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3433,6 +3433,26 @@ func TestCloseAndWaitPendingCMDs(t *testing.T) {
wg.Wait()
}

// TestCloseWithGracefulPeriodExceeded verifies that Close unblocks and the
// pipe's close channel is signaled even when the final PING issued during
// shutdown is never answered within the graceful period.
func TestCloseWithGracefulPeriodExceeded(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())
	p, mock, _, _ := setup(t, ClientOption{})
	// Close blocks waiting on the shutdown PING, so run it concurrently.
	go p.Close()
	mock.Expect("PING")
	// Wait for the pipe to confirm it has fully shut down.
	<-p.close
}

// TestCloseWithPipeliningAndGracefulPeriodExceeded is the AlwaysPipelining
// variant of the graceful-period test: Close must still complete and signal
// the close channel when the shutdown PING goes unanswered.
func TestCloseWithPipeliningAndGracefulPeriodExceeded(t *testing.T) {
	defer ShouldNotLeaked(SetupLeakDetection())
	p, mock, _, _ := setup(t, ClientOption{AlwaysPipelining: true})
	// Close blocks waiting on the shutdown PING, so run it concurrently.
	go p.Close()
	mock.Expect("PING")
	// Wait for the pipe to confirm it has fully shut down.
	<-p.close
}

func TestAlreadyCanceledContext(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, _, close, closeConn := setup(t, ClientOption{})
Expand Down