remove old route cache if replica CN cannot be connected (matrixorigin#18971)

Remove the old route cache if the replica CN cannot be connected, and add a retry for sharding reads.

Approved by: @iamlinjunhong
zhangxu19830126 authored Sep 24, 2024
1 parent 5ba4382 commit 7a04342
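In outline, the change makes a sharded read drive itself through a retry loop: one attempt reports whether its failure is retryable, and a retryable failure (the replica CN cannot be connected) first evicts the stale route cache so the next attempt resolves fresh shards. A minimal, self-contained sketch of that shape — readOnce, readWithRouteRefresh, and the error text are hypothetical stand-ins, not the repository's API; the real implementation is in pkg/shardservice/service_read.go below:

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    // readOnce stands in for one sharded-read attempt: resolve routes
    // (possibly from a cache), send the reads, and report whether the
    // failure is safe to retry after the stale route has been evicted.
    func readOnce(ctx context.Context, attempt int) (canRetry bool, err error) {
        if attempt == 0 {
            return true, errors.New("backend cannot connect") // stale cached route
        }
        return false, nil // second attempt resolved a fresh route
    }

    func readWithRouteRefresh(ctx context.Context) error {
        for attempt := 0; ; attempt++ {
            canRetry, err := readOnce(ctx, attempt)
            if err == nil {
                return nil
            }
            if !canRetry || ctx.Err() != nil {
                return err
            }
        }
    }

    func main() {
        fmt.Println(readWithRouteRefresh(context.Background())) // <nil>
    }

Note that the real loop in the diff below has no attempt counter; it exits only when an attempt succeeds or returns a non-retryable error.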
Showing 3 changed files with 193 additions and 106 deletions.
3 changes: 2 additions & 1 deletion pkg/shardservice/service.go
@@ -749,7 +749,8 @@ func (s *service) maybeRemoveReadCache(
 	err error,
 ) {
 	if moerr.IsMoErrCode(err, moerr.ErrReplicaNotFound) ||
-		moerr.IsMoErrCode(err, moerr.ErrReplicaNotMatch) {
+		moerr.IsMoErrCode(err, moerr.ErrReplicaNotMatch) ||
+		moerr.IsMoErrCode(err, moerr.ErrBackendCannotConnect) {
 		s.removeReadCache(table)
 	}
 }
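Read as a predicate, the updated condition treats a failure to connect to the backend the same as a stale-replica error: all three codes mean the cached route can no longer be trusted. A standalone sketch of that predicate — shouldEvictRouteCache is a hypothetical name; the moerr codes and import path are the ones visible in this diff:

    package sketch

    import "github.com/matrixorigin/matrixone/pkg/common/moerr"

    // shouldEvictRouteCache is a hypothetical helper equivalent to the
    // updated condition in maybeRemoveReadCache: any of these codes means
    // the cached route may point at a replica that is gone or unreachable.
    func shouldEvictRouteCache(err error) bool {
        return moerr.IsMoErrCode(err, moerr.ErrReplicaNotFound) ||
            moerr.IsMoErrCode(err, moerr.ErrReplicaNotMatch) ||
            moerr.IsMoErrCode(err, moerr.ErrBackendCannotConnect)
    }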
227 changes: 122 additions & 105 deletions pkg/shardservice/service_read.go
@@ -29,134 +29,151 @@ func (s *service) Read(
 	req ReadRequest,
 	opts ReadOptions,
 ) error {
-	var cache *readCache
-	var err error
-	for {
-		cache, err = s.getShards(req.TableID)
-		if err != nil {
-			return err
-		}
-		if opts.shardID == 0 ||
-			cache.hasShard(req.TableID, opts.shardID) {
-			break
-		}
+	fn := func() (bool, error) {
+		var cache *readCache
+		var err error
+		for {
+			cache, err = s.getShards(req.TableID)
+			if err != nil {
+				return false, err
+			}
+			if opts.shardID == 0 ||
+				cache.hasShard(req.TableID, opts.shardID) {
+				break
+			}

-		// remove old read cache
-		s.removeReadCache(req.TableID)
+			// remove old read cache
+			s.removeReadCache(req.TableID)

-		// shards updated, create new allocated
-		s.createC <- req.TableID
+			// shards updated, create new allocated
+			s.createC <- req.TableID

-		// wait shard created
-		err = s.waitShardCreated(
-			ctx,
-			req.TableID,
-			opts.shardID,
-		)
-		if err != nil {
-			return err
-		}
-	}
+			// wait shard created
+			err = s.waitShardCreated(
+				ctx,
+				req.TableID,
+				opts.shardID,
+			)
+			if err != nil {
+				return false, err
+			}
+		}

-	selected := newSlice()
-	defer selected.close()
+		selected := newSlice()
+		defer selected.close()

-	cache.selectReplicas(
-		req.TableID,
-		func(
-			metadata pb.ShardsMetadata,
-			shard pb.TableShard,
-			replica pb.ShardReplica,
-		) bool {
-			if opts.filter(metadata, shard, replica) {
-				shard.Replicas = []pb.ShardReplica{replica}
-				selected.values = append(selected.values, shard)
-			}
-			return true
-		},
-	)
+		cache.selectReplicas(
+			req.TableID,
+			opts.shardID,
+			func(
+				metadata pb.ShardsMetadata,
+				shard pb.TableShard,
+				replica pb.ShardReplica,
+			) bool {
+				if opts.filter(metadata, shard, replica) {
+					shard.Replicas = []pb.ShardReplica{replica}
+					selected.values = append(selected.values, shard)
+				}
+				return true
+			},
+		)

-	futures := newFutureSlice()
-	defer futures.close()
+		futures := newFutureSlice()
+		defer futures.close()

-	local := 0
-	remote := 0
-	for i, shard := range selected.values {
-		if s.isLocalReplica(shard.Replicas[0]) {
-			selected.local = append(selected.local, i)
-			local++
-			continue
-		}
+		local := 0
+		remote := 0
+		for i, shard := range selected.values {
+			if s.isLocalReplica(shard.Replicas[0]) {
+				selected.local = append(selected.local, i)
+				local++
+				continue
+			}

-		remote++
-		if opts.adjust != nil {
-			opts.adjust(&shard)
-		}
-		f, e := s.remote.client.AsyncSend(
-			ctx,
-			s.newReadRequest(
-				shard,
-				req.Method,
-				req.Param,
-				opts.readAt,
-			),
-		)
-		if e != nil {
-			s.maybeRemoveReadCache(req.TableID, e)
-			err = errors.Join(err, e)
-			continue
-		}
-		futures.values = append(futures.values, f)
-	}
+			remote++
+			if opts.adjust != nil {
+				opts.adjust(&shard)
+			}
+			f, e := s.remote.client.AsyncSend(
+				ctx,
+				s.newReadRequest(
+					shard,
+					req.Method,
+					req.Param,
+					opts.readAt,
+				),
+			)
+			if e != nil {
+				s.maybeRemoveReadCache(req.TableID, e)
+				if i == 0 {
+					return true, e
+				}
+
+				err = errors.Join(err, e)
+				continue
+			}
+			futures.values = append(futures.values, f)
+		}

-	v2.ReplicaLocalReadCounter.Add(float64(local))
-	v2.ReplicaRemoteReadCounter.Add(float64(remote))
+		v2.ReplicaLocalReadCounter.Add(float64(local))
+		v2.ReplicaRemoteReadCounter.Add(float64(remote))

-	var buffer *morpc.Buffer
-	for _, i := range selected.local {
-		if opts.adjust != nil {
-			opts.adjust(&selected.values[i])
-		}
+		var buffer *morpc.Buffer
+		for _, i := range selected.local {
+			if opts.adjust != nil {
+				opts.adjust(&selected.values[i])
+			}

-		if buffer == nil {
-			buffer = morpc.NewBuffer()
-			defer buffer.Close()
-		}
+			if buffer == nil {
+				buffer = morpc.NewBuffer()
+				defer buffer.Close()
+			}

-		v, e := s.doRead(
-			ctx,
-			selected.values[i],
-			opts.readAt,
-			req.Method,
-			req.Param,
-			buffer,
-		)
-		if e == nil {
-			req.Apply(v)
-			continue
-		}
-		s.maybeRemoveReadCache(req.TableID, e)
-		err = errors.Join(err, e)
-	}
+			v, e := s.doRead(
+				ctx,
+				selected.values[i],
+				opts.readAt,
+				req.Method,
+				req.Param,
+				buffer,
+			)
+			if e == nil {
+				req.Apply(v)
+				continue
+			}
+			s.maybeRemoveReadCache(req.TableID, e)
+			err = errors.Join(err, e)
+		}

-	var resp *pb.Response
-	for _, f := range futures.values {
-		v, e := f.Get()
-		if e == nil {
-			resp = v.(*pb.Response)
-			resp, e = s.unwrapError(resp, e)
-		}
-		if e == nil {
-			req.Apply(resp.ShardRead.Payload)
-			s.remote.pool.ReleaseResponse(resp)
-		} else {
-			s.maybeRemoveReadCache(req.TableID, e)
-			err = errors.Join(err, e)
-		}
+		var resp *pb.Response
+		for _, f := range futures.values {
+			v, e := f.Get()
+			if e == nil {
+				resp = v.(*pb.Response)
+				resp, e = s.unwrapError(resp, e)
+			}
+			if e == nil {
+				req.Apply(resp.ShardRead.Payload)
+				s.remote.pool.ReleaseResponse(resp)
+			} else {
+				s.maybeRemoveReadCache(req.TableID, e)
+				err = errors.Join(err, e)
+			}

-		f.Close()
-	}
+			f.Close()
+		}
+		return false, err
+	}

-	return err
+	for {
+		canRetry, err := fn()
+		if err == nil {
+			return nil
+		}
+		if !canRetry {
+			return err
+		}
+	}
 }

func (s *service) doRead(
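One subtlety in the new send loop: a remote send failure returns canRetry=true only when i == 0, that is, before any future has been collected or any payload applied; later failures are joined into err and come back non-retryable. Presumably this is what makes the unconditional retry loop safe: a retryable error can only be the first thing that happened in an attempt. A minimal sketch of that contract, with hypothetical names (sendAll is not the repository's API):

    package sketch

    import "errors"

    // sendAll mirrors the retry contract of the send loop above: only a
    // failure on the very first send, before any request is in flight,
    // is reported as retryable; later failures are joined and returned
    // as non-retryable.
    func sendAll(shards []string, send func(string) error) (canRetry bool, err error) {
        for i, sh := range shards {
            if e := send(sh); e != nil {
                if i == 0 {
                    return true, e // nothing in flight yet: evict the cache and retry
                }
                err = errors.Join(err, e) // partial progress: record and continue
            }
        }
        return false, err
    }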
69 changes: 69 additions & 0 deletions pkg/shardservice/service_read_test.go
@@ -21,6 +21,7 @@ import (
 	"time"

 	"github.com/matrixorigin/matrixone/pkg/common/moerr"
+	"github.com/matrixorigin/matrixone/pkg/pb/metadata"
 	"github.com/matrixorigin/matrixone/pkg/pb/shard"
 	"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
 	"github.com/stretchr/testify/require"
@@ -295,6 +296,70 @@ func TestReadWithStaleReplica(t *testing.T) {
 	)
 }

+func TestReadWithRemovedReplica(t *testing.T) {
+	runServicesTest(
+		t,
+		"cn1,cn2",
+		func(
+			ctx context.Context,
+			server *server,
+			services []*service,
+		) {
+			s1 := services[0]
+			store1 := s1.storage.(*MemShardStorage)
+			s2 := services[1]
+			store2 := s2.storage.(*MemShardStorage)
+
+			table := uint64(1)
+			shards := uint32(2)
+			mustAddTestShards(t, ctx, s1, table, shards, 1, s2)
+			waitReplicaCount(table, s1, 1)
+			waitReplicaCount(table, s2, 1)
+
+			// find the shard allocated to cn2
+			s2Shard := uint64(0)
+			for _, s := range s2.getAllocatedShards() {
+				if s.TableID == table {
+					s2Shard = s.ShardID
+					break
+				}
+			}
+			if s2Shard == 0 {
+				panic("missing s2 shard")
+			}
+
+			k := []byte("k")
+			v := []byte("v")
+
+			store1.set(k, v, newTestTimestamp(1))
+			store2.set(k, v, newTestTimestamp(1))
+			store1.waiter.NotifyLatestCommitTS(newTestTimestamp(3))
+			store2.waiter.NotifyLatestCommitTS(newTestTimestamp(3))
+
+			fn := func() error {
+				return unwrapError(
+					s1.Read(
+						ctx,
+						ReadRequest{
+							TableID: table,
+							Param:   shard.ReadParam{KeyParam: shard.KeyParam{Key: k}},
+							Apply:   func(b []byte) {},
+						},
+						DefaultOptions.ReadAt(newTestTimestamp(2)).Shard(s2Shard),
+					),
+				)
+			}
+			require.NoError(t, fn())
+
+			// drain cn2 and stop its heartbeat: the replica is
+			// reallocated to cn1, while s1 still caches the old
+			// route to cn2
+			s2.options.disableHeartbeat.Store(true)
+			server.env.UpdateState("cn2", metadata.WorkState_Drained)
+			waitReplicaCount(table, s1, 2)
+
+			// cn2 cannot be connected, so the stale route cache is
+			// removed and the read retried against the new replica
+			require.NoError(t, fn())
+		},
+		nil,
+	)
+}
+
 func TestReadWithLazyCreateShards(t *testing.T) {
 	runServicesTest(
 		t,
@@ -430,6 +495,10 @@ type joinError interface {
 func unwrapError(
 	err error,
 ) error {
+	if err == nil {
+		return nil
+	}
+
 	if e, ok := err.(joinError); ok {
 		return e.Unwrap()[0]
 	}
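For context on the new nil guard: unwrapError leans on the standard-library errors.Join behavior (Go 1.20+) — joining only nil values yields nil, and a non-nil joined error exposes Unwrap() []error, which is what the joinError interface above matches. A small standalone illustration of both properties (stdlib only, not repository code):

    package main

    import (
        "errors"
        "fmt"
    )

    func main() {
        // Joining nothing but nils yields nil, so a successful read
        // must be passed through before any unwrapping is attempted.
        fmt.Println(errors.Join(nil, nil) == nil) // true

        err := errors.Join(
            errors.New("replica not found"),
            errors.New("backend cannot connect"),
        )
        // The joined error implements Unwrap() []error; unwrapError
        // above returns the first joined error.
        if e, ok := err.(interface{ Unwrap() []error }); ok {
            fmt.Println(e.Unwrap()[0]) // replica not found
        }
    }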
