Skip to content

Commit

Permalink
fix: ignore LOADING error to prevent out of order message with SUNSUB…
Browse files Browse the repository at this point in the history
…SCRIBE (#719)
  • Loading branch information
dntam00 authored Jan 10, 2025
1 parent 8897ec3 commit 5784e2f
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 2 deletions.
3 changes: 2 additions & 1 deletion pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ var noHello = regexp.MustCompile("unknown command .?(HELLO|hello).?")
// See https://github.com/redis/rueidis/pull/691
func isUnsubReply(msg *RedisMessage) bool {
// ex. NOPERM User limiteduser has no permissions to run the 'ping' command
if msg.typ == '-' && strings.Contains(msg.string, "'ping'") {
// ex. LOADING Redis is loading the dataset in memory
if msg.typ == '-' && (strings.HasPrefix(msg.string, "LOADING") || strings.Contains(msg.string, "'ping'")) {
msg.typ = '+'
msg.string = "PONG"
return true
Expand Down
89 changes: 88 additions & 1 deletion pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3240,7 +3240,7 @@ func TestPubSub(t *testing.T) {
}
})

t.Run("PubSub unsubReply failed", func(t *testing.T) {
t.Run("PubSub unsubReply failed because of NOPERM error from server", func(t *testing.T) {
ctx := context.Background()
p, mock, cancel, _ := setup(t, ClientOption{})

Expand Down Expand Up @@ -3327,6 +3327,93 @@ func TestPubSub(t *testing.T) {
cancel()
})

t.Run("PubSub unsubReply failed because of error LOADING from server", func(t *testing.T) {
ctx := context.Background()
p, mock, cancel, _ := setup(t, ClientOption{})

commands := []Completed{
builder.Sunsubscribe().Channel("1").Build(),
builder.Sunsubscribe().Channel("2").Build(),
builder.Get().Key("mk").Build(),
}

replies := [][]RedisMessage{
{
{ // proactive unsubscribe before user unsubscribe
typ: '>',
values: []RedisMessage{
{typ: '+', string: "sunsubscribe"},
{typ: '+', string: "a"},
{typ: ':', integer: 0},
},
},
{ // proactive unsubscribe before user unsubscribe
typ: '>',
values: []RedisMessage{
{typ: '+', string: "sunsubscribe"},
{typ: '+', string: "b"},
{typ: ':', integer: 0},
},
},
}, {
// empty
}, {
{ // proactive unsubscribe after user unsubscribe
typ: '>',
values: []RedisMessage{
{typ: '+', string: "sunsubscribe"},
{typ: '_'},
{typ: ':', integer: 0},
},
},
{typ: '+', string: "mk"},
},
}

p.background()

// proactive unsubscribe before other commands
mock.Expect().Reply(RedisMessage{ // proactive unsubscribe before user unsubscribe
typ: '>',
values: []RedisMessage{
{typ: '+', string: "sunsubscribe"},
{typ: '+', string: "0"},
{typ: ':', integer: 0},
},
})

time.Sleep(time.Millisecond * 100)

for i, cmd1 := range commands {
cmd2 := builder.Get().Key(strconv.Itoa(i)).Build()
go func() {
if cmd1.IsUnsub() {
mock.Expect(cmd1.Commands()...).Expect(cmds.PingCmd.Commands()...).
Reply(replies[i]...).
Reply(RedisMessage{ // failed unsubReply
typ: '-',
string: "LOADING Redis is loading the dataset in memory",
}).Expect(cmd2.Commands()...).ReplyString(strconv.Itoa(i))
} else {
mock.Expect(cmd1.Commands()...).Reply(replies[i]...).Expect(cmd2.Commands()...).ReplyString(strconv.Itoa(i))
}
}()
if i == 2 {
if v, err := p.Do(ctx, cmd1).ToString(); err != nil || v != "mk" {
t.Fatalf("unexpected err %v", err)
}
} else {
if err := p.Do(ctx, cmd1).Error(); err != nil {
t.Fatalf("unexpected err %v", err)
}
}
if v, err := p.Do(ctx, cmd2).ToString(); err != nil || v != strconv.Itoa(i) {
t.Fatalf("unexpected val %v %v", v, err)
}
}
cancel()
})

t.Run("PubSub Unexpected Subscribe", func(t *testing.T) {
shouldPanic := func(push string) (pass bool) {
defer func() { pass = recover() == protocolbug }()
Expand Down

0 comments on commit 5784e2f

Please sign in to comment.