Skip to content

Commit

Permalink
fix: cache auto retry on MOVED / ASK / ... (#701)
Browse files Browse the repository at this point in the history
* fix: cache auto retry on MOVED / ASK / ...

* fix: askingMultiCache should retry {Cmd} with ASKING

* fix DoXCache testcase

* Revert "fix DoXCache testcase"

This reverts commit 182897e.

* to: simplify error handler

* fix: ClientSideCachingExecAbort cases

* to: add testcase for askingMultiCache

---------

Co-authored-by: wuyuxiang <[email protected]>
  • Loading branch information
wyxloading and wuyuxiang authored Dec 17, 2024
1 parent 95b5b32 commit 8aa9298
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 24 deletions.
3 changes: 3 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,9 @@ func askingMultiCache(cc conn, ctx context.Context, multi []CacheableTTL) *redis
resps := cc.DoMulti(ctx, commands...)
for i := 5; i < len(resps.s); i += 6 {
if arr, err := resps.s[i].ToArray(); err != nil {
if preErr := resps.s[i-1].Error(); preErr != nil { // if {Cmd} get a RedisError
err = preErr
}
results.s = append(results.s, newErrResult(err))
} else {
results.s = append(results.s, newResult(arr[len(arr)-1], nil))
Expand Down
69 changes: 69 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5535,6 +5535,75 @@ func TestClusterClientMovedRetry(t *testing.T) {
})
}

func TestClusterClientCacheASKRetry(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())

setup := func() (*clusterClient, *mockConn) {
m := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsMultiResp
}
return RedisResult{}
},
}
client, err := newClusterClient(
&ClientOption{InitAddress: []string{":0"}},
func(dst string, opt *ClientOption) conn { return m },
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}
return client, m
}

t.Run("DoCache Retry on ASK", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoCacheFn = func(cmd Cacheable, ttl time.Duration) RedisResult {
return newResult(RedisMessage{typ: '-', string: "ASK 0 :0"}, nil)
}
m.DoMultiFn = func(multi ...Completed) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{{}, {}, {}, {}, newResult(RedisMessage{typ: '-', string: "ASK 0 :0"}, nil), newResult(RedisMessage{typ: '_'}, nil)}}
}
return &redisresults{s: []RedisResult{{}, {}, {}, {}, {}, newResult(RedisMessage{typ: '*', values: []RedisMessage{{}, {}, {}, {}, {}, {typ: '+', string: "OK"}}}, nil)}}
}
resp := client.DoCache(context.Background(), client.B().Get().Key("a1").Cache(), 10*time.Second)
if v, err := resp.ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
if attempts != 2 {
t.Fatalf("expected 2 attempts, got %v", attempts)
}
})

t.Run("DoMultiCache Retry on ASK", func(t *testing.T) {
client, m := setup()

attempts := 0
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '-', string: "ASK 0 :0"}, nil)}}
}
m.DoMultiFn = func(multi ...Completed) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{{}, {}, {}, {}, newResult(RedisMessage{typ: '-', string: "ASK 0 :0"}, nil), newResult(RedisMessage{typ: '_'}, nil)}}
}
return &redisresults{s: []RedisResult{{}, {}, {}, {}, {}, newResult(RedisMessage{typ: '*', values: []RedisMessage{{}, {}, {}, {}, {}, RedisMessage{typ: '+', string: "OK"}}}, nil)}}
}
resps := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("a1").Cache(), 10*time.Second))
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
if attempts != 2 {
t.Fatalf("expected 2 attempts, got %v", attempts)
}
})
}

