From 4caa8d55c6ada0e795c974c5b9dddc75c1ea88d2 Mon Sep 17 00:00:00 2001 From: Brendan Dougherty Date: Mon, 8 Apr 2024 10:59:27 -0700 Subject: [PATCH] discovery: Fix tablets removed from healthcheck when topo server GetTablet call fails (#15633) Signed-off-by: Brendan Dougherty --- go/vt/discovery/topology_watcher.go | 14 ++++ go/vt/discovery/topology_watcher_test.go | 82 ++++++++++++++++++++++++ go/vt/topo/keyspace_external_test.go | 2 +- go/vt/topo/memorytopo/directory.go | 3 + go/vt/topo/memorytopo/election.go | 4 ++ go/vt/topo/memorytopo/file.go | 16 ++++- go/vt/topo/memorytopo/lock.go | 14 ++++ go/vt/topo/memorytopo/memorytopo.go | 57 +++++++++++++--- go/vt/topo/memorytopo/watch.go | 6 ++ go/vt/topo/tablet.go | 12 +++- go/vt/topo/tablet_test.go | 57 +++++++++++++++- 11 files changed, 252 insertions(+), 15 deletions(-) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 3945268f62e..0b69ecb6a63 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -143,6 +143,7 @@ func (tw *TopologyWatcher) Stop() { func (tw *TopologyWatcher) loadTablets() { newTablets := make(map[string]*tabletInfo) + var partialResult bool // First get the list of all tablets. tabletInfos, err := tw.getTablets() @@ -152,6 +153,7 @@ func (tw *TopologyWatcher) loadTablets() { // If we get a partial result error, we just log it and process the tablets that we did manage to fetch. if topo.IsErrType(err, topo.PartialResult) { log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err) + partialResult = true } else { // For all other errors, just return. log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err) return @@ -183,6 +185,18 @@ func (tw *TopologyWatcher) loadTablets() { } } + if partialResult { + // We don't want to remove any tablets from the tablets map or the healthcheck if we got a partial result + // because we don't know if they were actually deleted or if we simply failed to fetch them. + // Fill any gaps in the newTablets map using the existing tablets. + for alias, val := range tw.tablets { + if _, ok := newTablets[alias]; !ok { + tabletAliasStrs = append(tabletAliasStrs, alias) + newTablets[alias] = val + } + } + } + for alias, newVal := range newTablets { if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) { continue diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 0a7c96358a2..95c6e44ec43 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -18,6 +18,7 @@ package discovery import ( "context" + "errors" "math/rand/v2" "testing" "time" @@ -576,3 +577,84 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { tw.Stop() } + +func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + ts, factory := memorytopo.NewServerAndFactory(ctx, "aa") + defer ts.Close() + fhc := NewFakeHealthCheck(nil) + defer fhc.Close() + topologyWatcherOperations.ZeroAll() + counts := topologyWatcherOperations.Counts() + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5) + defer tw.Stop() + + // Force fallback to getting tablets individually. + factory.AddOperationError(memorytopo.List, ".*", topo.NewError(topo.NoImplementation, "List not supported")) + + counts = checkOpCounts(t, counts, map[string]int64{}) + checkChecksum(t, tw, 0) + + // Add a tablet to the topology. + tablet1 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "aa", + Uid: 0, + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": 123, + }, + Keyspace: "keyspace", + Shard: "shard", + } + require.NoError(t, ts.CreateTablet(ctx, tablet1), "CreateTablet failed for %v", tablet1.Alias) + + tw.loadTablets() + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "AddTablet": 1}) + checkChecksum(t, tw, 3238442862) + + // Check the tablet is returned by GetAllTablets(). + allTablets := fhc.GetAllTablets() + key1 := TabletToMapKey(tablet1) + assert.Len(t, allTablets, 1) + assert.Contains(t, allTablets, key1) + assert.True(t, proto.Equal(tablet1, allTablets[key1])) + + // Add a second tablet to the topology. + tablet2 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "aa", + Uid: 2, + }, + Hostname: "host2", + PortMap: map[string]int32{ + "vt": 789, + }, + Keyspace: "keyspace", + Shard: "shard", + } + require.NoError(t, ts.CreateTablet(ctx, tablet2), "CreateTablet failed for %v", tablet2.Alias) + + // Cause the Get for the first tablet to fail. + factory.AddOperationError(memorytopo.Get, "tablets/aa-0000000000/Tablet", errors.New("fake error")) + + // Ensure that a topo Get error results in a partial results error. If not, the rest of this test is invalid. + _, err := ts.GetTabletsByCell(ctx, "aa", &topo.GetTabletsByCellOptions{}) + require.ErrorContains(t, err, "partial result") + + // Now force the error during loadTablets. + tw.loadTablets() + checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "AddTablet": 1}) + checkChecksum(t, tw, 2762153755) + + // Ensure the first tablet is still returned by GetAllTablets() and the second tablet has been added. + allTablets = fhc.GetAllTablets() + key2 := TabletToMapKey(tablet2) + assert.Len(t, allTablets, 2) + assert.Contains(t, allTablets, key1) + assert.Contains(t, allTablets, key2) + assert.True(t, proto.Equal(tablet1, allTablets[key1])) + assert.True(t, proto.Equal(tablet2, allTablets[key2])) +} diff --git a/go/vt/topo/keyspace_external_test.go b/go/vt/topo/keyspace_external_test.go index 38ff1c8ce7b..4edb45a411d 100644 --- a/go/vt/topo/keyspace_external_test.go +++ b/go/vt/topo/keyspace_external_test.go @@ -142,7 +142,7 @@ func TestServerGetServingShards(t *testing.T) { require.NotNil(t, stats) if tt.fallback { - factory.SetListError(errNoListImpl) + factory.AddOperationError(memorytopo.List, ".*", errNoListImpl) } err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}) diff --git a/go/vt/topo/memorytopo/directory.go b/go/vt/topo/memorytopo/directory.go index 8e673f474a6..b8fa11a9d52 100644 --- a/go/vt/topo/memorytopo/directory.go +++ b/go/vt/topo/memorytopo/directory.go @@ -39,6 +39,9 @@ func (c *Conn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.D if c.factory.err != nil { return nil, c.factory.err } + if err := c.factory.getOperationError(ListDir, dirPath); err != nil { + return nil, err + } isRoot := false if dirPath == "" || dirPath == "/" { diff --git a/go/vt/topo/memorytopo/election.go b/go/vt/topo/memorytopo/election.go index 0a76c202de2..1b6d2292f5c 100644 --- a/go/vt/topo/memorytopo/election.go +++ b/go/vt/topo/memorytopo/election.go @@ -35,6 +35,10 @@ func (c *Conn) NewLeaderParticipation(name, id string) (topo.LeaderParticipation c.factory.mu.Lock() defer c.factory.mu.Unlock() + if err := c.factory.getOperationError(NewLeaderParticipation, id); err != nil { + return nil, err + } + // Make sure the global path exists. electionPath := path.Join(electionsPath, name) if n := c.factory.getOrCreatePath(c.cell, electionPath); n == nil { diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go index 800e7791afa..86722477e53 100644 --- a/go/vt/topo/memorytopo/file.go +++ b/go/vt/topo/memorytopo/file.go @@ -46,6 +46,9 @@ func (c *Conn) Create(ctx context.Context, filePath string, contents []byte) (to if c.factory.err != nil { return nil, c.factory.err } + if err := c.factory.getOperationError(Create, filePath); err != nil { + return nil, err + } // Get the parent dir. dir, file := path.Split(filePath) @@ -92,6 +95,9 @@ func (c *Conn) Update(ctx context.Context, filePath string, contents []byte, ver if c.factory.err != nil { return nil, c.factory.err } + if err := c.factory.getOperationError(Update, filePath); err != nil { + return nil, err + } // Get the parent dir, we'll need it in case of creation. dir, file := path.Split(filePath) @@ -168,6 +174,9 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, if c.factory.err != nil { return nil, nil, c.factory.err } + if err := c.factory.getOperationError(Get, filePath); err != nil { + return nil, nil, err + } // Get the node. n := c.factory.nodeByPath(c.cell, filePath) @@ -195,8 +204,8 @@ func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, if c.factory.err != nil { return nil, c.factory.err } - if c.factory.listErr != nil { - return nil, c.factory.listErr + if err := c.factory.getOperationError(List, filePathPrefix); err != nil { + return nil, err } dir, file := path.Split(filePathPrefix) @@ -259,6 +268,9 @@ func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version if c.factory.err != nil { return c.factory.err } + if err := c.factory.getOperationError(Delete, filePath); err != nil { + return err + } // Get the parent dir. dir, file := path.Split(filePath) diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index afce7868469..d0943c7058d 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -44,6 +44,13 @@ type memoryTopoLockDescriptor struct { func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { c.factory.callstats.Add([]string{"TryLock"}, 1) + c.factory.mu.Lock() + err := c.factory.getOperationError(TryLock, dirPath) + c.factory.mu.Unlock() + if err != nil { + return nil, err + } + return c.Lock(ctx, dirPath, contents) } @@ -51,6 +58,13 @@ func (c *Conn) TryLock(ctx context.Context, dirPath, contents string) (topo.Lock func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { c.factory.callstats.Add([]string{"Lock"}, 1) + c.factory.mu.Lock() + err := c.factory.getOperationError(Lock, dirPath) + c.factory.mu.Unlock() + if err != nil { + return nil, err + } + return c.lock(ctx, dirPath, contents) } diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index 2b5385d8e28..9d703a2869a 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -23,6 +23,7 @@ import ( "context" "errors" "math/rand/v2" + "regexp" "strings" "sync" "sync/atomic" @@ -50,6 +51,25 @@ const ( UnreachableServerAddr = "unreachable" ) +// Operation is one of the operations defined by topo.Conn +type Operation int + +// The following is the list of topo.Conn operations +const ( + ListDir = Operation(iota) + Create + Update + Get + List + Delete + Lock + TryLock + Watch + WatchRecursive + NewLeaderParticipation + Close +) + // Factory is a memory-based implementation of topo.Factory. It // takes a file-system like approach, with directories at each level // being an actual directory node. This is meant to be closer to @@ -72,14 +92,20 @@ type Factory struct { // err is used for testing purposes to force queries / watches // to return the given error err error - // listErr is used for testing purposed to fake errors from - // calls to List. - listErr error + // operationErrors is used for testing purposes to fake errors from + // operations and paths matching the spec + operationErrors map[Operation][]errorSpec // callstats allows us to keep track of how many topo.Conn calls // we make (Create, Get, Update, Delete, List, ListDir, etc). callstats *stats.CountersWithMultiLabels } +type errorSpec struct { + op Operation + pathPattern *regexp.Regexp + err error +} + // HasGlobalReadOnlyCell is part of the topo.Factory interface. func (f *Factory) HasGlobalReadOnlyCell(serverAddr, root string) bool { return false @@ -248,9 +274,10 @@ func (n *node) PropagateWatchError(err error) { // in case of a problem. func NewServerAndFactory(ctx context.Context, cells ...string) (*topo.Server, *Factory) { f := &Factory{ - cells: make(map[string]*node), - generation: uint64(rand.Int64N(1 << 60)), - callstats: stats.NewCountersWithMultiLabels("", "", []string{"Call"}), + cells: make(map[string]*node), + generation: uint64(rand.Int64N(1 << 60)), + callstats: stats.NewCountersWithMultiLabels("", "", []string{"Call"}), + operationErrors: make(map[Operation][]errorSpec), } f.cells[topo.GlobalCell] = f.newDirectory(topo.GlobalCell, nil) @@ -363,9 +390,23 @@ func (f *Factory) recursiveDelete(n *node) { } } -func (f *Factory) SetListError(err error) { +func (f *Factory) AddOperationError(op Operation, pathPattern string, err error) { f.mu.Lock() defer f.mu.Unlock() - f.listErr = err + f.operationErrors[op] = append(f.operationErrors[op], errorSpec{ + op: op, + pathPattern: regexp.MustCompile(pathPattern), + err: err, + }) +} + +func (f *Factory) getOperationError(op Operation, path string) error { + specs := f.operationErrors[op] + for _, spec := range specs { + if spec.pathPattern.MatchString(path) { + return spec.err + } + } + return nil } diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go index 3651bcca9ce..dcb90a8f0ef 100644 --- a/go/vt/topo/memorytopo/watch.go +++ b/go/vt/topo/memorytopo/watch.go @@ -37,6 +37,9 @@ func (c *Conn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-c if c.factory.err != nil { return nil, nil, c.factory.err } + if err := c.factory.getOperationError(Watch, filePath); err != nil { + return nil, nil, err + } n := c.factory.nodeByPath(c.cell, filePath) if n == nil { @@ -89,6 +92,9 @@ func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Watc if c.factory.err != nil { return nil, nil, c.factory.err } + if err := c.factory.getOperationError(WatchRecursive, dirpath); err != nil { + return nil, nil, err + } n := c.factory.getOrCreatePath(c.cell, dirpath) if n == nil { diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index 493e448752b..671a0f43905 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -240,6 +240,7 @@ type GetTabletsByCellOptions struct { // GetTabletsByCell returns all the tablets in the cell. // It returns ErrNoNode if the cell doesn't exist. +// It returns ErrPartialResult if some tablets couldn't be read. The results in the slice are incomplete. // It returns (nil, nil) if the cell exists, but there are no tablets in it. func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) { // If the cell doesn't exist, this will return ErrNoNode. @@ -277,6 +278,7 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G // GetTabletsIndividuallyByCell returns a sorted list of tablets for topo servers that do not // directly support the topoConn.List() functionality. // It returns ErrNoNode if the cell doesn't exist. +// It returns ErrPartialResult if some tablets couldn't be read. The results in the slice are incomplete. // It returns (nil, nil) if the cell exists, but there are no tablets in it. func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) { // If the cell doesn't exist, this will return ErrNoNode. @@ -286,10 +288,14 @@ func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string, } sort.Sort(topoproto.TabletAliasList(aliases)) + var partialResultErr error tabletMap, err := ts.GetTabletMap(ctx, aliases, opt) if err != nil { - // we got another error than topo.ErrNoNode - return nil, err + if IsErrType(err, PartialResult) { + partialResultErr = err + } else { + return nil, err + } } tablets := make([]*TabletInfo, 0, len(aliases)) for _, tabletAlias := range aliases { @@ -303,7 +309,7 @@ func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string, } } - return tablets, nil + return tablets, partialResultErr } // UpdateTablet updates the tablet data only - not associated replication paths. diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go index 04eea71a8a2..3a0153a11b5 100644 --- a/go/vt/topo/tablet_test.go +++ b/go/vt/topo/tablet_test.go @@ -18,9 +18,12 @@ package topo_test import ( "context" + "errors" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" @@ -76,7 +79,7 @@ func TestServerGetTabletsByCell(t *testing.T) { ts, factory := memorytopo.NewServerAndFactory(ctx, cell) defer ts.Close() if tt.listError != nil { - factory.SetListError(tt.listError) + factory.AddOperationError(memorytopo.List, ".*", tt.listError) } // Create an ephemeral keyspace and generate shard records within @@ -116,3 +119,55 @@ func TestServerGetTabletsByCell(t *testing.T) { }) } } + +func TestServerGetTabletsByCellPartialResults(t *testing.T) { + const cell = "zone1" + const keyspace = "keyspace" + const shard = "shard" + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts, factory := memorytopo.NewServerAndFactory(ctx, cell) + defer ts.Close() + + // Create an ephemeral keyspace and generate shard records within + // the keyspace to fetch later. + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablets := make([]*topo.TabletInfo, 3) + + for i := 0; i < len(tablets); i++ { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(i), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(i), + }, + Keyspace: keyspace, + Shard: shard, + } + tInfo := &topo.TabletInfo{Tablet: tablet} + tablets[i] = tInfo + require.NoError(t, ts.CreateTablet(ctx, tablet)) + } + + // Force fallback to getting tablets individually. + factory.AddOperationError(memorytopo.List, ".*", topo.NewError(topo.NoImplementation, "List not supported")) + + // Cause the Get for the second tablet to fail. + factory.AddOperationError(memorytopo.Get, "tablets/zone1-0000000001/Tablet", errors.New("fake error")) + + // Verify that we return a partial list of tablets and that each + // tablet matches what we expect. + out, err := ts.GetTabletsByCell(ctx, cell, nil) + assert.Error(t, err) + assert.True(t, topo.IsErrType(err, topo.PartialResult), "Not a partial result: %v", err) + assert.Len(t, out, 2) + assert.True(t, proto.Equal(tablets[0].Tablet, out[0].Tablet), "Got: %v, want %v", tablets[0].Tablet, out[0].Tablet) + assert.True(t, proto.Equal(tablets[2].Tablet, out[1].Tablet), "Got: %v, want %v", tablets[2].Tablet, out[1].Tablet) +}