diff --git a/README.md b/README.md index ff314173..a0041eab 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ A Fast Golang Redis client that does auto pipelining and supports client side ca * Auto pipeline for non-blocking redis commands * Connection pooling for blocking redis commands * Opt-in client side caching in RESP3 -* Pub/Sub, Redis 7 Sharded Pub/Sub in RESP3 +* Pub/Sub, Redis 7 Sharded Pub/Sub * Redis Cluster, Sentinel, Streams, TLS, RedisJSON, RedisBloom, RediSearch, RedisGraph, RedisTimeseries, RedisAI, RedisGears * IDE friendly redis command builder * Generic Hash/RedisJSON Object Mapping with client side caching and optimistic locking @@ -27,8 +27,6 @@ A Fast Golang Redis client that does auto pipelining and supports client side ca Rueidis is built around RESP2 and RESP3 protocol, and supports almost all redis features. However, the following features has not yet been implemented in RESP2 mode: -* PubSub only works in RESP3 and Redis >= 6.0 -* Redis Sentinel only works in RESP3 and Redis >= 6.0 * Client side caching only works in RESP3 and Redis >= 6.0 ## Getting Started @@ -636,9 +634,3 @@ client.Do(ctx, client.B().Lindex().Key("k").Index(0).Build()).ToString() client.Do(ctx, client.B().Lpop().Key("k").Build()).ToString() client.Do(ctx, client.B().Lpop().Key("k").Count(2).Build()).AsStrSlice() ``` - -## Not Yet Implement - -The following subjects are not yet implemented. -* PubSub in RESP2 -* Sentinel in RESP2 diff --git a/docker-compose.yml b/docker-compose.yml index 492b761a..630b64e5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,6 +37,18 @@ services: ports: - "6380:6380" - "26379:26379" + sentinel5: + image: redis:5-alpine + entrypoint: + - /bin/sh + - -c + - | + redis-server --save "" --appendonly no --port 6385 & + echo "sentinel monitor test5 127.0.0.1 6385 2\n" > sentinel.conf + redis-server sentinel.conf --sentinel + ports: + - "6385:6385" + - "26355:26379" cluster: image: redis:7.0.5-alpine entrypoint: diff --git a/dockertest.sh b/dockertest.sh index 24c64c98..1b1915fb 100755 --- a/dockertest.sh +++ b/dockertest.sh @@ -3,6 +3,6 @@ set -ev docker-compose up -d -go test -coverprofile=./c.out -v -race -timeout 20m ./... +go test -coverprofile=./c.out -v -race -timeout 30m ./... cp c.out coverage.txt docker-compose down -v \ No newline at end of file diff --git a/mux.go b/mux.go index c79e07d5..f2aa8cb2 100644 --- a/mux.go +++ b/mux.go @@ -53,10 +53,9 @@ type mux struct { func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux { dead := deadFn() return newMux(dst, option, (*pipe)(nil), dead, func() (w wire) { - conn, err := dialFn(dst, option) - if err == nil { - w, err = newPipe(conn, option) - } + w, err := newPipe(func() (net.Conn, error) { + return dialFn(dst, option) + }, option) if err != nil { dead.error.Store(&errs{error: err}) w = dead diff --git a/pipe.go b/pipe.go index d9795a00..884d9260 100644 --- a/pipe.go +++ b/pipe.go @@ -60,9 +60,22 @@ type pipe struct { error atomic.Value onInvalidations func([]RedisMessage) + + r2psFn func() (p *pipe, err error) + r2mu sync.Mutex + r2pipe *pipe + r2ps bool +} + +func newPipe(connFn func() (net.Conn, error), option *ClientOption) (p *pipe, err error) { + return _newPipe(connFn, option, false) } -func newPipe(conn net.Conn, option *ClientOption) (p *pipe, err error) { +func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps bool) (p *pipe, err error) { + conn, err := connFn() + if err != nil { + return nil, err + } p = &pipe{ conn: conn, queue: newRing(option.RingScaleEachConn), @@ -75,6 +88,13 @@ func newPipe(conn net.Conn, option *ClientOption) (p *pipe, err error) { timeout: option.ConnWriteTimeout, pinggap: option.Dialer.KeepAlive, + + r2ps: r2ps, + } + if !r2ps { + p.r2psFn = func() (p *pipe, err error) { + return _newPipe(connFn, option, true) + } } if !option.DisableCache { p.cache = newLRU(option.CacheSizeEachConn) @@ -112,29 +132,31 @@ func newPipe(conn net.Conn, option *ClientOption) (p *pipe, err error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - var resp2 bool - for i, r := range p.DoMulti(ctx, cmds.NewMultiCompleted(init)...) { - if i == 0 { - p.info, err = r.ToMap() - } else { - err = r.Error() - } - if err != nil { - if re, ok := err.(*RedisError); ok { - if !resp2 && noHello.MatchString(re.string) { - resp2 = true - continue - } else if strings.Contains(re.string, "wrong number of arguments for 'TRACKING'") { - err = fmt.Errorf("%s: %w", re.string, ErrNoCache) - } else if resp2 { - continue + var r2 bool + if !r2ps { + for i, r := range p.DoMulti(ctx, cmds.NewMultiCompleted(init)...) { + if i == 0 { + p.info, err = r.ToMap() + } else { + err = r.Error() + } + if err != nil { + if re, ok := err.(*RedisError); ok { + if !r2 && noHello.MatchString(re.string) { + r2 = true + continue + } else if strings.Contains(re.string, "wrong number of arguments for 'TRACKING'") { + err = fmt.Errorf("%s: %w", re.string, ErrNoCache) + } else if r2 { + continue + } } + p.Close() + return nil, err } - p.Close() - return nil, err } } - if !resp2 { + if !r2 && !r2ps { if ver, ok := p.info["version"]; ok { if v := strings.Split(ver.string, "."); len(v) != 0 { vv, _ := strconv.ParseInt(v[0], 10, 32) @@ -300,6 +322,7 @@ func (p *pipe) _backgroundRead() (err error) { skip int // skip rest push messages ver = p.version pr bool // push reply + r2ps = p.r2ps ) defer func() { @@ -316,7 +339,7 @@ func (p *pipe) _backgroundRead() (err error) { if msg, err = readNextMessage(p.r); err != nil { return } - if msg.typ == '>' { + if msg.typ == '>' || (r2ps && len(msg.values) != 0 && msg.values[0].string != "pong") { if pr = p.handlePush(msg.values); !pr { continue } @@ -325,7 +348,7 @@ func (p *pipe) _backgroundRead() (err error) { pr = false continue } - } else if ver < 7 && len(msg.values) != 0 { + } else if ver >= 6 && ver < 7 && len(msg.values) != 0 { // This is a workaround for Redis 6's broken invalidation protocol: https://github.com/redis/redis/issues/8935 // When Redis 6 handles MULTI, MGET, or other multi-keys command, // it will send invalidation message immediately if it finds the keys are expired, thus causing the multi-keys command response to be broken. @@ -516,10 +539,29 @@ func (p *pipe) handlePush(values []RedisMessage) (reply bool) { } return false } +func (p *pipe) _r2pipe() (r2p *pipe) { + p.r2mu.Lock() + if p.r2pipe != nil { + r2p = p.r2pipe + } else { + var err error + if r2p, err = p.r2psFn(); err != nil { + r2p = epipeFn(err) + } else { + p.r2pipe = r2p + } + } + p.r2mu.Unlock() + return r2p +} func (p *pipe) Receive(ctx context.Context, subscribe cmds.Completed, fn func(message PubSubMessage)) error { if p.nsubs == nil || p.psubs == nil || p.ssubs == nil { - return ErrClosing + return p.Error() + } + + if p.version < 6 && p.r2psFn != nil { + return p._r2pipe().Receive(ctx, subscribe, fn) } var sb *subs @@ -535,9 +577,6 @@ func (p *pipe) Receive(ctx context.Context, subscribe cmds.Completed, fn func(me default: panic(wrongreceive) } - if p.version < 6 { - return ErrRESP2PubSub - } if ch, cancel := sb.Subscribe(args); ch != nil { defer cancel() @@ -575,6 +614,9 @@ func (p *pipe) CleanSubscriptions() { } func (p *pipe) SetPubSubHooks(hooks PubSubHooks) <-chan error { + if p.version < 6 && p.r2psFn != nil { + return p._r2pipe().SetPubSubHooks(hooks) + } if hooks.isZero() { if old := p.pshks.Swap(emptypshks).(*pshks); old.close != nil { close(old.close) @@ -622,6 +664,12 @@ func (p *pipe) Do(ctx context.Context, cmd cmds.Completed) (resp RedisResult) { }() } + if cmd.NoReply() { + if p.version < 6 && p.r2psFn != nil { + return p._r2pipe().Do(ctx, cmd) + } + } + waits := atomic.AddInt32(&p.waits, 1) // if this is 1, and background worker is not started, no need to queue state := atomic.LoadInt32(&p.state) @@ -634,10 +682,6 @@ func (p *pipe) Do(ctx context.Context, cmd cmds.Completed) (resp RedisResult) { goto queue } if cmd.NoReply() { - if p.version < 6 { - atomic.AddInt32(&p.waits, -1) - return newErrResult(ErrRESP2PubSub) - } p.background() goto queue } @@ -685,19 +729,23 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...cmds.Completed) []RedisResu } isOptIn := multi[0].IsOptIn() // len(multi) > 0 should have already been checked by upper layer - noReply := false + noReply := 0 isBlock := false for _, cmd := range multi { if cmd.NoReply() { - if p.version < 6 { - for i := 0; i < len(resp); i++ { - resp[i] = newErrResult(ErrRESP2PubSub) - } - return resp + noReply++ + } + } + + if p.version < 6 && noReply != 0 { + if noReply != len(multi) { + for i := 0; i < len(resp); i++ { + resp[i] = newErrResult(ErrRESP2PubSubMixed) } - noReply = true - break + return resp + } else if p.r2psFn != nil { + return p._r2pipe().DoMulti(ctx, multi...) } } @@ -731,7 +779,7 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...cmds.Completed) []RedisResu if waits != 1 { goto queue } - if isOptIn || noReply { + if isOptIn || noReply != 0 { p.background() goto queue } @@ -1104,6 +1152,11 @@ func (p *pipe) Close() { if p.conn != nil { p.conn.Close() } + p.r2mu.Lock() + if p.r2pipe != nil { + p.r2pipe.Close() + } + p.r2mu.Unlock() } type pshks struct { @@ -1126,6 +1179,13 @@ func deadFn() *pipe { return dead } +func epipeFn(err error) *pipe { + dead := &pipe{state: 3} + dead.error.Store(&errs{error: err}) + dead.pshks.Store(emptypshks) + return dead +} + const ( protocolbug = "protocol bug, message handled out of order" wrongreceive = "only SUBSCRIBE, SSUBSCRIBE, or PSUBSCRIBE command are allowed in Receive" diff --git a/pipe_test.go b/pipe_test.go index 64f7de87..c8026bd9 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -144,7 +144,7 @@ func setup(t *testing.T, option ClientOption) (*pipe, *redisMock, func(), func() ReplyString("OK") } }() - p, err := newPipe(n1, &option) + p, err := newPipe(func() (net.Conn, error) { return n1, nil }, &option) if err != nil { t.Fatalf("pipe setup failed: %v", err) } @@ -186,7 +186,7 @@ func TestNewPipe(t *testing.T) { mock.Expect("SELECT", "1"). ReplyString("OK") }() - p, err := newPipe(n1, &ClientOption{ + p, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{ SelectDB: 1, Password: "pa", ClientName: "cn", @@ -214,7 +214,7 @@ func TestNewPipe(t *testing.T) { mock.Expect("SELECT", "1"). ReplyString("OK") }() - p, err := newPipe(n1, &ClientOption{ + p, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{ SelectDB: 1, Username: "ua", Password: "pa", @@ -241,7 +241,7 @@ func TestNewPipe(t *testing.T) { mock.Expect("CLIENT", "TRACKING", "ON", "OPTIN", "NOLOOP"). ReplyString("OK") }() - p, err := newPipe(n1, &ClientOption{ + p, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{ ClientTrackingOptions: []string{"OPTIN", "NOLOOP"}, }) if err != nil { @@ -257,7 +257,7 @@ func TestNewPipe(t *testing.T) { n1, n2 := net.Pipe() n1.Close() n2.Close() - if _, err := newPipe(n1, &ClientOption{}); err != io.ErrClosedPipe { + if _, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{}); err != io.ErrClosedPipe { t.Fatalf("pipe setup should failed with io.ErrClosedPipe, but got %v", err) } }) @@ -274,7 +274,7 @@ func TestNewRESP2Pipe(t *testing.T) { ReplyError("ERR unknown subcommand or wrong number of arguments for 'TRACKING'") mock.Expect("QUIT").ReplyString("OK") }() - if _, err := newPipe(n1, &ClientOption{}); !errors.Is(err, ErrNoCache) { + if _, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{}); !errors.Is(err, ErrNoCache) { t.Fatalf("unexpected err: %v", err) } mock.Close() @@ -291,7 +291,7 @@ func TestNewRESP2Pipe(t *testing.T) { ReplyString("OK") mock.Expect("QUIT").ReplyString("OK") }() - if _, err := newPipe(n1, &ClientOption{}); !errors.Is(err, ErrNoCache) { + if _, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{}); !errors.Is(err, ErrNoCache) { t.Fatalf("unexpected err: %v", err) } mock.Close() @@ -313,7 +313,7 @@ func TestNewRESP2Pipe(t *testing.T) { mock.Expect("SELECT", "1"). ReplyString("OK") }() - p, err := newPipe(n1, &ClientOption{ + p, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{ SelectDB: 1, Password: "pa", ClientName: "cn", @@ -346,7 +346,7 @@ func TestNewRESP2Pipe(t *testing.T) { mock.Expect("SELECT", "1"). ReplyString("OK") }() - p, err := newPipe(n1, &ClientOption{ + p, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{ SelectDB: 1, Username: "ua", Password: "pa", @@ -376,7 +376,7 @@ func TestNewRESP2Pipe(t *testing.T) { n1.Close() n2.Close() }() - _, err := newPipe(n1, &ClientOption{ + _, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{ SelectDB: 1, Username: "ua", Password: "pa", @@ -2136,7 +2136,7 @@ func TestPubSub(t *testing.T) { } }) - t.Run("RESP2 no subscribe", func(t *testing.T) { + t.Run("RESP2 pubsub mixed", func(t *testing.T) { p, _, cancel, _ := setup(t, ClientOption{}) p.version = 5 defer cancel() @@ -2144,25 +2144,34 @@ func TestPubSub(t *testing.T) { commands := []cmds.Completed{ builder.Subscribe().Channel("a").Build(), builder.Psubscribe().Pattern("b").Build(), - builder.Ssubscribe().Channel("c").Build(), + builder.Get().Key("c").Build(), } - - for _, c := range commands { - if e := p.Do(context.Background(), c).Error(); e != ErrRESP2PubSub { + for _, resp := range p.DoMulti(context.Background(), commands...) { + if e := resp.Error(); e != ErrRESP2PubSubMixed { t.Fatalf("unexpected err %v", e) } } + }) - for _, c := range commands { - if e := p.DoMulti(context.Background(), c)[0].Error(); e != ErrRESP2PubSub { - t.Fatalf("unexpected err %v", e) - } + t.Run("RESP2 pubsub connect error", func(t *testing.T) { + p, _, cancel, _ := setup(t, ClientOption{}) + p.version = 5 + e := errors.New("any") + p.r2psFn = func() (p *pipe, err error) { + return nil, e } + defer cancel() - for _, c := range commands { - if e := p.Receive(context.Background(), c, func(message PubSubMessage) {}); e != ErrRESP2PubSub { - t.Fatalf("unexpected err %v", e) - } + if err := p.Receive(context.Background(), builder.Subscribe().Channel("a").Build(), nil); err != e { + t.Fatalf("unexpected err %v", err) + } + + if err := p.Do(context.Background(), builder.Subscribe().Channel("a").Build()).Error(); err != e { + t.Fatalf("unexpected err %v", err) + } + + if err := p.DoMulti(context.Background(), builder.Subscribe().Channel("a").Build())[0].Error(); err != e { + t.Fatalf("unexpected err %v", err) } }) } @@ -3063,3 +3072,26 @@ func TestDeadPipe(t *testing.T) { t.Fatalf("unexpected err %v", err) } } + +func TestErrorPipe(t *testing.T) { + ctx := context.Background() + target := errors.New("any") + if err := epipeFn(target).Error(); err != target { + t.Fatalf("unexpected err %v", err) + } + if err := epipeFn(target).Do(ctx, cmds.NewCompleted(nil)).Error(); err != target { + t.Fatalf("unexpected err %v", err) + } + if err := epipeFn(target).DoMulti(ctx, cmds.NewCompleted(nil))[0].Error(); err != target { + t.Fatalf("unexpected err %v", err) + } + if err := epipeFn(target).DoCache(ctx, cmds.Cacheable(cmds.NewCompleted(nil)), time.Second).Error(); err != target { + t.Fatalf("unexpected err %v", err) + } + if err := epipeFn(target).Receive(ctx, cmds.NewCompleted(nil), func(message PubSubMessage) {}); err != target { + t.Fatalf("unexpected err %v", err) + } + if err := <-epipeFn(target).SetPubSubHooks(PubSubHooks{OnMessage: func(m PubSubMessage) {}}); err != target { + t.Fatalf("unexpected err %v", err) + } +} diff --git a/redis_test.go b/redis_test.go index 1708f096..8832e49e 100644 --- a/redis_test.go +++ b/redis_test.go @@ -395,7 +395,7 @@ func testBlockingXREAD(t *testing.T, client Client) { } func testPubSub(t *testing.T, client Client) { - msgs := 10000 + msgs := 5000 mmap := make(map[string]struct{}) for i := 0; i < msgs; i++ { mmap[strconv.Itoa(i)] = struct{}{} @@ -406,22 +406,27 @@ func testPubSub(t *testing.T, client Client) { ctx := context.Background() messages := make(chan string, 10) + + wg := sync.WaitGroup{} + wg.Add(2) go func() { err := client.Receive(ctx, client.B().Subscribe().Channel("ch1").Build(), func(msg PubSubMessage) { messages <- msg.Message }) - if err != ErrClosing { + if err != nil { t.Errorf("unexpected subscribe response %v", err) } + wg.Done() }() go func() { err := client.Receive(ctx, client.B().Psubscribe().Pattern("pat*").Build(), func(msg PubSubMessage) { messages <- msg.Message }) - if err != ErrClosing { + if err != nil { t.Errorf("unexpected subscribe response %v", err) } + wg.Done() }() go func() { @@ -447,6 +452,39 @@ func testPubSub(t *testing.T, client Client) { close(messages) } } + + for _, c := range client.Nodes() { + for _, resp := range c.DoMulti(context.Background(), + client.B().Unsubscribe().Channel("ch1").Build(), + client.B().Punsubscribe().Pattern("pat*").Build()) { + if err := resp.Error(); err != nil { + t.Fatal(err) + } + } + } + wg.Wait() + + t.Logf("testing pubsub hooks with 500 messages\n") + + for i := 0; i < 500; i++ { + cc, cancel := client.Dedicate() + msg := strconv.Itoa(i) + ch := cc.SetPubSubHooks(PubSubHooks{ + OnMessage: func(m PubSubMessage) { + cc.SetPubSubHooks(PubSubHooks{}) + }, + }) + if err := cc.Do(context.Background(), client.B().Subscribe().Channel("ch2").Build()).Error(); err != nil { + t.Fatal(err) + } + if err := client.Do(context.Background(), client.B().Publish().Channel("ch2").Message(msg).Build()).Error(); err != nil { + t.Fatal(err) + } + if err := <-ch; err != nil { + t.Fatal(err) + } + cancel() + } } func run(t *testing.T, client Client, cases ...func(*testing.T, Client)) { @@ -522,7 +560,7 @@ func TestSingleClient5Integration(t *testing.T) { t.Fatal(err) } - run(t, client, testSETGETRESP2, testMultiSETGETRESP2, testBlockingZPOP, testBlockingXREAD) + run(t, client, testSETGETRESP2, testMultiSETGETRESP2, testBlockingZPOP, testBlockingXREAD, testPubSub) client.Close() time.Sleep(time.Second * 5) // wait background ping exit @@ -538,7 +576,26 @@ func TestCluster5ClientIntegration(t *testing.T) { if err != nil { t.Fatal(err) } - run(t, client, testSETGETRESP2, testMultiSETGETRESP2, testBlockingZPOP, testBlockingXREAD) + run(t, client, testSETGETRESP2, testMultiSETGETRESP2, testBlockingZPOP, testBlockingXREAD, testPubSub) + + client.Close() + time.Sleep(time.Second * 5) // wait background ping exit +} + +func TestSentinel5ClientIntegration(t *testing.T) { + client, err := NewClient(ClientOption{ + InitAddress: []string{"127.0.0.1:26355"}, + ConnWriteTimeout: 180 * time.Second, + DisableCache: true, + Sentinel: SentinelOption{ + MasterSet: "test5", + }, + }) + if err != nil { + t.Fatal(err) + } + + run(t, client, testSETGETRESP2, testMultiSETGETRESP2, testBlockingZPOP, testBlockingXREAD, testPubSub) client.Close() time.Sleep(time.Second * 5) // wait background ping exit diff --git a/rueidis.go b/rueidis.go index c1c9517c..7a099b24 100644 --- a/rueidis.go +++ b/rueidis.go @@ -37,8 +37,8 @@ var ( ErrNoAddr = errors.New("no alive address in InitAddress") // ErrNoCache means your redis does not support client-side caching and must set ClientOption.DisableCache to true ErrNoCache = errors.New("ClientOption.DisableCache must be true for redis not supporting client-side caching or not supporting RESP3") - // ErrRESP2PubSub means your redis does not support RESP3 and rueidis can't handle SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE in this case - ErrRESP2PubSub = errors.New("rueidis does not support SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE in RESP2") + // ErrRESP2PubSubMixed means your redis does not support RESP3 and rueidis can't handle SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE in mixed case + ErrRESP2PubSubMixed = errors.New("rueidis does not support SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE mixed with other commands in RESP2") ) // ClientOption should be passed to NewClient to construct a Client