//gocyclo:ignore
func TestClusterClient_SendReadOperationToReplicaNodeWriteOperationToPrimaryNode(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
Expand Down
20 changes: 20 additions & 0 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,11 @@ func (p *pipe) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) Re
if err != nil {
if _, ok := err.(*RedisError); ok {
err = ErrDoCacheAborted
if preErr := resp.s[3].Error(); preErr != nil { // if {cmd} get a RedisError
if _, ok := preErr.(*RedisError); ok {
err = preErr
}
}
}
p.cache.Cancel(ck, cc, err)
return newErrResult(err)
Expand Down Expand Up @@ -1379,6 +1384,11 @@ func (p *pipe) doCacheMGet(ctx context.Context, cmd Cacheable, ttl time.Duration
if err != nil {
if _, ok := err.(*RedisError); ok {
err = ErrDoCacheAborted
if preErr := resp.s[len(multi)-2].Error(); preErr != nil { // if {rewritten} get a RedisError
if _, ok := preErr.(*RedisError); ok {
err = preErr
}
}
}
for _, key := range rewritten.Commands()[1 : keys+1] {
p.cache.Cancel(key, mgetcc, err)
Expand Down Expand Up @@ -1474,6 +1484,11 @@ func (p *pipe) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisre
if err := resp.s[i].Error(); err != nil {
if _, ok := err.(*RedisError); ok {
err = ErrDoCacheAborted
if preErr := resp.s[i-1].Error(); preErr != nil { // if {cmd} get a RedisError
if _, ok := preErr.(*RedisError); ok {
err = preErr
}
}
}
ck, cc := cmds.CacheKey(Cacheable(missing[i-1]))
p.cache.Cancel(ck, cc, err)
Expand All @@ -1497,6 +1512,11 @@ func (p *pipe) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisre
if err != nil {
if _, ok := err.(*RedisError); ok {
err = ErrDoCacheAborted
if preErr := resp.s[i-1].Error(); preErr != nil { // if {cmd} get a RedisError
if _, ok := preErr.(*RedisError); ok {
err = preErr
}
}
}
results.s[j] = newErrResult(err)
} else {
Expand Down
110 changes: 86 additions & 24 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,25 +1316,45 @@ func TestClientSideCachingExecAbort(t *testing.T) {
go func() {
mock.Expect("CLIENT", "CACHING", "YES").
Expect("MULTI").
Expect("PTTL", "a").
Expect("GET", "a").
Expect("PTTL", "a1").
Expect("GET", "a1").
Expect("EXEC").
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
Reply(RedisMessage{typ: '_'})
mock.Expect("CLIENT", "CACHING", "YES").
Expect("MULTI").
Expect("PTTL", "a2").
Expect("GET", "a2").
Expect("EXEC").
ReplyString("OK").
ReplyString("OK").
ReplyError("MOVED 0 127.0.0.1").
ReplyError("MOVED 0 127.0.0.1").
Reply(RedisMessage{typ: '_'})
}()

v, err := p.DoCache(context.Background(), Cacheable(cmds.NewCompleted([]string{"GET", "a"})), 10*time.Second).ToMessage()
if err != ErrDoCacheAborted {
t.Errorf("unexpected err, got %v", err)
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
if v, entry := p.cache.Flight("a", "GET", time.Second, time.Now()); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
for i, key := range []string{"a1", "a2"} {
v, err := p.DoCache(context.Background(), Cacheable(cmds.NewCompleted([]string{"GET", key})), 10*time.Second).ToMessage()
if i == 0 {
if err != ErrDoCacheAborted {
t.Errorf("unexpected err, got %v", err)
}
} else {
if re, ok := err.(*RedisError); !ok {
t.Errorf("unexpected err, got %v", err)
} else if _, moved := re.IsMoved(); !moved {
t.Errorf("unexpected err, got %v", err)
}
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
if v, entry := p.cache.Flight(key, "GET", time.Second, time.Now()); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
}
}
}

Expand Down Expand Up @@ -1641,20 +1661,42 @@ func TestClientSideCachingExecAbortMGet(t *testing.T) {
ReplyString("OK").
ReplyString("OK").
Reply(RedisMessage{typ: '_'})
mock.Expect("CLIENT", "CACHING", "YES").
Expect("MULTI").
Expect("PTTL", "b1").
Expect("PTTL", "b2").
Expect("MGET", "b1", "b2").
Expect("EXEC").
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
ReplyError("MOVED 0 127.0.0.1").
Reply(RedisMessage{typ: '_'})
}()

v, err := p.DoCache(context.Background(), Cacheable(cmds.NewMGetCompleted([]string{"MGET", "a1", "a2"})), 10*time.Second).ToMessage()
if err != ErrDoCacheAborted {
t.Errorf("unexpected err, got %v", err)
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
if v, entry := p.cache.Flight("a1", "GET", time.Second, time.Now()); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
}
if v, entry := p.cache.Flight("a2", "GET", time.Second, time.Now()); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
for i, pair := range [][2]string{{"a1", "a2"}, {"b1", "b2"}} {
v, err := p.DoCache(context.Background(), Cacheable(cmds.NewMGetCompleted([]string{"MGET", pair[0], pair[1]})), 10*time.Second).ToMessage()
if i == 0 {
if err != ErrDoCacheAborted {
t.Errorf("unexpected err, got %v", err)
}
} else {
if re, ok := err.(*RedisError); !ok {
t.Errorf("unexpected err, got %v", err)
} else if _, moved := re.IsMoved(); !moved {
t.Errorf("unexpected err, got %v", err)
}
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
if v, entry := p.cache.Flight(pair[0], "GET", time.Second, time.Now()); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
}
if v, entry := p.cache.Flight(pair[1], "GET", time.Second, time.Now()); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
}
}
}

Expand Down Expand Up @@ -1925,6 +1967,11 @@ func TestClientSideCachingExecAbortDoMultiCache(t *testing.T) {
Expect("PTTL", "a2").
Expect("GET", "a2").
Expect("EXEC").
Expect("CLIENT", "CACHING", "YES").
Expect("MULTI").
Expect("PTTL", "a3").
Expect("GET", "a3").
Expect("EXEC").
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
Expand All @@ -1937,26 +1984,41 @@ func TestClientSideCachingExecAbortDoMultiCache(t *testing.T) {
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
Reply(RedisMessage{typ: '_'}).
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
ReplyError("MOVED 0 127.0.0.1").
Reply(RedisMessage{typ: '_'})
}()

arr := p.DoMultiCache(context.Background(), []CacheableTTL{
CT(Cacheable(cmds.NewCompleted([]string{"GET", "a1"})), time.Second*10),
CT(Cacheable(cmds.NewCompleted([]string{"GET", "a2"})), time.Second*10),
CT(Cacheable(cmds.NewCompleted([]string{"GET", "a3"})), time.Second*10),
}...).s
for i, resp := range arr {
v, err := resp.ToMessage()
if i == 0 {
if v.integer != 1 {
t.Errorf("unexpected cached response, expected %v, got %v", 1, v.integer)
}
} else {
} else if i == 1 {
if err != ErrDoCacheAborted {
t.Errorf("unexpected err, got %v", err)
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
} else if i == 2 {
if re, ok := err.(*RedisError); !ok {
t.Errorf("unexpected err, got %v", err)
} else if _, moved := re.IsMoved(); !moved {
t.Errorf("unexpected err, got %v", err)
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
}
}
if v, entry := p.cache.Flight("a1", "GET", time.Second, time.Now()); v.integer != 1 {
Expand Down

0 comments on commit 8aa9298

Please sign in to comment.