Skip to content

Commit

Permalink
Support KeyRange in --clusters_to_watch flag (#17604)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Jan 28, 2025
1 parent 489fd05 commit 8921bce
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 93 deletions.
6 changes: 6 additions & 0 deletions changelog/22.0/22.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- **[Stalled Disk Recovery in VTOrc](#stall-disk-recovery)**
- **[Update default MySQL version to 8.0.40](#mysql-8-0-40)**
- **[Update lite images to Debian Bookworm](#debian-bookworm)**
- **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)**
- **[Support for Filtering Query logs on Error](#query-logs)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
Expand Down Expand Up @@ -135,6 +136,11 @@ This is the last time this will be needed in the `8.0.x` series, as starting wit

The base system now uses Debian Bookworm instead of Debian Bullseye for the `vitess/lite` images. This change was brought by [Pull Request #17552].

### <a id="key-range-vtorc"/>KeyRanges in `--clusters_to_watch` in VTOrc</a>
VTOrc now supports specifying keyranges in the `--clusters_to_watch` flag. This means that there is no need to restart a VTOrc instance with a different flag value when you reshard a keyspace.
For example, if a VTOrc is configured to watch `ks/-80`, then it would watch all the shards that fall under the keyrange `-80`. If a reshard is performed and `-80` is split into new shards `-40` and `40-80`, the VTOrc instance will automatically start watching the new shards without needing a restart. In the previous logic, specifying `ks/-80` for the flag would mean that VTOrc would watch only 1 (or no) shard. In the new system, since we interpret `-80` as a key range, it can watch multiple shards as described in the example.
Users can continue to specify exact keyranges. The new feature is backward compatible.

### <a id="query-logs"/>Support for Filtering Query logs on Error</a>

The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet.
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Flags:
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/keyranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored.
--config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn)
--config-name string Name of the config file (without extension) to search for. (default "vtconfig")
Expand Down
8 changes: 8 additions & 0 deletions go/vt/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange {
return &topodatapb.KeyRange{Start: start, End: end}
}

// NewCompleteKeyRange returns a complete key range.
func NewCompleteKeyRange() *topodatapb.KeyRange {
return &topodatapb.KeyRange{
Start: nil,
End: nil,
}
}

// KeyRangeAdd adds two adjacent KeyRange values (in any order) into a single value. If the values are not adjacent,
// it returns false.
func KeyRangeAdd(a, b *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) {
Expand Down
8 changes: 8 additions & 0 deletions go/vt/topo/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,14 @@ func TestValidateShardName(t *testing.T) {
},
valid: true,
},
{
name: "-",
expectedRange: &topodatapb.KeyRange{
Start: []byte{},
End: []byte{},
},
valid: true,
},
{
name: "40-80",
expectedRange: &topodatapb.KeyRange{
Expand Down
26 changes: 5 additions & 21 deletions go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package logic

import (
"context"
"sort"
"strings"
"sync"

"golang.org/x/exp/maps"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/vt/topo"
Expand All @@ -31,7 +31,7 @@ import (
// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
func RefreshAllKeyspacesAndShards(ctx context.Context) error {
var keyspaces []string
if len(clustersToWatch) == 0 { // all known keyspaces
if len(shardsToWatch) == 0 { // all known keyspaces
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var err error
Expand All @@ -41,26 +41,10 @@ func RefreshAllKeyspacesAndShards(ctx context.Context) error {
return err
}
} else {
// Parse input and build list of keyspaces
for _, ks := range clustersToWatch {
if strings.Contains(ks, "/") {
// This is a keyspace/shard specification
input := strings.Split(ks, "/")
keyspaces = append(keyspaces, input[0])
} else {
// Assume this is a keyspace
keyspaces = append(keyspaces, ks)
}
}
if len(keyspaces) == 0 {
log.Errorf("Found no keyspaces for input: %+v", clustersToWatch)
return nil
}
// Get keyspaces to watch from the list of known keyspaces.
keyspaces = maps.Keys(shardsToWatch)
}

// Sort the list of keyspaces.
// The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace
sort.Strings(keyspaces)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtorc/logic/keyspace_shard_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func TestRefreshAllKeyspaces(t *testing.T) {
// Set clusters to watch to only watch ks1 and ks3
onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"}
clustersToWatch = onlyKs1and3
err := initializeShardsToWatch()
require.NoError(t, err)
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))

// Verify that we only have ks1 and ks3 in vtorc's db.
Expand All @@ -106,6 +108,8 @@ func TestRefreshAllKeyspaces(t *testing.T) {

// Set clusters to watch to watch all keyspaces
clustersToWatch = nil
err = initializeShardsToWatch()
require.NoError(t, err)
// Change the durability policy of ks1
reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", policy.DurabilitySemiSync)
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))
Expand All @@ -119,7 +123,6 @@ func TestRefreshAllKeyspaces(t *testing.T) {
verifyPrimaryAlias(t, "ks3", "80-", "zone_ks3-0000000101", "")
verifyKeyspaceInfo(t, "ks4", keyspaceDurabilityTest, "")
verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "")

}

func TestRefreshKeyspace(t *testing.T) {
Expand Down
90 changes: 53 additions & 37 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/external/golib/sqlutils"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
Expand All @@ -48,27 +49,29 @@ var (
clustersToWatch []string
shutdownWaitTime = 30 * time.Second
shardsLockCounter int32
shardsToWatch map[string][]string
shardsToWatchMu sync.Mutex
// shardsToWatch is a map storing the shards for a given keyspace that need to be watched.
// We store the key range for all the shards that we want to watch.
// This is populated by parsing `--clusters_to_watch` flag.
shardsToWatch map[string][]*topodatapb.KeyRange

// ErrNoPrimaryTablet is a fixed error message.
ErrNoPrimaryTablet = errors.New("no primary tablet found")
)

// RegisterFlags registers the flags required by VTOrc
func RegisterFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/keyranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM")
}

// updateShardsToWatch parses the --clusters_to_watch flag-value
// initializeShardsToWatch parses the --clusters_to_watch flag-value
// into a map of keyspace/shards.
func updateShardsToWatch() {
func initializeShardsToWatch() error {
shardsToWatch = make(map[string][]*topodatapb.KeyRange)
if len(clustersToWatch) == 0 {
return
return nil
}

newShardsToWatch := make(map[string][]string, 0)
for _, ks := range clustersToWatch {
if strings.Contains(ks, "/") && !strings.HasSuffix(ks, "/") {
// Validate keyspace/shard parses.
Expand All @@ -77,34 +80,50 @@ func updateShardsToWatch() {
log.Errorf("Could not parse keyspace/shard %q: %+v", ks, err)
continue
}
newShardsToWatch[k] = append(newShardsToWatch[k], s)
if !key.IsValidKeyRange(s) {
return fmt.Errorf("invalid key range %q while parsing clusters to watch", s)
}
// Parse the shard name into key range value.
keyRanges, err := key.ParseShardingSpec(s)
if err != nil {
return fmt.Errorf("could not parse shard name %q: %+v", s, err)
}
shardsToWatch[k] = append(shardsToWatch[k], keyRanges...)
} else {
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
// Assume this is a keyspace and find all shards in keyspace.
// Remove trailing slash if exists.
ks = strings.TrimSuffix(ks, "/")
shards, err := ts.GetShardNames(ctx, ks)
if err != nil {
// Log the err and continue.
log.Errorf("Error fetching shards for keyspace: %v", ks)
continue
}
if len(shards) == 0 {
log.Errorf("Topo has no shards for ks: %v", ks)
continue
}
newShardsToWatch[ks] = shards
// We store the entire range of key range if nothing is specified.
shardsToWatch[ks] = []*topodatapb.KeyRange{key.NewCompleteKeyRange()}
}
}
if len(newShardsToWatch) == 0 {
log.Error("No keyspace/shards to watch")
return

if len(shardsToWatch) == 0 {
log.Error("No keyspace/shards to watch, watching all keyspaces")
}
return nil
}

shardsToWatchMu.Lock()
defer shardsToWatchMu.Unlock()
shardsToWatch = newShardsToWatch
// shouldWatchTablet checks if the given tablet is part of the watch list.
func shouldWatchTablet(tablet *topodatapb.Tablet) bool {
// If we are watching all keyspaces, then we want to watch this tablet too.
if len(shardsToWatch) == 0 {
return true
}
shardRanges, ok := shardsToWatch[tablet.GetKeyspace()]
// If we don't have the keyspace in our map, then this tablet
// doesn't need to be watched.
if !ok {
return false
}
// Get the tablet's key range, and check if
// it is part of the shard ranges we are watching.
kr := tablet.GetKeyRange()
for _, shardRange := range shardRanges {
if key.KeyRangeContainsKeyRange(shardRange, kr) {
return true
}
}
return false
}

// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker
Expand All @@ -117,7 +136,10 @@ func OpenTabletDiscovery() <-chan time.Time {
log.Error(err)
}
// Parse --clusters_to_watch into a filter.
updateShardsToWatch()
err := initializeShardsToWatch()
if err != nil {
log.Fatalf("Error parsing --clusters-to-watch: %v", err)
}
// We refresh all information from the topo once before we start the ticks to do
// it on a timer.
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
Expand Down Expand Up @@ -179,16 +201,10 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f
// Filter tablets that should not be watched using shardsToWatch map.
matchedTablets := make([]*topo.TabletInfo, 0, len(tablets))
func() {
shardsToWatchMu.Lock()
defer shardsToWatchMu.Unlock()
for _, t := range tablets {
if len(shardsToWatch) > 0 {
_, ok := shardsToWatch[t.Tablet.Keyspace]
if !ok || !slices.Contains(shardsToWatch[t.Tablet.Keyspace], t.Tablet.Shard) {
continue // filter
}
if shouldWatchTablet(t.Tablet) {
matchedTablets = append(matchedTablets, t)
}
matchedTablets = append(matchedTablets, t)
}
}()

Expand Down
Loading

0 comments on commit 8921bce

Please sign in to comment.