Skip to content

Commit

Permalink
fix: ignore BUSY error to prevent out of order message with SUNSUBSCRIBE
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Jan 11, 2025
1 parent c90dc90 commit 992fc76
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 3 deletions.
5 changes: 3 additions & 2 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ var noHello = regexp.MustCompile("unknown command .?(HELLO|hello).?")
// See https://github.com/redis/rueidis/pull/691
func isUnsubReply(msg *RedisMessage) bool {
// ex. NOPERM User limiteduser has no permissions to run the 'ping' command
// ex. LOADING Redis is loading the dataset in memory
if msg.typ == '-' && (strings.HasPrefix(msg.string, "LOADING") || strings.Contains(msg.string, "'ping'")) {
// ex. LOADING server is loading the dataset in memory
// ex. BUSY
if msg.typ == '-' && (strings.HasPrefix(msg.string, "LOADING") || strings.HasPrefix(msg.string, "BUSY") || strings.Contains(msg.string, "'ping'")) {
msg.typ = '+'
msg.string = "PONG"
return true
Expand Down
89 changes: 88 additions & 1 deletion pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3392,7 +3392,94 @@ func TestPubSub(t *testing.T) {
Reply(replies[i]...).
Reply(RedisMessage{ // failed unsubReply
typ: '-',
string: "LOADING Redis is loading the dataset in memory",
string: "LOADING server is loading the dataset in memory",
}).Expect(cmd2.Commands()...).ReplyString(strconv.Itoa(i))
} else {
mock.Expect(cmd1.Commands()...).Reply(replies[i]...).Expect(cmd2.Commands()...).ReplyString(strconv.Itoa(i))
}
}()
if i == 2 {
if v, err := p.Do(ctx, cmd1).ToString(); err != nil || v != "mk" {
t.Fatalf("unexpected err %v", err)
}
} else {
if err := p.Do(ctx, cmd1).Error(); err != nil {
t.Fatalf("unexpected err %v", err)
}
}
if v, err := p.Do(ctx, cmd2).ToString(); err != nil || v != strconv.Itoa(i) {
t.Fatalf("unexpected val %v %v", v, err)
}
}
cancel()
})

t.Run("PubSub unsubReply failed because of error BUSY from server", func(t *testing.T) {
ctx := context.Background()
p, mock, cancel, _ := setup(t, ClientOption{})

commands := []Completed{
builder.Sunsubscribe().Channel("1").Build(),
builder.Sunsubscribe().Channel("2").Build(),
builder.Get().Key("mk").Build(),
}

replies := [][]RedisMessage{
{
{ // proactive unsubscribe before user unsubscribe
typ: '>',
values: []RedisMessage{
{typ: '+', string: "sunsubscribe"},
{typ: '+', string: "a"},
{typ: ':', integer: 0},
},
},
{ // proactive unsubscribe before user unsubscribe
typ: '>',
values: []RedisMessage{
{typ: '+', string: "sunsubscribe"},
{typ: '+', string: "b"},
{typ: ':', integer: 0},
},
},
}, {
// empty
}, {
{ // proactive unsubscribe after user unsubscribe
typ: '>',
values: []RedisMessage{
{typ: '+', string: "sunsubscribe"},
{typ: '_'},
{typ: ':', integer: 0},
},
},
{typ: '+', string: "mk"},
},
}

p.background()

// proactive unsubscribe before other commands
mock.Expect().Reply(RedisMessage{ // proactive unsubscribe before user unsubscribe
typ: '>',
values: []RedisMessage{
{typ: '+', string: "sunsubscribe"},
{typ: '+', string: "0"},
{typ: ':', integer: 0},
},
})

time.Sleep(time.Millisecond * 100)

for i, cmd1 := range commands {
cmd2 := builder.Get().Key(strconv.Itoa(i)).Build()
go func() {
if cmd1.IsUnsub() {
mock.Expect(cmd1.Commands()...).Expect(cmds.PingCmd.Commands()...).
Reply(replies[i]...).
Reply(RedisMessage{ // failed unsubReply
typ: '-',
string: "BUSY",
}).Expect(cmd2.Commands()...).ReplyString(strconv.Itoa(i))
} else {
mock.Expect(cmd1.Commands()...).Reply(replies[i]...).Expect(cmd2.Commands()...).ReplyString(strconv.Itoa(i))
Expand Down

0 comments on commit 992fc76

Please sign in to comment.