From e2e58c91fac51f7ee4a3a89b8ad3d43edece43b6 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 25 Jun 2024 14:07:05 -0400 Subject: [PATCH 01/14] Add named lock support And use that for VReplication workflows when coordination is necessary, such as between the VReplication engine and the VDiff engine. Signed-off-by: Matt Lord --- go/vt/topo/conn.go | 8 ++ go/vt/topo/consultopo/lock.go | 17 ++-- go/vt/topo/consultopo/server.go | 2 +- go/vt/topo/etcd2topo/election.go | 2 +- go/vt/topo/etcd2topo/lock.go | 13 +++- go/vt/topo/faketopo/faketopo.go | 7 ++ go/vt/topo/keyspace_lock.go | 2 +- go/vt/topo/locks.go | 42 ++++++++-- go/vt/topo/memorytopo/lock.go | 16 +++- go/vt/topo/named_lock.go | 58 ++++++++++++++ go/vt/topo/routing_rules_lock.go | 2 +- go/vt/topo/server.go | 1 + go/vt/topo/shard_lock.go | 4 +- go/vt/topo/stats_conn.go | 20 +++-- go/vt/topo/stats_conn_test.go | 12 ++- go/vt/topo/zk2topo/lock.go | 5 ++ go/vt/vtctl/workflow/server.go | 77 +++++++++++++------ .../tabletmanager/vdiff/table_differ.go | 9 ++- 18 files changed, 239 insertions(+), 58 deletions(-) create mode 100644 go/vt/topo/named_lock.go diff --git a/go/vt/topo/conn.go b/go/vt/topo/conn.go index 348fc569ecf..10934be0287 100644 --- a/go/vt/topo/conn.go +++ b/go/vt/topo/conn.go @@ -120,6 +120,14 @@ type Conn interface { // Returns ErrInterrupted if ctx is canceled. Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) + // LockName is similar to `Lock` but the difference is that it does not require + // the path to exist and have children in order to lock it. This is because with + // named locks you are NOT locking an actual topo entity such as a Keyspace record. + // Because this lock is not blocking any Vitess operations OTHER than another + // caller that is trying to get the same named lock, there is a static 24 hour + // TTL on them to ensure that they are eventually cleaned up. 
+ LockName(ctx context.Context, dirPath, contents string) (LockDescriptor, error) + // TryLock takes lock on the given directory with a fail-fast approach. // It is similar to `Lock` but the difference is it attempts to acquire the lock // if it is likely to succeed. If there is already a lock on given path, then unlike `Lock` diff --git a/go/vt/topo/consultopo/lock.go b/go/vt/topo/consultopo/lock.go index ae47b91cc6c..9924c6e80e0 100644 --- a/go/vt/topo/consultopo/lock.go +++ b/go/vt/topo/consultopo/lock.go @@ -49,7 +49,12 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD return nil, convertError(err, dirPath) } - return s.lock(ctx, dirPath, contents) + return s.lock(ctx, dirPath, contents, s.lockTTL) +} + +// LockName is part of the topo.Conn interface. +func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + return s.lock(ctx, dirPath, contents, topo.NamedLockTTL.String()) } // TryLock is part of the topo.Conn interface. @@ -74,11 +79,11 @@ func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.Lo } // everything is good let's acquire the lock. - return s.lock(ctx, dirPath, contents) + return s.lock(ctx, dirPath, contents, s.lockTTL) } // Lock is part of the topo.Conn interface. 
-func (s *Server) lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { +func (s *Server) lock(ctx context.Context, dirPath, contents, ttl string) (topo.LockDescriptor, error) { lockPath := path.Join(s.root, dirPath, locksFilename) lockOpts := &api.LockOptions{ @@ -86,16 +91,14 @@ func (s *Server) lock(ctx context.Context, dirPath, contents string) (topo.LockD Value: []byte(contents), SessionOpts: &api.SessionEntry{ Name: api.DefaultLockSessionName, - TTL: api.DefaultLockSessionTTL, + TTL: ttl, }, } lockOpts.SessionOpts.Checks = s.lockChecks if s.lockDelay > 0 { lockOpts.SessionOpts.LockDelay = s.lockDelay } - if s.lockTTL != "" { - lockOpts.SessionOpts.TTL = s.lockTTL - } + lockOpts.SessionOpts.TTL = ttl // Build the lock structure. l, err := s.client.LockOpts(lockOpts) if err != nil { diff --git a/go/vt/topo/consultopo/server.go b/go/vt/topo/consultopo/server.go index a7a5446c274..c6890820c2d 100644 --- a/go/vt/topo/consultopo/server.go +++ b/go/vt/topo/consultopo/server.go @@ -111,7 +111,7 @@ type Server struct { locks map[string]*lockInstance lockChecks []string - lockTTL string + lockTTL string // This is the default used for all locks lockDelay time.Duration } diff --git a/go/vt/topo/etcd2topo/election.go b/go/vt/topo/etcd2topo/election.go index 276d9e60355..94768b50470 100644 --- a/go/vt/topo/etcd2topo/election.go +++ b/go/vt/topo/etcd2topo/election.go @@ -91,7 +91,7 @@ func (mp *etcdLeaderParticipation) WaitForLeadership() (context.Context, error) // Try to get the primaryship, by getting a lock. var err error - ld, err = mp.s.lock(lockCtx, electionPath, mp.id) + ld, err = mp.s.lock(lockCtx, electionPath, mp.id, leaseTTL) if err != nil { // It can be that we were interrupted. 
return nil, err diff --git a/go/vt/topo/etcd2topo/lock.go b/go/vt/topo/etcd2topo/lock.go index 89095156471..67291457d63 100644 --- a/go/vt/topo/etcd2topo/lock.go +++ b/go/vt/topo/etcd2topo/lock.go @@ -153,7 +153,7 @@ func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.Lo } // everything is good let's acquire the lock. - return s.lock(ctx, dirPath, contents) + return s.lock(ctx, dirPath, contents, leaseTTL) } // Lock is part of the topo.Conn interface. @@ -168,15 +168,20 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD return nil, convertError(err, dirPath) } - return s.lock(ctx, dirPath, contents) + return s.lock(ctx, dirPath, contents, leaseTTL) +} + +// LockName is part of the topo.Conn interface. +func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + return s.lock(ctx, dirPath, contents, int(topo.NamedLockTTL.Seconds())) } // lock is used by both Lock() and primary election. -func (s *Server) lock(ctx context.Context, nodePath, contents string) (topo.LockDescriptor, error) { +func (s *Server) lock(ctx context.Context, nodePath, contents string, ttl int) (topo.LockDescriptor, error) { nodePath = path.Join(s.root, nodePath, locksPath) // Get a lease, set its KeepAlive. 
- lease, err := s.cli.Grant(ctx, int64(leaseTTL)) + lease, err := s.cli.Grant(ctx, int64(ttl)) if err != nil { return nil, convertError(err, nodePath) } diff --git a/go/vt/topo/faketopo/faketopo.go b/go/vt/topo/faketopo/faketopo.go index 52a2a41c5df..39f39e652c5 100644 --- a/go/vt/topo/faketopo/faketopo.go +++ b/go/vt/topo/faketopo/faketopo.go @@ -291,6 +291,13 @@ func (f *FakeConn) Lock(ctx context.Context, dirPath, contents string) (topo.Loc return &fakeLockDescriptor{}, nil } +// LockName implements the Conn interface +func (f *FakeConn) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + f.mu.Lock() + defer f.mu.Unlock() + return &fakeLockDescriptor{}, nil +} + // TryLock is part of the topo.Conn interface. Its implementation is same as Lock func (f *FakeConn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { return f.Lock(ctx, dirPath, contents) diff --git a/go/vt/topo/keyspace_lock.go b/go/vt/topo/keyspace_lock.go index 7df1b2ee64f..7da4498d2c3 100644 --- a/go/vt/topo/keyspace_lock.go +++ b/go/vt/topo/keyspace_lock.go @@ -46,7 +46,7 @@ func (s *keyspaceLock) Path() string { func (ts *Server) LockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) { return ts.internalLock(ctx, &keyspaceLock{ keyspace: keyspace, - }, action, true) + }, action, Blocking) } // CheckKeyspaceLocked can be called on a context to make sure we have the lock diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go index 16caaf49dfb..26fba49c58a 100644 --- a/go/vt/topo/locks.go +++ b/go/vt/topo/locks.go @@ -46,6 +46,10 @@ var ( RemoteOperationTimeout = 15 * time.Second ) +// How long named locks are kept in the topo server. +// This ensures that orphaned named locks are not kept around. +const NamedLockTTL = 24 * time.Hour + // Lock describes a long-running lock on a keyspace or a shard. // It needs to be public as we JSON-serialize it. 
type Lock struct { @@ -120,6 +124,28 @@ type locksKeyType int var locksKey locksKeyType +// Support different lock types. +type LockType int + +const ( + Blocking LockType = iota + NonBlocking + Named +) + +func (lt LockType) String() string { + switch lt { + case Blocking: + return "blocking" + case NonBlocking: + return "non blocking" + case Named: + return "named" + default: + return "unknown" + } +} + // iTopoLock is the interface for knowing the resource that is being locked. // It allows for better controlling nuances for different lock types and log messages. type iTopoLock interface { @@ -129,7 +155,7 @@ type iTopoLock interface { } // perform the topo lock operation -func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, isBlocking bool) (LockDescriptor, error) { +func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, lockType LockType) (LockDescriptor, error) { log.Infof("Locking %v %v for action %v", lt.Type(), lt.ResourceName(), l.Action) ctx, cancel := context.WithTimeout(ctx, LockTimeout) @@ -143,10 +169,16 @@ func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, isBlocking bo if err != nil { return nil, err } - if isBlocking { + switch lockType { + case Blocking: return ts.globalCell.Lock(ctx, lt.Path(), j) + case NonBlocking: + return ts.globalCell.TryLock(ctx, lt.Path(), j) + case Named: + return ts.globalCell.LockName(ctx, lt.Path(), j) + default: + return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unknown lock type %s", lockType) } - return ts.globalCell.TryLock(ctx, lt.Path(), j) } // unlock unlocks a previously locked key. 
@@ -174,7 +206,7 @@ func (l *Lock) unlock(ctx context.Context, lt iTopoLock, lockDescriptor LockDesc return lockDescriptor.Unlock(ctx) } -func (ts *Server) internalLock(ctx context.Context, lt iTopoLock, action string, isBlocking bool) (context.Context, func(*error), error) { +func (ts *Server) internalLock(ctx context.Context, lt iTopoLock, action string, lockType LockType) (context.Context, func(*error), error) { i, ok := ctx.Value(locksKey).(*locksInfo) if !ok { i = &locksInfo{ @@ -191,7 +223,7 @@ func (ts *Server) internalLock(ctx context.Context, lt iTopoLock, action string, // lock it l := newLock(action) - lockDescriptor, err := l.lock(ctx, ts, lt, isBlocking) + lockDescriptor, err := l.lock(ctx, ts, lt, lockType) if err != nil { return nil, nil, err } diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index d0943c7058d..0c373e32f6b 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -65,11 +65,16 @@ func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDes return nil, err } - return c.lock(ctx, dirPath, contents) + return c.lock(ctx, dirPath, contents, false) +} + +// LockName is part of the topo.Conn interface. +func (c *Conn) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + return c.lock(ctx, dirPath, contents, true) } // Lock is part of the topo.Conn interface. 
-func (c *Conn) lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { +func (c *Conn) lock(ctx context.Context, dirPath, contents string, named bool) (topo.LockDescriptor, error) { for { if err := c.dial(ctx); err != nil { return nil, err @@ -82,7 +87,12 @@ func (c *Conn) lock(ctx context.Context, dirPath, contents string) (topo.LockDes return nil, c.factory.err } - n := c.factory.nodeByPath(c.cell, dirPath) + var n *node + if named { + n = c.factory.getOrCreatePath(c.cell, dirPath) + } else { + n = c.factory.nodeByPath(c.cell, dirPath) + } if n == nil { c.factory.mu.Unlock() return nil, topo.NewError(topo.NoNode, dirPath) diff --git a/go/vt/topo/named_lock.go b/go/vt/topo/named_lock.go new file mode 100644 index 00000000000..d550130ce94 --- /dev/null +++ b/go/vt/topo/named_lock.go @@ -0,0 +1,58 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topo + +import ( + "context" + "path" +) + +type namedLock struct { + name string +} + +var _ iTopoLock = (*namedLock)(nil) + +func (s *namedLock) Type() string { + return "named" +} + +func (s *namedLock) ResourceName() string { + return s.name +} + +func (s *namedLock) Path() string { + return path.Join(NamedLocksPath, s.name) +} + +// LockName will lock the opaque identifier, and return: +// - a context with a locksInfo structure for future reference. +// - an unlock method +// - an error if anything failed. 
+func (ts *Server) LockName(ctx context.Context, name, action string) (context.Context, func(*error), error) { + return ts.internalLock(ctx, &namedLock{ + name: name, + }, action, Named) +} + +// CheckNameLocked can be called on a context to make sure we have the lock +// for a given opaque identifier. +func CheckNameLocked(ctx context.Context, name string) error { + return checkLocked(ctx, &namedLock{ + name: name, + }) +} diff --git a/go/vt/topo/routing_rules_lock.go b/go/vt/topo/routing_rules_lock.go index c45ddb738c9..ea2053fab69 100644 --- a/go/vt/topo/routing_rules_lock.go +++ b/go/vt/topo/routing_rules_lock.go @@ -38,7 +38,7 @@ func (s *routingRules) Path() string { // LockRoutingRules acquires a lock for routing rules. func (ts *Server) LockRoutingRules(ctx context.Context, action string) (context.Context, func(*error), error) { - return ts.internalLock(ctx, &routingRules{}, action, true) + return ts.internalLock(ctx, &routingRules{}, action, Blocking) } // CheckRoutingRulesLocked checks if a lock for routing rules is still possessed. diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index 6e5fadd8b97..a4c6695c9fb 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -94,6 +94,7 @@ const ( ExternalClusterVitess = "vitess" RoutingRulesPath = "routing_rules" KeyspaceRoutingRulesPath = "keyspace" + NamedLocksPath = "internal/named_locks" ) // Factory is a factory interface to create Conn objects. 
diff --git a/go/vt/topo/shard_lock.go b/go/vt/topo/shard_lock.go index 72d0b1c8ca4..4f0980fc293 100644 --- a/go/vt/topo/shard_lock.go +++ b/go/vt/topo/shard_lock.go @@ -63,7 +63,7 @@ func (ts *Server) LockShard(ctx context.Context, keyspace, shard, action string) return ts.internalLock(ctx, &shardLock{ keyspace: keyspace, shard: shard, - }, action, true) + }, action, Blocking) } // TryLockShard will lock the shard, and return: @@ -85,7 +85,7 @@ func (ts *Server) TryLockShard(ctx context.Context, keyspace, shard, action stri return ts.internalLock(ctx, &shardLock{ keyspace: keyspace, shard: shard, - }, action, false) + }, action, NonBlocking) } // CheckShardLocked can be called on a context to make sure we have the lock diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 34c45d793ac..1a96cebc10d 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -159,16 +159,21 @@ func (st *StatsConn) Delete(ctx context.Context, filePath string, version Versio // Lock is part of the Conn interface func (st *StatsConn) Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) { - return st.internalLock(ctx, dirPath, contents, true) + return st.internalLock(ctx, dirPath, contents, Blocking) +} + +// LockName is part of the Conn interface +func (st *StatsConn) LockName(ctx context.Context, dirPath, contents string) (LockDescriptor, error) { + return st.internalLock(ctx, dirPath, contents, Named) } // TryLock is part of the topo.Conn interface. Its implementation is same as Lock func (st *StatsConn) TryLock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) { - return st.internalLock(ctx, dirPath, contents, false) + return st.internalLock(ctx, dirPath, contents, NonBlocking) } // TryLock is part of the topo.Conn interface. 
Its implementation is same as Lock -func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, isBlocking bool) (LockDescriptor, error) { +func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, lockType LockType) (LockDescriptor, error) { statsKey := []string{"Lock", st.cell} if st.readOnly { return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, readOnlyErrorStrFormat, statsKey[0], dirPath) @@ -177,10 +182,15 @@ func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, defer topoStatsConnTimings.Record(statsKey, startTime) var res LockDescriptor var err error - if isBlocking { + switch lockType { + case Blocking: res, err = st.conn.Lock(ctx, dirPath, contents) - } else { + case NonBlocking: res, err = st.conn.TryLock(ctx, dirPath, contents) + case Named: + res, err = st.conn.LockName(ctx, dirPath, contents) + default: + return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unknown lock type %s", lockType) } if err != nil { topoStatsConnErrors.Add(statsKey, int64(1)) diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index 78f9cc6eb72..17888e3dcaa 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -108,7 +108,17 @@ func (st *fakeConn) Lock(ctx context.Context, dirPath, contents string) (lock Lo } if dirPath == "error" { return lock, fmt.Errorf("dummy error") + } + return lock, err +} +// LockName is part of the Conn interface +func (st *fakeConn) LockName(ctx context.Context, dirPath, contents string) (lock LockDescriptor, err error) { + if st.readOnly { + return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, "topo server connection is read-only") + } + if dirPath == "error" { + return lock, fmt.Errorf("dummy error") } return lock, err } @@ -121,7 +131,6 @@ func (st *fakeConn) TryLock(ctx context.Context, dirPath, contents string) (lock } if dirPath == "error" { return lock, fmt.Errorf("dummy error") - } return lock, err } @@ -140,7 +149,6 @@ func (st 
*fakeConn) WatchRecursive(ctx context.Context, path string) (current [] func (st *fakeConn) NewLeaderParticipation(name, id string) (mp LeaderParticipation, err error) { if name == "error" { return mp, fmt.Errorf("dummy error") - } return mp, err } diff --git a/go/vt/topo/zk2topo/lock.go b/go/vt/topo/zk2topo/lock.go index 5baf1f7f33f..83cec41b1a3 100644 --- a/go/vt/topo/zk2topo/lock.go +++ b/go/vt/topo/zk2topo/lock.go @@ -42,6 +42,11 @@ func (zs *Server) Lock(ctx context.Context, dirPath, contents string) (topo.Lock return zs.lock(ctx, dirPath, contents) } +// LockName is part of the topo.Conn interface. +func (zs *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + return zs.lock(ctx, dirPath, contents) +} + // TryLock is part of the topo.Conn interface. func (zs *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { // We list all the entries under dirPath diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 31c27601f6b..f3da478ca98 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1460,13 +1460,21 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } sw := &switcher{s: s, ts: ts} - lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "MoveTablesCreate") + + // When creating the workflow, locking the workflow and its target keyspace is sufficient. 
+ lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "SwitchWrites") + if lockErr != nil { + ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) + return nil, lockErr + } + defer workflowUnlock(&err) + ctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "MoveTablesCreate") if lockErr != nil { ts.Logger().Errorf("Locking target keyspace %s failed: %v", ts.TargetKeyspaceName(), lockErr) return nil, lockErr } defer targetUnlock(&err) - ctx = lockCtx // If we get an error after this point, where the vreplication streams/records // have been created, then we clean up the workflow's artifacts. @@ -2571,24 +2579,30 @@ func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, } else { sw = &switcher{s: s, ts: ts} } - var tctx context.Context - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets") + + // Lock the workflow along with its source and target keyspaces. 
+ lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "SwitchWrites") + if lockErr != nil { + ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) + } + defer workflowUnlock(&err) + ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets") if lockErr != nil { ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) return nil, lockErr } defer sourceUnlock(&err) - ctx = tctx - if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets") + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets") if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) - ctx = tctx + ctx = lockCtx } + if !keepData { switch ts.MigrationType() { case binlogdatapb.MigrationType_TABLES: @@ -2763,23 +2777,30 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy } else { sw = &switcher{ts: ts, s: s} } - var tctx context.Context - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources") + + // Lock the workflow and its source and target keyspaces. 
+ lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "SwitchWrites") + if lockErr != nil { + ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) + } + defer workflowUnlock(&err) + ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources") if lockErr != nil { ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) return nil, lockErr } defer sourceUnlock(&err) - ctx = tctx if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources") + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources") if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) - ctx = tctx + ctx = lockCtx } + if !force { if err := sw.validateWorkflowHasCompleted(ctx); err != nil { ts.Logger().Errorf("Workflow has not completed, cannot DropSources: %v", err) @@ -2997,14 +3018,21 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche } else { sw = &switcher{s: s, ts: ts} } - var tctx context.Context - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow") + + // Lock the workflow and its target keyspace. 
+ lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "SwitchWrites") + if lockErr != nil { + ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) + } + defer workflowUnlock(&err) + ctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow") if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) - ctx = tctx + if err := sw.dropTargetVReplicationStreams(ctx); err != nil { return nil, err } @@ -3238,7 +3266,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc return handleError("workflow validation failed", err) } - // For reads, locking the source keyspace is sufficient. + // For switching reads, locking the source keyspace is sufficient. ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads") if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) @@ -3310,19 +3338,24 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } - // Need to lock both source and target keyspaces. - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites") + // Lock the workflow and its source and target keyspaces. 
+ lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "SwitchWrites") + if lockErr != nil { + return handleError(fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr) + } + defer workflowUnlock(&err) + ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites") if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } - ctx = tctx defer sourceUnlock(&err) if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites") + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites") if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) } - ctx = tctx + ctx = lockCtx defer targetUnlock(&err) } diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index a98a3ce90f9..93f61af197d 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -115,10 +115,11 @@ func (td *tableDiffer) initialize(ctx context.Context) error { defer dbClient.Close() targetKeyspace := td.wd.ct.vde.thisTablet.Keyspace - log.Infof("Locking target keyspace %s", targetKeyspace) - ctx, unlock, lockErr := td.wd.ct.ts.LockKeyspace(ctx, targetKeyspace, "vdiff") + lockName := fmt.Sprintf("%s/%s", targetKeyspace, td.wd.ct.workflow) + log.Infof("Locking workflow %s", lockName) + ctx, unlock, lockErr := td.wd.ct.ts.LockName(ctx, lockName, "vdiff") if lockErr != nil { - log.Errorf("LockKeyspace failed: %v", lockErr) + log.Errorf("LockName failed: %v", lockErr) return lockErr } @@ -126,7 +127,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { defer func() { unlock(&err) if err != nil { - 
log.Errorf("UnlockKeyspace %s failed: %v", targetKeyspace, err) + log.Errorf("Unlocking workflow %s failed: %v", lockName, err) } }() From 0809dad24464696883914a283c407f7940b8d284 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 26 Jun 2024 09:30:17 -0400 Subject: [PATCH 02/14] Add ability to override global implementation lock TTL Signed-off-by: Matt Lord --- go/vt/topo/conn.go | 8 ++ go/vt/topo/consultopo/lock.go | 29 +++++++- go/vt/topo/etcd2topo/lock.go | 16 ++++ go/vt/topo/faketopo/faketopo.go | 10 ++- go/vt/topo/keyspace_lock.go | 4 +- go/vt/topo/locks.go | 73 +++++++++++++++++-- go/vt/topo/memorytopo/lock.go | 16 ++++ go/vt/topo/named_lock.go | 2 +- go/vt/topo/routing_rules_lock.go | 2 +- go/vt/topo/shard_lock.go | 4 +- go/vt/topo/stats_conn.go | 21 ++++-- go/vt/topo/stats_conn_test.go | 14 +++- go/vt/topo/zk2topo/lock.go | 7 ++ go/vt/vtctl/workflow/server.go | 18 ++++- go/vt/vtctl/workflow/switcher.go | 6 +- go/vt/vtctl/workflow/switcher_dry_run.go | 3 +- go/vt/vtctl/workflow/switcher_interface.go | 4 +- .../tabletmanager/vdiff/table_differ.go | 2 +- 18 files changed, 206 insertions(+), 33 deletions(-) diff --git a/go/vt/topo/conn.go b/go/vt/topo/conn.go index 10934be0287..b00bdc67207 100644 --- a/go/vt/topo/conn.go +++ b/go/vt/topo/conn.go @@ -19,6 +19,7 @@ package topo import ( "context" "sort" + "time" ) // Conn defines the interface that must be implemented by topology @@ -120,6 +121,13 @@ type Conn interface { // Returns ErrInterrupted if ctx is canceled. Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) + // LockWithTTL is similar to `Lock` but the difference is that it allows + // you to override the global default TTL that is configured for the + // implementation (--topo_etcd_lease_ttl and --topo_consul_lock_session_ttl). + // Note: this is no different than `Lock` for ZooKeeper as it does not + // support lock TTLs and they exist until released or the session ends. 
+ LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (LockDescriptor, error) + // LockName is similar to `Lock` but the difference is that it does not require // the path to exist and have children in order to lock it. This is because with // named locks you are NOT locking an actual topo entity such as a Keyspace record. diff --git a/go/vt/topo/consultopo/lock.go b/go/vt/topo/consultopo/lock.go index 9924c6e80e0..49554474677 100644 --- a/go/vt/topo/consultopo/lock.go +++ b/go/vt/topo/consultopo/lock.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "path" + "time" "github.com/hashicorp/consul/api" @@ -52,6 +53,21 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD return s.lock(ctx, dirPath, contents, s.lockTTL) } +// LockWithTTL is part of the topo.Conn interface. +func (s *Server) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (topo.LockDescriptor, error) { + // We list the directory first to make sure it exists. + if _, err := s.ListDir(ctx, dirPath, false /*full*/); err != nil { + // We need to return the right error codes, like + // topo.ErrNoNode and topo.ErrInterrupted, and the + // easiest way to do this is to return convertError(err). + // It may lose some of the context, if this is an issue, + // maybe logging the error would work here. + return nil, convertError(err, dirPath) + } + + return s.lock(ctx, dirPath, contents, ttl.String()) +} + // LockName is part of the topo.Conn interface. func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { return s.lock(ctx, dirPath, contents, topo.NamedLockTTL.String()) @@ -91,14 +107,23 @@ func (s *Server) lock(ctx context.Context, dirPath, contents, ttl string) (topo. 
Value: []byte(contents), SessionOpts: &api.SessionEntry{ Name: api.DefaultLockSessionName, - TTL: ttl, + TTL: api.DefaultLockSessionTTL, }, } lockOpts.SessionOpts.Checks = s.lockChecks + if s.lockTTL != "" { + // Override the API default with the global default from + // --topo_consul_lock_session_ttl. + lockOpts.SessionOpts.TTL = s.lockTTL + } + if ttl != "" { + // Override the global default with the one provided by the + // caller. + lockOpts.SessionOpts.TTL = ttl + } if s.lockDelay > 0 { lockOpts.SessionOpts.LockDelay = s.lockDelay } - lockOpts.SessionOpts.TTL = ttl // Build the lock structure. l, err := s.client.LockOpts(lockOpts) if err != nil { diff --git a/go/vt/topo/etcd2topo/lock.go b/go/vt/topo/etcd2topo/lock.go index 67291457d63..5b7c0d210d3 100644 --- a/go/vt/topo/etcd2topo/lock.go +++ b/go/vt/topo/etcd2topo/lock.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "path" + "time" "github.com/spf13/pflag" @@ -171,6 +172,21 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD return s.lock(ctx, dirPath, contents, leaseTTL) } +// LockWithTTL is part of the topo.Conn interface. +func (s *Server) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (topo.LockDescriptor, error) { + // We list the directory first to make sure it exists. + if _, err := s.ListDir(ctx, dirPath, false /*full*/); err != nil { + // We need to return the right error codes, like + // topo.ErrNoNode and topo.ErrInterrupted, and the + // easiest way to do this is to return convertError(err). + // It may lose some of the context, if this is an issue, + // maybe logging the error would work here. + return nil, convertError(err, dirPath) + } + + return s.lock(ctx, dirPath, contents, int(ttl.Seconds())) +} + // LockName is part of the topo.Conn interface. 
func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { return s.lock(ctx, dirPath, contents, int(topo.NamedLockTTL.Seconds())) diff --git a/go/vt/topo/faketopo/faketopo.go b/go/vt/topo/faketopo/faketopo.go index 39f39e652c5..0c88b95e3da 100644 --- a/go/vt/topo/faketopo/faketopo.go +++ b/go/vt/topo/faketopo/faketopo.go @@ -20,6 +20,7 @@ import ( "context" "strings" "sync" + "time" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -291,7 +292,14 @@ func (f *FakeConn) Lock(ctx context.Context, dirPath, contents string) (topo.Loc return &fakeLockDescriptor{}, nil } -// LockName implements the Conn interface +// LockWithTTL implements the Conn interface. +func (f *FakeConn) LockWithTTL(ctx context.Context, dirPath, contents string, _ time.Duration) (topo.LockDescriptor, error) { + f.mu.Lock() + defer f.mu.Unlock() + return &fakeLockDescriptor{}, nil +} + +// LockName implements the Conn interface. func (f *FakeConn) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { f.mu.Lock() defer f.mu.Unlock() diff --git a/go/vt/topo/keyspace_lock.go b/go/vt/topo/keyspace_lock.go index 7da4498d2c3..fe9da9a5f8b 100644 --- a/go/vt/topo/keyspace_lock.go +++ b/go/vt/topo/keyspace_lock.go @@ -43,10 +43,10 @@ func (s *keyspaceLock) Path() string { // - a context with a locksInfo structure for future reference. // - an unlock method // - an error if anything failed. -func (ts *Server) LockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) { +func (ts *Server) LockKeyspace(ctx context.Context, keyspace, action string, opts ...LockOption) (context.Context, func(*error), error) { return ts.internalLock(ctx, &keyspaceLock{ keyspace: keyspace, - }, action, Blocking) + }, action, opts...) 
} // CheckKeyspaceLocked can be called on a context to make sure we have the lock diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go index 26fba49c58a..e73eccee38e 100644 --- a/go/vt/topo/locks.go +++ b/go/vt/topo/locks.go @@ -47,7 +47,8 @@ var ( ) // How long named locks are kept in the topo server. -// This ensures that orphaned named locks are not kept around. +// This ensures that orphaned named locks are not kept around. This +// should never happen, but it provides a final safety net. const NamedLockTTL = 24 * time.Hour // Lock describes a long-running lock on a keyspace or a shard. @@ -58,6 +59,7 @@ type Lock struct { HostName string UserName string Time string + Options lockOptions // Status is the current status of the Lock. Status string @@ -155,7 +157,7 @@ type iTopoLock interface { } // perform the topo lock operation -func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, lockType LockType) (LockDescriptor, error) { +func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, opts ...LockOption) (LockDescriptor, error) { log.Infof("Locking %v %v for action %v", lt.Type(), lt.ResourceName(), l.Action) ctx, cancel := context.WithTimeout(ctx, LockTimeout) @@ -169,15 +171,21 @@ func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, lockType Lock if err != nil { return nil, err } - switch lockType { - case Blocking: - return ts.globalCell.Lock(ctx, lt.Path(), j) + + for _, o := range opts { + o.apply(&l.Options) + } + + switch l.Options.lockType { case NonBlocking: return ts.globalCell.TryLock(ctx, lt.Path(), j) case Named: return ts.globalCell.LockName(ctx, lt.Path(), j) default: - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unknown lock type %s", lockType) + if l.Options.ttl != 0 { + return ts.globalCell.LockWithTTL(ctx, lt.Path(), j, l.Options.ttl) + } + return ts.globalCell.Lock(ctx, lt.Path(), j) } } @@ -206,7 +214,7 @@ func (l *Lock) unlock(ctx context.Context, lt iTopoLock, lockDescriptor LockDesc return 
lockDescriptor.Unlock(ctx) } -func (ts *Server) internalLock(ctx context.Context, lt iTopoLock, action string, lockType LockType) (context.Context, func(*error), error) { +func (ts *Server) internalLock(ctx context.Context, lt iTopoLock, action string, opts ...LockOption) (context.Context, func(*error), error) { i, ok := ctx.Value(locksKey).(*locksInfo) if !ok { i = &locksInfo{ @@ -223,7 +231,7 @@ func (ts *Server) internalLock(ctx context.Context, lt iTopoLock, action string, // lock it l := newLock(action) - lockDescriptor, err := l.lock(ctx, ts, lt, lockType) + lockDescriptor, err := l.lock(ctx, ts, lt, opts...) if err != nil { return nil, nil, err } @@ -278,3 +286,52 @@ func checkLocked(ctx context.Context, lt iTopoLock) error { // Check the lock server implementation still holds the lock. return li.lockDescriptor.Check(ctx) } + +// lockOptions configure a Lock call. lockOptions are set by the LockOption +// values passed to the lock functions. +type lockOptions struct { + lockType LockType + ttl time.Duration +} + +// LockOption configures how we perform the locking operation. +type LockOption interface { + apply(*lockOptions) +} + +// funcLockOption wraps a function that modifies lockOptions into an +// implementation of the LockOption interface. +type funcLockOption struct { + f func(*lockOptions) +} + +func (flo *funcLockOption) apply(lo *lockOptions) { + flo.f(lo) +} + +func newFuncLockOption(f func(*lockOptions)) *funcLockOption { + return &funcLockOption{ + f: f, + } +} + +// WithTimeToLive allows you to specify how long the underlying topo server +// implementation should hold the lock before releasing it — even if the caller +// has not explicitly released it. This provides a way to override the global +// ttl values that are set via --topo_consul_lock_session_ttl and +// --topo_etcd_lease_ttl. +// Note: This option is ignored by the ZooKeeper implementation as it does not +// support TTLs. 
+func WithTimeToLive(ttl time.Duration) LockOption { + return newFuncLockOption(func(o *lockOptions) { + o.ttl = ttl + }) +} + +// WithType determines the type of lock we take. The options are defined +// by the LockType type. +func WithType(lt LockType) LockOption { + return newFuncLockOption(func(o *lockOptions) { + o.lockType = lt + }) +} diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index 0c373e32f6b..09beaddaf55 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -19,6 +19,7 @@ package memorytopo import ( "context" "fmt" + "time" "vitess.io/vitess/go/vt/topo" ) @@ -68,6 +69,21 @@ func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDes return c.lock(ctx, dirPath, contents, false) } +// LockWithTTL is part of the topo.Conn interface. It behaves the same as Lock +// as TTLs are not supported in memorytopo. +func (c *Conn) LockWithTTL(ctx context.Context, dirPath, contents string, _ time.Duration) (topo.LockDescriptor, error) { + c.factory.callstats.Add([]string{"LockWithTTL"}, 1) + + c.factory.mu.Lock() + err := c.factory.getOperationError(Lock, dirPath) + c.factory.mu.Unlock() + if err != nil { + return nil, err + } + + return c.lock(ctx, dirPath, contents, false) +} + // LockName is part of the topo.Conn interface. 
func (c *Conn) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { return c.lock(ctx, dirPath, contents, true) diff --git a/go/vt/topo/named_lock.go b/go/vt/topo/named_lock.go index d550130ce94..124fd5dca9a 100644 --- a/go/vt/topo/named_lock.go +++ b/go/vt/topo/named_lock.go @@ -46,7 +46,7 @@ func (s *namedLock) Path() string { func (ts *Server) LockName(ctx context.Context, name, action string) (context.Context, func(*error), error) { return ts.internalLock(ctx, &namedLock{ name: name, - }, action, Named) + }, action, WithType(Named)) } // CheckNamedLocked can be called on a context to make sure we have the lock diff --git a/go/vt/topo/routing_rules_lock.go b/go/vt/topo/routing_rules_lock.go index ea2053fab69..848f4eaf6bd 100644 --- a/go/vt/topo/routing_rules_lock.go +++ b/go/vt/topo/routing_rules_lock.go @@ -38,7 +38,7 @@ func (s *routingRules) Path() string { // LockRoutingRules acquires a lock for routing rules. func (ts *Server) LockRoutingRules(ctx context.Context, action string) (context.Context, func(*error), error) { - return ts.internalLock(ctx, &routingRules{}, action, Blocking) + return ts.internalLock(ctx, &routingRules{}, action) } // CheckRoutingRulesLocked checks if a lock for routing rules is still possessed. 
diff --git a/go/vt/topo/shard_lock.go b/go/vt/topo/shard_lock.go index 4f0980fc293..829c9aeeda3 100644 --- a/go/vt/topo/shard_lock.go +++ b/go/vt/topo/shard_lock.go @@ -63,7 +63,7 @@ func (ts *Server) LockShard(ctx context.Context, keyspace, shard, action string) return ts.internalLock(ctx, &shardLock{ keyspace: keyspace, shard: shard, - }, action, Blocking) + }, action) } // TryLockShard will lock the shard, and return: @@ -85,7 +85,7 @@ func (ts *Server) TryLockShard(ctx context.Context, keyspace, shard, action stri return ts.internalLock(ctx, &shardLock{ keyspace: keyspace, shard: shard, - }, action, NonBlocking) + }, action, WithType(NonBlocking)) } // CheckShardLocked can be called on a context to make sure we have the lock diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 1a96cebc10d..2636cec7951 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -159,21 +159,26 @@ func (st *StatsConn) Delete(ctx context.Context, filePath string, version Versio // Lock is part of the Conn interface func (st *StatsConn) Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) { - return st.internalLock(ctx, dirPath, contents, Blocking) + return st.internalLock(ctx, dirPath, contents, Blocking, 0) +} + +// LockWithTTL is part of the Conn interface +func (st *StatsConn) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (LockDescriptor, error) { + return st.internalLock(ctx, dirPath, contents, Blocking, 0) } // LockName is part of the Conn interface func (st *StatsConn) LockName(ctx context.Context, dirPath, contents string) (LockDescriptor, error) { - return st.internalLock(ctx, dirPath, contents, Named) + return st.internalLock(ctx, dirPath, contents, Named, 0) } // TryLock is part of the topo.Conn interface. 
Its implementation is same as Lock func (st *StatsConn) TryLock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) { - return st.internalLock(ctx, dirPath, contents, NonBlocking) + return st.internalLock(ctx, dirPath, contents, NonBlocking, 0) } // TryLock is part of the topo.Conn interface. Its implementation is same as Lock -func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, lockType LockType) (LockDescriptor, error) { +func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, lockType LockType, ttl time.Duration) (LockDescriptor, error) { statsKey := []string{"Lock", st.cell} if st.readOnly { return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, readOnlyErrorStrFormat, statsKey[0], dirPath) @@ -183,14 +188,16 @@ func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, var res LockDescriptor var err error switch lockType { - case Blocking: - res, err = st.conn.Lock(ctx, dirPath, contents) case NonBlocking: res, err = st.conn.TryLock(ctx, dirPath, contents) case Named: res, err = st.conn.LockName(ctx, dirPath, contents) default: - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unknown lock type %s", lockType) + if ttl != 0 { + res, err = st.conn.LockWithTTL(ctx, dirPath, contents, ttl) + } else { + res, err = st.conn.Lock(ctx, dirPath, contents) + } } if err != nil { topoStatsConnErrors.Add(statsKey, int64(1)) diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index 17888e3dcaa..59dbd795d01 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "testing" + "time" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" @@ -112,7 +113,18 @@ func (st *fakeConn) Lock(ctx context.Context, dirPath, contents string) (lock Lo return lock, err } -// LockName is part of the Conn interface +// LockWithTTL is part of the Conn interface. 
+func (st *fakeConn) LockWithTTL(ctx context.Context, dirPath, contents string, _ time.Duration) (lock LockDescriptor, err error) { + if st.readOnly { + return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, "topo server connection is read-only") + } + if dirPath == "error" { + return lock, fmt.Errorf("dummy error") + } + return lock, err +} + +// LockName is part of the Conn interface. func (st *fakeConn) LockName(ctx context.Context, dirPath, contents string) (lock LockDescriptor, err error) { if st.readOnly { return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, "topo server connection is read-only") diff --git a/go/vt/topo/zk2topo/lock.go b/go/vt/topo/zk2topo/lock.go index 83cec41b1a3..fdd9fbd0137 100644 --- a/go/vt/topo/zk2topo/lock.go +++ b/go/vt/topo/zk2topo/lock.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "path" + "time" "github.com/z-division/go-zookeeper/zk" @@ -42,6 +43,12 @@ func (zs *Server) Lock(ctx context.Context, dirPath, contents string) (topo.Lock return zs.lock(ctx, dirPath, contents) } +// LockWithTTL is part of the topo.Conn interface. It behaves the same as Lock +// as TTLs are not supported in Zookeeper. +func (zs *Server) LockWithTTL(ctx context.Context, dirPath, contents string, _ time.Duration) (topo.LockDescriptor, error) { + return zs.lock(ctx, dirPath, contents) +} + // LockName is part of the topo.Conn interface. func (zs *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { return zs.lock(ctx, dirPath, contents) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index f3da478ca98..3a05bfd0d3e 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3267,7 +3267,13 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } // For switching reads, locking the source keyspace is sufficient. 
- ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads") + // We need to hold the keyspace locks longer than the command timeout. + cmdTimeout := req.GetTimeout() + if cmdTimeout == nil { + cmdTimeout = &vttimepb.Duration{Seconds: 30} + } + ksLockTimeout := time.Duration(int64(time.Second) * (cmdTimeout.Seconds * 2)) + ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTimeToLive(ksLockTimeout)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } @@ -3345,13 +3351,19 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError(fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr) } defer workflowUnlock(&err) - ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites") + // We need to hold the keyspace locks longer than the command timeout. + cmdTimeout := req.GetTimeout() + if cmdTimeout == nil { + cmdTimeout = &vttimepb.Duration{Seconds: 30} + } + ksLockTimeout := time.Duration(int64(time.Second) * (cmdTimeout.Seconds * 2)) + ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites", topo.WithTimeToLive(ksLockTimeout)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } defer sourceUnlock(&err) if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites") + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites", topo.WithTimeToLive(ksLockTimeout)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) } diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go index aa41655aab8..7d26d6681da 100644 --- 
a/go/vt/vtctl/workflow/switcher.go +++ b/go/vt/vtctl/workflow/switcher.go @@ -20,6 +20,8 @@ import ( "context" "time" + "vitess.io/vitess/go/vt/topo" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -126,8 +128,8 @@ func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { r.ts.cancelMigration(ctx, sm) } -func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) { - return r.s.ts.LockKeyspace(ctx, keyspace, action) +func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error) { + return r.s.ts.LockKeyspace(ctx, keyspace, action, opts...) } func (r *switcher) freezeTargetVReplication(ctx context.Context) error { diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go index b8b1369bdf7..d04aa336c50 100644 --- a/go/vt/vtctl/workflow/switcher_dry_run.go +++ b/go/vt/vtctl/workflow/switcher_dry_run.go @@ -27,6 +27,7 @@ import ( "golang.org/x/exp/maps" "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/vt/topo" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -293,7 +294,7 @@ func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrato dr.drLog.Log("Cancel migration as requested") } -func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string) (context.Context, func(*error), error) { +func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string, _ ...topo.LockOption) (context.Context, func(*error), error) { dr.drLog.Logf("Lock keyspace %s", keyspace) return ctx, func(e *error) { dr.drLog.Logf("Unlock keyspace %s", keyspace) diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go index 0780aaf484c..54923ffd5e3 100644 --- a/go/vt/vtctl/workflow/switcher_interface.go +++ 
b/go/vt/vtctl/workflow/switcher_interface.go @@ -20,11 +20,13 @@ import ( "context" "time" + "vitess.io/vitess/go/vt/topo" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) type iswitcher interface { - lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) + lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error) cancelMigration(ctx context.Context, sm *StreamMigrator) stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error) stopSourceWrites(ctx context.Context) error diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 93f61af197d..f91a82b9d2c 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -119,7 +119,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { log.Infof("Locking workflow %s", lockName) ctx, unlock, lockErr := td.wd.ct.ts.LockName(ctx, lockName, "vdiff") if lockErr != nil { - log.Errorf("LockName failed: %v", lockErr) + log.Errorf("Locking workfkow %s failed: %v", lockName, lockErr) return lockErr } From 3a9fa7e159dd678791100bc00e8e4a3bd8c1ac44 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 27 Jun 2024 12:20:50 -0400 Subject: [PATCH 03/14] Add tests Signed-off-by: Matt Lord --- go/test/endtoend/topotest/etcd2/main_test.go | 69 ++++++++++++++++ go/vt/topo/keyspace_lock_test.go | 35 +++++++- go/vt/topo/locks.go | 9 +- go/vt/topo/memorytopo/lock.go | 1 + go/vt/topo/named_lock_test.go | 87 ++++++++++++++++++++ go/vt/topo/routing_rules_lock.go | 4 +- go/vt/topo/shard_lock.go | 4 +- go/vt/topo/stats_conn.go | 10 ++- go/vt/topo/stats_conn_test.go | 26 +++--- 9 files changed, 222 insertions(+), 23 deletions(-) create mode 100644 go/vt/topo/named_lock_test.go diff --git a/go/test/endtoend/topotest/etcd2/main_test.go b/go/test/endtoend/topotest/etcd2/main_test.go index 
747f2721cdc..70e10fcd378 100644 --- a/go/test/endtoend/topotest/etcd2/main_test.go +++ b/go/test/endtoend/topotest/etcd2/main_test.go @@ -19,6 +19,7 @@ package ectd2 import ( "context" "flag" + "fmt" "os" "testing" "time" @@ -201,6 +202,74 @@ func TestKeyspaceLocking(t *testing.T) { topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true) } +// TestLockingWithTTL tests that locking with the TTL override works as intended. +func TestLockingWithTTL(t *testing.T) { + // Create the topo server connection. + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) + require.NoError(t, err) + + ctx := context.Background() + + // Acquire a keyspace lock with a short custom TTL. + ttl := 1 * time.Second + ctx, unlock, err := ts.LockKeyspace(ctx, KeyspaceName, "TestLockingWithTTL", topo.WithTimeToLive(ttl)) + require.NoError(t, err) + defer unlock(&err) + + // Check that CheckKeyspaceLocked DOES return an error after waiting more than + // the specified TTL as we should have lost our lock. + time.Sleep(ttl * 2) + err = topo.CheckKeyspaceLocked(ctx, KeyspaceName) + require.Error(t, err) +} + +// TestNamedLocking tests that named locking works as intended. +func TestNamedLocking(t *testing.T) { + // Create topo server connection. + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot) + require.NoError(t, err) + + ctx := context.Background() + lockName := "TestNamedLocking" + action := "Testing" + + // Acquire a named lock. + ctx, unlock, err := ts.LockName(ctx, lockName, action) + require.NoError(t, err) + + // Check that we can't reacquire it from the same context. 
+ _, _, err = ts.LockName(ctx, lockName, action) + require.ErrorContains(t, err, fmt.Sprintf("lock for named %s is already held", lockName)) + + // Check that CheckNameLocked doesn't return an error as we should still be + // holding the lock. + err = topo.CheckNameLocked(ctx, lockName) + require.NoError(t, err) + + // We'll now try to acquire the lock from a different goroutine. + secondCallerAcquired := false + go func() { + _, unlock, err := ts.LockName(context.Background(), lockName, action) + defer unlock(&err) + require.NoError(t, err) + secondCallerAcquired = true + }() + + // Wait for some time and ensure that the second attempt at acquiring the lock + // is blocked. + time.Sleep(100 * time.Millisecond) + require.False(t, secondCallerAcquired) + + // Unlock the name. + unlock(&err) + // Check that we no longer have the named lock. + err = topo.CheckNameLocked(ctx, lockName) + require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName)) + + // Wait to see that the second goroutine WAS now able to acquire the named lock. + topoutils.WaitForBoolValue(t, &secondCallerAcquired, true) +} + func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result { t.Helper() var res []*sqltypes.Result diff --git a/go/vt/topo/keyspace_lock_test.go b/go/vt/topo/keyspace_lock_test.go index 6d0a34de554..4a5d95fc5f5 100644 --- a/go/vt/topo/keyspace_lock_test.go +++ b/go/vt/topo/keyspace_lock_test.go @@ -19,12 +19,14 @@ package topo_test import ( "context" "testing" + "time" "github.com/stretchr/testify/require" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) // TestTopoKeyspaceLock tests keyspace lock operations. @@ -82,3 +84,34 @@ func TestTopoKeyspaceLock(t *testing.T) { require.NoError(t, err) defer unlock(&err) } + +// TestTopoKeyspaceLockWithTTL tests keyspace lock with a custom TTL. 
+func TestTopoKeyspaceLockWithTTL(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts, tsf := memorytopo.NewServerAndFactory(ctx, "zone1") + defer ts.Close() + + currentTopoLockTimeout := topo.LockTimeout + topo.LockTimeout = testLockTimeout + defer func() { + topo.LockTimeout = currentTopoLockTimeout + }() + + ks1 := "ks1" + ttl := time.Second + err := ts.CreateKeyspace(ctx, ks1, &topodatapb.Keyspace{}) + require.NoError(t, err) + + ctx, unlock, err := ts.LockKeyspace(ctx, ks1, "ks1", topo.WithTimeToLive(ttl)) + require.NoError(t, err) + defer unlock(&err) + + err = topo.CheckKeyspaceLocked(ctx, ks1) + require.NoError(t, err) + + // Confirm the new stats. + stats := tsf.GetCallStats() + require.NotNil(t, stats) + require.Equal(t, int64(1), stats.Counts()["LockWithTTL"]) +} diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go index e73eccee38e..0d4b40ef79d 100644 --- a/go/vt/topo/locks.go +++ b/go/vt/topo/locks.go @@ -158,7 +158,10 @@ type iTopoLock interface { // perform the topo lock operation func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, opts ...LockOption) (LockDescriptor, error) { - log.Infof("Locking %v %v for action %v", lt.Type(), lt.ResourceName(), l.Action) + for _, o := range opts { + o.apply(&l.Options) + } + log.Infof("Locking %v %v for action %v with options: %+v", lt.Type(), lt.ResourceName(), l.Action, l.Options) ctx, cancel := context.WithTimeout(ctx, LockTimeout) defer cancel() @@ -172,10 +175,6 @@ func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, opts ...LockO return nil, err } - for _, o := range opts { - o.apply(&l.Options) - } - switch l.Options.lockType { case NonBlocking: return ts.globalCell.TryLock(ctx, lt.Path(), j) diff --git a/go/vt/topo/memorytopo/lock.go b/go/vt/topo/memorytopo/lock.go index 09beaddaf55..0dafcf250ed 100644 --- a/go/vt/topo/memorytopo/lock.go +++ b/go/vt/topo/memorytopo/lock.go @@ -86,6 +86,7 @@ func (c *Conn) LockWithTTL(ctx 
context.Context, dirPath, contents string, _ time // LockName is part of the topo.Conn interface. func (c *Conn) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) { + c.factory.callstats.Add([]string{"LockName"}, 1) return c.lock(ctx, dirPath, contents, true) } diff --git a/go/vt/topo/named_lock_test.go b/go/vt/topo/named_lock_test.go new file mode 100644 index 00000000000..1f4c97e31ec --- /dev/null +++ b/go/vt/topo/named_lock_test.go @@ -0,0 +1,87 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topo_test + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +// TestTopoNamedLock tests named lock operations. +func TestTopoNamedLock(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts, tsf := memorytopo.NewServerAndFactory(ctx, "zone1") + defer ts.Close() + + currentTopoLockTimeout := topo.LockTimeout + topo.LockTimeout = testLockTimeout + defer func() { + topo.LockTimeout = currentTopoLockTimeout + }() + + lockName := "testy" + action := "testing" + lockNameCalls := int64(0) + + ctx, unlock, err := ts.LockName(ctx, lockName, action) + require.NoError(t, err) + lockNameCalls++ + + // Locking the same name again, without unlocking, should return an error. 
+ // This does not attempt the lock within the topo server implementation + // as we first check the context and see that the lock is held, thus the + // lockNameCalls should not be increased. + _, _, err = ts.LockName(ctx, lockName, action) + require.ErrorContains(t, err, fmt.Sprintf("%s is already held", lockName)) + + // Check that we have the named lock. + err = topo.CheckNameLocked(ctx, lockName) + require.NoError(t, err) + + // Check we can acquire a different named lock. + lockName2 := "testy2" + ctx, unlock2, err := ts.LockName(ctx, lockName2, action) + require.NoError(t, err) + defer unlock2(&err) + lockNameCalls++ + + // Unlock the first name. + unlock(&err) + + err = topo.CheckNameLocked(ctx, lockName) + require.ErrorContains(t, err, fmt.Sprintf("%s is not locked", lockName)) + err = topo.CheckNameLocked(ctx, lockName2) + require.NoError(t, err) + + // Confirm that the first named lock can be re-acquired after unlocking. + _, unlock, err = ts.LockName(ctx, lockName, action) + require.NoError(t, err) + defer unlock(&err) + lockNameCalls++ + + // Confirm the stats. + stats := tsf.GetCallStats() + require.NotNil(t, stats) + require.Equal(t, lockNameCalls, stats.Counts()["LockName"]) +} diff --git a/go/vt/topo/routing_rules_lock.go b/go/vt/topo/routing_rules_lock.go index 848f4eaf6bd..682eff30dc5 100644 --- a/go/vt/topo/routing_rules_lock.go +++ b/go/vt/topo/routing_rules_lock.go @@ -37,8 +37,8 @@ func (s *routingRules) Path() string { } // LockRoutingRules acquires a lock for routing rules. -func (ts *Server) LockRoutingRules(ctx context.Context, action string) (context.Context, func(*error), error) { - return ts.internalLock(ctx, &routingRules{}, action) +func (ts *Server) LockRoutingRules(ctx context.Context, action string, opts ...LockOption) (context.Context, func(*error), error) { + return ts.internalLock(ctx, &routingRules{}, action, opts...) } // CheckRoutingRulesLocked checks if a lock for routing rules is still possessed. 
diff --git a/go/vt/topo/shard_lock.go b/go/vt/topo/shard_lock.go index 829c9aeeda3..23326dcd23f 100644 --- a/go/vt/topo/shard_lock.go +++ b/go/vt/topo/shard_lock.go @@ -59,11 +59,11 @@ func (s *shardLock) Path() string { // // * operations that we don't want to conflict with re-parenting: // - DeleteTablet when it's the shard's current primary -func (ts *Server) LockShard(ctx context.Context, keyspace, shard, action string) (context.Context, func(*error), error) { +func (ts *Server) LockShard(ctx context.Context, keyspace, shard, action string, opts ...LockOption) (context.Context, func(*error), error) { return ts.internalLock(ctx, &shardLock{ keyspace: keyspace, shard: shard, - }, action) + }, action, opts...) } // TryLockShard will lock the shard, and return: diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go index 2636cec7951..39bc8c9bc43 100644 --- a/go/vt/topo/stats_conn.go +++ b/go/vt/topo/stats_conn.go @@ -164,7 +164,7 @@ func (st *StatsConn) Lock(ctx context.Context, dirPath, contents string) (LockDe // LockWithTTL is part of the Conn interface func (st *StatsConn) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (LockDescriptor, error) { - return st.internalLock(ctx, dirPath, contents, Blocking, 0) + return st.internalLock(ctx, dirPath, contents, Blocking, ttl) } // LockName is part of the Conn interface @@ -179,7 +179,13 @@ func (st *StatsConn) TryLock(ctx context.Context, dirPath, contents string) (Loc // TryLock is part of the topo.Conn interface. 
Its implementation is same as Lock func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, lockType LockType, ttl time.Duration) (LockDescriptor, error) { - statsKey := []string{"Lock", st.cell} + statsKey := []string{"Lock", st.cell} // Also used for NonBlocking / TryLock + switch { + case lockType == Named: + statsKey[0] = "LockName" + case ttl != 0: + statsKey[0] = "LockWithTTL" + } if st.readOnly { return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, readOnlyErrorStrFormat, statsKey[0], dirPath) } diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index 59dbd795d01..605487697cc 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" ) @@ -322,23 +324,25 @@ func TestStatsConnTopoLock(t *testing.T) { statsConn.Lock(ctx, "", "") timingCounts := topoStatsConnTimings.Counts()["Lock.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, timingCounts, int64(1)) - // error is zero before getting an error + statsConn.LockWithTTL(ctx, "", "", time.Second) + timingCounts = topoStatsConnTimings.Counts()["LockWithTTL.global"] + require.Equal(t, timingCounts, int64(1)) + + statsConn.LockName(ctx, "", "") + timingCounts = topoStatsConnTimings.Counts()["LockName.global"] + require.Equal(t, timingCounts, int64(1)) + + // Error is zero before getting an error. errorCount := topoStatsConnErrors.Counts()["Lock.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, errorCount, int64(0)) statsConn.Lock(ctx, "error", "") - // error stats gets emitted + // Error stats gets emitted. 
errorCount = topoStatsConnErrors.Counts()["Lock.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, errorCount, int64(1)) } // TestStatsConnTopoWatch emits stats on Watch From 1ba541ad08e49d47cc4b0c0bb9f72b0e262c75d6 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 28 Jun 2024 10:35:12 -0400 Subject: [PATCH 04/14] Minor changes from another self review Signed-off-by: Matt Lord --- go/test/endtoend/topotest/etcd2/main_test.go | 3 ++- go/vt/topo/etcd2topo/lock.go | 2 +- go/vt/topo/keyspace_lock_test.go | 2 +- go/vt/topo/locks.go | 10 +++++----- go/vt/topo/named_lock.go | 2 +- go/vt/topo/named_lock_test.go | 3 ++- go/vt/vtctl/workflow/server.go | 6 +++--- 7 files changed, 15 insertions(+), 13 deletions(-) diff --git a/go/test/endtoend/topotest/etcd2/main_test.go b/go/test/endtoend/topotest/etcd2/main_test.go index 70e10fcd378..3d240598948 100644 --- a/go/test/endtoend/topotest/etcd2/main_test.go +++ b/go/test/endtoend/topotest/etcd2/main_test.go @@ -212,7 +212,7 @@ func TestLockingWithTTL(t *testing.T) { // Acquire a keyspace lock with a short custom TTL. ttl := 1 * time.Second - ctx, unlock, err := ts.LockKeyspace(ctx, KeyspaceName, "TestLockingWithTTL", topo.WithTimeToLive(ttl)) + ctx, unlock, err := ts.LockKeyspace(ctx, KeyspaceName, "TestLockingWithTTL", topo.WithTTL(ttl)) require.NoError(t, err) defer unlock(&err) @@ -267,6 +267,7 @@ func TestNamedLocking(t *testing.T) { require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName)) // Wait to see that the second goroutine WAS now able to acquire the named lock. 
+ time.Sleep(100 * time.Millisecond) topoutils.WaitForBoolValue(t, &secondCallerAcquired, true) } diff --git a/go/vt/topo/etcd2topo/lock.go b/go/vt/topo/etcd2topo/lock.go index 5b7c0d210d3..43e4e25cc6d 100644 --- a/go/vt/topo/etcd2topo/lock.go +++ b/go/vt/topo/etcd2topo/lock.go @@ -35,7 +35,7 @@ import ( ) var ( - leaseTTL = 30 + leaseTTL = 30 // This is the default used for all locks ) func init() { diff --git a/go/vt/topo/keyspace_lock_test.go b/go/vt/topo/keyspace_lock_test.go index 4a5d95fc5f5..35fd804db9a 100644 --- a/go/vt/topo/keyspace_lock_test.go +++ b/go/vt/topo/keyspace_lock_test.go @@ -103,7 +103,7 @@ func TestTopoKeyspaceLockWithTTL(t *testing.T) { err := ts.CreateKeyspace(ctx, ks1, &topodatapb.Keyspace{}) require.NoError(t, err) - ctx, unlock, err := ts.LockKeyspace(ctx, ks1, "ks1", topo.WithTimeToLive(ttl)) + ctx, unlock, err := ts.LockKeyspace(ctx, ks1, ks1, topo.WithTTL(ttl)) require.NoError(t, err) defer unlock(&err) diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go index 0d4b40ef79d..6abe3d363e7 100644 --- a/go/vt/topo/locks.go +++ b/go/vt/topo/locks.go @@ -47,8 +47,8 @@ var ( ) // How long named locks are kept in the topo server. -// This ensures that orphaned named locks are not kept around. This -// should never happen, but it provides a final safety net. +// This ensures that orphaned named locks are not kept around forever. +// This should never happen, but it provides a final safety net. const NamedLockTTL = 24 * time.Hour // Lock describes a long-running lock on a keyspace or a shard. 
@@ -161,7 +161,7 @@ func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, opts ...LockO for _, o := range opts { o.apply(&l.Options) } - log.Infof("Locking %v %v for action %v with options: %+v", lt.Type(), lt.ResourceName(), l.Action, l.Options) + log.Infof("Locking %s %s for action %s with options: %+v", lt.Type(), lt.ResourceName(), l.Action, l.Options) ctx, cancel := context.WithTimeout(ctx, LockTimeout) defer cancel() @@ -314,14 +314,14 @@ func newFuncLockOption(f func(*lockOptions)) *funcLockOption { } } -// WithTimeToLive allows you to specify how long the underlying topo server +// WithTTL allows you to specify how long the underlying topo server // implementation should hold the lock before releasing it — even if the caller // has not explicitly released it. This provides a way to override the global // ttl values that are set via --topo_consul_lock_session_ttl and // --topo_etcd_lease_ttl. // Note: This option is ignored by the ZooKeeper implementation as it does not // support TTLs. -func WithTimeToLive(ttl time.Duration) LockOption { +func WithTTL(ttl time.Duration) LockOption { return newFuncLockOption(func(o *lockOptions) { o.ttl = ttl }) diff --git a/go/vt/topo/named_lock.go b/go/vt/topo/named_lock.go index 124fd5dca9a..533317f8d9d 100644 --- a/go/vt/topo/named_lock.go +++ b/go/vt/topo/named_lock.go @@ -49,7 +49,7 @@ func (ts *Server) LockName(ctx context.Context, name, action string) (context.Co }, action, WithType(Named)) } -// CheckNamedLocked can be called on a context to make sure we have the lock +// CheckNameLocked can be called on a context to make sure we have the lock // for a given opaque identifier. 
func CheckNameLocked(ctx context.Context, name string) error { return checkLocked(ctx, &namedLock{ diff --git a/go/vt/topo/named_lock_test.go b/go/vt/topo/named_lock_test.go index 1f4c97e31ec..a2bb82c73e9 100644 --- a/go/vt/topo/named_lock_test.go +++ b/go/vt/topo/named_lock_test.go @@ -59,7 +59,7 @@ func TestTopoNamedLock(t *testing.T) { err = topo.CheckNameLocked(ctx, lockName) require.NoError(t, err) - // Check we can acquire a different named lock. + // Confirm that we can acquire a different named lock. lockName2 := "testy2" ctx, unlock2, err := ts.LockName(ctx, lockName2, action) require.NoError(t, err) @@ -69,6 +69,7 @@ func TestTopoNamedLock(t *testing.T) { // Unlock the first name. unlock(&err) + // Confirm that we no longer have the first named lock. err = topo.CheckNameLocked(ctx, lockName) require.ErrorContains(t, err, fmt.Sprintf("%s is not locked", lockName)) err = topo.CheckNameLocked(ctx, lockName2) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 3a05bfd0d3e..55d568a21da 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3273,7 +3273,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc cmdTimeout = &vttimepb.Duration{Seconds: 30} } ksLockTimeout := time.Duration(int64(time.Second) * (cmdTimeout.Seconds * 2)) - ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTimeToLive(ksLockTimeout)) + ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTTL(ksLockTimeout)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } @@ -3357,13 +3357,13 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit cmdTimeout = &vttimepb.Duration{Seconds: 30} } ksLockTimeout := time.Duration(int64(time.Second) * (cmdTimeout.Seconds * 2)) - ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, 
ts.SourceKeyspaceName(), "SwitchWrites", topo.WithTimeToLive(ksLockTimeout)) + ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTimeout)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } defer sourceUnlock(&err) if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites", topo.WithTimeToLive(ksLockTimeout)) + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTimeout)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) } From f5d386fb4b3e679c5e0c01672a1f880b58e802e7 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 28 Jun 2024 10:36:33 -0400 Subject: [PATCH 05/14] Remove unnecessary wait in test Signed-off-by: Matt Lord --- go/test/endtoend/topotest/etcd2/main_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/go/test/endtoend/topotest/etcd2/main_test.go b/go/test/endtoend/topotest/etcd2/main_test.go index 3d240598948..67b0dbbc8f7 100644 --- a/go/test/endtoend/topotest/etcd2/main_test.go +++ b/go/test/endtoend/topotest/etcd2/main_test.go @@ -267,7 +267,6 @@ func TestNamedLocking(t *testing.T) { require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName)) // Wait to see that the second goroutine WAS now able to acquire the named lock. 
- time.Sleep(100 * time.Millisecond) topoutils.WaitForBoolValue(t, &secondCallerAcquired, true) } From 4119a706be18efcf554de8cb9bc7bfdb646bc213 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 28 Jun 2024 13:33:35 -0400 Subject: [PATCH 06/14] Correct action string in LockName calls Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 55d568a21da..22e7c02764d 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1463,7 +1463,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl // When creating the workflow, locking the workflow and its target keyspace is sufficient. lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) - ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "SwitchWrites") + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "MoveTablesCreate") if lockErr != nil { ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) return nil, lockErr @@ -2582,7 +2582,7 @@ func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, // Lock the workflow along with its source and target keyspaces. lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) - ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "SwitchWrites") + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "DropTargets") if lockErr != nil { ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) } @@ -2780,7 +2780,7 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy // Lock the workflow and its source and target keyspaces. 
lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) - ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "SwitchWrites") + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "DropSources") if lockErr != nil { ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) } @@ -3021,7 +3021,7 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche // Lock the workflow and its target keyspace. lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) - ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "SwitchWrites") + ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "completeMigrateWorkflow") if lockErr != nil { ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr) } From 90143d5a524bab353e7837ae09a25db1f4586f27 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 29 Jun 2024 12:15:01 -0400 Subject: [PATCH 07/14] Tie timeout value together across vtctldclient and vtctld Signed-off-by: Matt Lord --- .../vreplication/common/switchtraffic.go | 3 ++ .../command/vreplication/common/utils.go | 3 +- go/vt/topo/consultopo/server.go | 2 +- go/vt/topo/etcd2topo/lock.go | 2 +- go/vt/vtctl/workflow/server.go | 52 +++++++++++++------ 5 files changed, 42 insertions(+), 20 deletions(-) diff --git a/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go b/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go index 4004afc0ac0..3429c10303c 100644 --- a/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go +++ b/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go @@ -48,6 +48,9 @@ func GetSwitchTrafficCommand(opts *SubCommandsOpts) *cobra.Command { topodatapb.TabletType_RDONLY, } } + if SwitchTrafficOptions.Timeout.Seconds() < 1 { + return fmt.Errorf("timeout value must be at least 1 second") + } return nil }, RunE: commandSwitchTraffic, diff --git 
a/go/cmd/vtctldclient/command/vreplication/common/utils.go b/go/cmd/vtctldclient/command/vreplication/common/utils.go index a742f31a9ff..34d0773c03a 100644 --- a/go/cmd/vtctldclient/command/vreplication/common/utils.go +++ b/go/cmd/vtctldclient/command/vreplication/common/utils.go @@ -47,7 +47,6 @@ var ( } onDDLDefault = binlogdatapb.OnDDLAction_IGNORE.String() MaxReplicationLagDefault = 30 * time.Second - TimeoutDefault = 30 * time.Second BaseOptions = struct { Workflow string @@ -239,7 +238,7 @@ var SwitchTrafficOptions = struct { func AddCommonSwitchTrafficFlags(cmd *cobra.Command, initializeTargetSequences bool) { cmd.Flags().StringSliceVarP(&SwitchTrafficOptions.Cells, "cells", "c", nil, "Cells and/or CellAliases to switch traffic in.") cmd.Flags().Var((*topoproto.TabletTypeListFlag)(&SwitchTrafficOptions.TabletTypes), "tablet-types", "Tablet types to switch traffic for.") - cmd.Flags().DurationVar(&SwitchTrafficOptions.Timeout, "timeout", TimeoutDefault, "Specifies the maximum time to wait, in seconds, for VReplication to catch up on primary tablets. The traffic switch will be cancelled on timeout.") + cmd.Flags().DurationVar(&SwitchTrafficOptions.Timeout, "timeout", workflow.DefaultTimeout, "Specifies the maximum time to wait, in seconds, for VReplication to catch up on primary tablets. 
The traffic switch will be cancelled on timeout.") cmd.Flags().DurationVar(&SwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", MaxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this.") cmd.Flags().BoolVar(&SwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original source keyspace to support rolling back the traffic cutover.") cmd.Flags().BoolVar(&SwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred.") diff --git a/go/vt/topo/consultopo/server.go b/go/vt/topo/consultopo/server.go index c6890820c2d..ab61a40b1e8 100644 --- a/go/vt/topo/consultopo/server.go +++ b/go/vt/topo/consultopo/server.go @@ -111,7 +111,7 @@ type Server struct { locks map[string]*lockInstance lockChecks []string - lockTTL string // This is the default used for all locks + lockTTL string // This is the default used for all non-named locks lockDelay time.Duration } diff --git a/go/vt/topo/etcd2topo/lock.go b/go/vt/topo/etcd2topo/lock.go index 43e4e25cc6d..7fb761611cd 100644 --- a/go/vt/topo/etcd2topo/lock.go +++ b/go/vt/topo/etcd2topo/lock.go @@ -35,7 +35,7 @@ import ( ) var ( - leaseTTL = 30 // This is the default used for all locks + leaseTTL = 30 // This is the default used for all non-named locks ) func init() { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 22e7c02764d..27ebae736b7 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -80,6 +80,8 @@ const ( rdonlyTabletSuffix = "@rdonly" // Globally routable tables don't have a keyspace prefix. globalTableQualifier = "" + // Default duration used for lag, timeout, etc. 
+ DefaultTimeout = 30 * time.Second ) var tabletTypeSuffixes = []string{primaryTabletSuffix, replicaTabletSuffix, rdonlyTabletSuffix} @@ -129,9 +131,6 @@ const ( lockTablesCycles = 2 // Time to wait between LOCK TABLES cycles on the sources during SwitchWrites. lockTablesCycleDelay = time.Duration(100 * time.Millisecond) - - // Default duration used for lag, timeout, etc. - defaultDuration = 30 * time.Second ) var ( @@ -2543,7 +2542,7 @@ func (s *Server) optimizeCopyStateTable(tablet *topodatapb.Tablet) { s.sem.Release(1) } }() - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout) defer cancel() sqlOptimizeTable := "optimize table _vt.copy_state" if _, err := s.tmc.ExecuteFetchAsAllPrivs(ctx, tablet, &tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest{ @@ -3060,13 +3059,13 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor rdDryRunResults, wrDryRunResults *[]string hasReplica, hasRdonly, hasPrimary bool ) - timeout, set, err := protoutil.DurationFromProto(req.Timeout) + timeout, set, err := protoutil.DurationFromProto(req.GetTimeout()) if err != nil { err = vterrors.Wrapf(err, "unable to parse Timeout into a valid duration") return nil, err } if !set { - timeout = defaultDuration + timeout = DefaultTimeout } ts, startState, err := s.getWorkflowState(ctx, req.Keyspace, req.Workflow) if err != nil { @@ -3083,7 +3082,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, err } if !set { - maxReplicationLagAllowed = defaultDuration + maxReplicationLagAllowed = DefaultTimeout } direction := TrafficSwitchDirection(req.Direction) if direction == DirectionBackward { @@ -3268,11 +3267,22 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // For switching reads, locking the source keyspace is sufficient. // We need to hold the keyspace locks longer than the command timeout. 
- cmdTimeout := req.GetTimeout() - if cmdTimeout == nil { - cmdTimeout = &vttimepb.Duration{Seconds: 30} + // We need to hold the keyspace locks longer than the command timeout. + cmdTimeout, set, err := protoutil.DurationFromProto(req.GetTimeout()) + if err != nil { + return nil, vterrors.Wrapf(err, "unable to parse Timeout into a valid duration") + } + if !set { + cmdTimeout = DefaultTimeout + } + // We enforce the 1 second minimum as Etcd only takes a seconds value so + // you'd get unexpected behavior if you e.g. set the timeout to 500ms as + // Etcd would get a value of 0 or a never-ending TTL. + if cmdTimeout.Seconds() < 1 { + return nil, vterrors.Wrapf(err, "Timeout must be at least 1 second") } - ksLockTimeout := time.Duration(int64(time.Second) * (cmdTimeout.Seconds * 2)) + // Give ourselves extra time to be sure the lock is not lost. + ksLockTimeout := cmdTimeout * 2 ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTTL(ksLockTimeout)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) @@ -3352,11 +3362,21 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } defer workflowUnlock(&err) // We need to hold the keyspace locks longer than the command timeout. - cmdTimeout := req.GetTimeout() - if cmdTimeout == nil { - cmdTimeout = &vttimepb.Duration{Seconds: 30} + cmdTimeout, set, err := protoutil.DurationFromProto(req.GetTimeout()) + if err != nil { + return handleError("unable to parse Timeout into a valid duration", err) + } + if !set { + cmdTimeout = DefaultTimeout + } + // We enforce the 1 second minimum as Etcd only takes a seconds value so + // you'd get unexpected behavior if you e.g. set the timeout to 500ms as + // Etcd would get a value of 0 or a never-ending TTL. 
+ if cmdTimeout.Seconds() < 1 { + return handleError("Timeout must be at least 1 second", err) } - ksLockTimeout := time.Duration(int64(time.Second) * (cmdTimeout.Seconds * 2)) + // Give ourselves extra time to be sure the lock is not lost. + ksLockTimeout := cmdTimeout * 2 ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTimeout)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) @@ -3679,7 +3699,7 @@ func (s *Server) applySQLShard(ctx context.Context, tabletInfo *topo.TabletInfo, if err != nil { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "fillStringTemplate failed: %v", err) } - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + ctx, cancel := context.WithTimeout(ctx, DefaultTimeout) defer cancel() // Need to make sure that replication is enabled since we're only applying // the statement on primaries. From f4de4758fd57303e666cc88e9efad157503a4315 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 29 Jun 2024 14:53:30 -0400 Subject: [PATCH 08/14] Check and confirm that locks were not lost during traffic switches Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 43 ++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 27ebae736b7..f58e448c3b0 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3288,6 +3288,15 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } defer unlock(&err) + confirmKeyspaceLocksHeld := func() error { + if req.DryRun { // We don't actually take locks + return nil + } + if err := topo.CheckKeyspaceLocked(ctx, ts.SourceKeyspaceName()); err != nil { + return vterrors.Wrapf(err, "%s keyspace lock was lost", ts.SourceKeyspaceName()) + } 
+ return nil + } if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { switch { @@ -3307,11 +3316,18 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } return sw.logs(), nil } + + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil { return handleError("failed to switch read traffic for the shards", err) } + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } ts.Logger().Infof("switchShardReads Completed: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", @@ -3390,6 +3406,18 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit ctx = lockCtx defer targetUnlock(&err) } + confirmKeyspaceLocksHeld := func() error { + if req.DryRun { // We don't actually take locks + return nil + } + if err := topo.CheckKeyspaceLocked(ctx, ts.SourceKeyspaceName()); err != nil { + return vterrors.Wrapf(err, "%s keyspace lock was lost", ts.SourceKeyspaceName()) + } + if err := topo.CheckKeyspaceLocked(ctx, ts.TargetKeyspaceName()); err != nil { + return vterrors.Wrapf(err, "%s keyspace lock was lost", ts.TargetKeyspaceName()) + } + return nil + } // Find out if the target is using any sequence tables for auto_increment // value generation. 
If so, then we'll need to ensure that they are @@ -3468,30 +3496,45 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } ts.Logger().Infof("Waiting for streams to catchup") if err := sw.waitForCatchup(ctx, timeout); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to sync up replication between the source and target", err) } + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } ts.Logger().Infof("Migrating streams") if err := sw.migrateStreams(ctx, sm); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to migrate the workflow streams", err) } + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } ts.Logger().Infof("Resetting sequences") if err := sw.resetSequences(ctx); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to reset the sequences", err) } + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } ts.Logger().Infof("Creating reverse streams") if err := sw.createReverseVReplication(ctx); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to create the reverse vreplication streams", err) } + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } // Initialize any target sequences, if there are any, before allowing new writes. 
if req.InitializeTargetSequences && len(sequenceMetadata) > 0 { ts.Logger().Infof("Initializing target sequences") From ebc66a073f141f0824249deffa66c8d02138803f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 30 Jun 2024 11:59:49 -0400 Subject: [PATCH 09/14] Minor improvements on self review Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 52 ++++++++++++++-------------------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index f58e448c3b0..b78fc5e0d68 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3067,6 +3067,12 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor if !set { timeout = DefaultTimeout } + // We enforce the 1 second minimum as some things that use it, such as Etcd, only takes + // a seconds value so you'd get unexpected behavior if you e.g. set the timeout to + // 500ms as Etcd would get a value of 0 or a never-ending TTL. + if timeout.Seconds() < 1 { + return nil, vterrors.Wrap(err, "Timeout must be at least 1 second") + } ts, startState, err := s.getWorkflowState(ctx, req.Keyspace, req.Workflow) if err != nil { return nil, err @@ -3267,22 +3273,13 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // For switching reads, locking the source keyspace is sufficient. // We need to hold the keyspace locks longer than the command timeout. - // We need to hold the keyspace locks longer than the command timeout. - cmdTimeout, set, err := protoutil.DurationFromProto(req.GetTimeout()) + ksLockTimeout, set, err := protoutil.DurationFromProto(req.GetTimeout()) if err != nil { return nil, vterrors.Wrapf(err, "unable to parse Timeout into a valid duration") } if !set { - cmdTimeout = DefaultTimeout + ksLockTimeout = DefaultTimeout } - // We enforce the 1 second minimum as Etcd only takes a seconds value so - // you'd get unexpected behavior if you e.g. 
set the timeout to 500ms as - // Etcd would get a value of 0 or a never-ending TTL. - if cmdTimeout.Seconds() < 1 { - return nil, vterrors.Wrapf(err, "Timeout must be at least 1 second") - } - // Give ourselves extra time to be sure the lock is not lost. - ksLockTimeout := cmdTimeout * 2 ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTTL(ksLockTimeout)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) @@ -3338,7 +3335,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } // switchWrites is a generic way of migrating write traffic for a workflow. -func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest, ts *trafficSwitcher, timeout time.Duration, +func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest, ts *trafficSwitcher, waitTimeout time.Duration, cancel bool, ) (journalID int64, dryRunResults *[]string, err error) { var sw iswitcher @@ -3377,22 +3374,12 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError(fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr) } defer workflowUnlock(&err) - // We need to hold the keyspace locks longer than the command timeout. - cmdTimeout, set, err := protoutil.DurationFromProto(req.GetTimeout()) - if err != nil { - return handleError("unable to parse Timeout into a valid duration", err) - } - if !set { - cmdTimeout = DefaultTimeout - } - // We enforce the 1 second minimum as Etcd only takes a seconds value so - // you'd get unexpected behavior if you e.g. set the timeout to 500ms as - // Etcd would get a value of 0 or a never-ending TTL. - if cmdTimeout.Seconds() < 1 { - return handleError("Timeout must be at least 1 second", err) - } - // Give ourselves extra time to be sure the lock is not lost. 
- ksLockTimeout := cmdTimeout * 2 + + // We need to hold the keyspace locks longer than waitTimeout*X -- where X + // is the number of sub-steps where the waitTimeout value is used: stopping + // existing streams, waiting for replication to catch up, and initializing + // the target sequences -- to be sure the lock is not lost. + ksLockTimeout := waitTimeout * 3 ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTimeout)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) @@ -3468,7 +3455,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // materializations then we have to wait for them to catchup before switching traffic for the // Reshard workflow. We use the the same timeout value here that is used for VReplication catchup // with the inter-keyspace workflows. - stopCtx, stopCancel := context.WithTimeout(ctx, timeout) + stopCtx, stopCancel := context.WithTimeout(ctx, waitTimeout) defer stopCancel() sourceWorkflows, err = sw.stopStreams(stopCtx, sm) if err != nil { @@ -3500,7 +3487,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError("locks were lost", err) } ts.Logger().Infof("Waiting for streams to catchup") - if err := sw.waitForCatchup(ctx, timeout); err != nil { + if err := sw.waitForCatchup(ctx, waitTimeout); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to sync up replication between the source and target", err) } @@ -3541,7 +3528,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // Writes are blocked so we can safely initialize the sequence tables but // we also want to use a shorter timeout than the parent context. // We use at most half of the overall timeout. 
- initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2) + initSeqCtx, cancel := context.WithTimeout(ctx, waitTimeout/2) defer cancel() if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil { sw.cancelMigration(ctx, sm) @@ -3561,6 +3548,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // This is the point of no return. Once a journal is created, // traffic can be redirected to target shards. + if err := confirmKeyspaceLocksHeld(); err != nil { + return handleError("locks were lost", err) + } if err := sw.createJournals(ctx, sourceWorkflows); err != nil { return handleError("failed to create the journal", err) } From 9ad2a964d96b48596f4944036d620ce4a1a253f2 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 30 Jun 2024 12:05:09 -0400 Subject: [PATCH 10/14] Tie vdiff's replication wait time in Signed-off-by: Matt Lord --- go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go index dfca3386491..9355049e39b 100644 --- a/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go +++ b/go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common" "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/vtctl/workflow" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" @@ -874,7 +875,7 @@ func registerCommands(root *cobra.Command) { create.Flags().StringSliceVar(&createOptions.TargetCells, "target-cells", nil, "The target cell(s) to compare with; default is any available cell.") create.Flags().Var((*topoprotopb.TabletTypeListFlag)(&createOptions.TabletTypes), "tablet-types", "Tablet types to use on the source and target.") 
create.Flags().BoolVar(&common.CreateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-preference-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.") - create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.") + create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", workflow.DefaultTimeout, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.") create.Flags().Int64Var(&createOptions.Limit, "limit", math.MaxInt64, "Max rows to stop comparing after.") create.Flags().BoolVar(&createOptions.DebugQuery, "debug-query", false, "Adds a mysql query to the report that can be used for further debugging.") create.Flags().Int64Var(&createOptions.MaxReportSampleRows, "max-report-sample-rows", 10, "Maximum number of row differences to report (0 for all differences). NOTE: when increasing this value it is highly recommended to also specify --only-pks") From c284f6a6ba1e935b60782c663ae10d8238f768c2 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 30 Jun 2024 18:30:02 -0400 Subject: [PATCH 11/14] Nitty var name and comment changes Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index b78fc5e0d68..df04f476dc0 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3273,14 +3273,14 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // For switching reads, locking the source keyspace is sufficient. // We need to hold the keyspace locks longer than the command timeout. 
- ksLockTimeout, set, err := protoutil.DurationFromProto(req.GetTimeout()) + ksLockTTL, set, err := protoutil.DurationFromProto(req.GetTimeout()) if err != nil { return nil, vterrors.Wrapf(err, "unable to parse Timeout into a valid duration") } if !set { - ksLockTimeout = DefaultTimeout + ksLockTTL = DefaultTimeout } - ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTTL(ksLockTimeout)) + ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTTL(ksLockTTL)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } @@ -3379,14 +3379,14 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // is the number of sub-steps where the waitTimeout value is used: stopping // existing streams, waiting for replication to catch up, and initializing // the target sequences -- to be sure the lock is not lost. - ksLockTimeout := waitTimeout * 3 - ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTimeout)) + ksLockTTL := waitTimeout * 3 + ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTTL)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } defer sourceUnlock(&err) if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTimeout)) + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTTL)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) } @@ -3526,8 +3526,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit if 
req.InitializeTargetSequences && len(sequenceMetadata) > 0 { ts.Logger().Infof("Initializing target sequences") // Writes are blocked so we can safely initialize the sequence tables but - // we also want to use a shorter timeout than the parent context. - // We use at most half of the overall timeout. + // we also want to use a shorter timeout than the default. initSeqCtx, cancel := context.WithTimeout(ctx, waitTimeout/2) defer cancel() if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil { From 02971c117c564e36dfb0416fd06d6c3bcf104728 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 8 Jul 2024 10:07:48 -0400 Subject: [PATCH 12/14] Fixup after merge Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 791836dc9e8..06ebce25b78 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3287,7 +3287,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } // For reads, locking the source keyspace is sufficient. - ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads") + ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTTL(ksLockTTL)) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } @@ -3387,7 +3387,12 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // existing streams, waiting for replication to catch up, and initializing // the target sequences -- to be sure the lock is not lost. ksLockTTL := waitTimeout * 3 + // Need to lock both source and target keyspaces.
ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTTL)) + if lockErr != nil { + return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) + } + defer sourceUnlock(&err) // Remove mirror rules for the primary tablet type. if err := sw.mirrorTableTraffic(ctx, []topodata.TabletType{topodatapb.TabletType_PRIMARY}, 0); err != nil { @@ -3395,12 +3400,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) } - // Need to lock both source and target keyspaces. - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites") - if lockErr != nil { - return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) - } - defer sourceUnlock(&err) if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTTL)) if lockErr != nil { From a84e907520a85ad66a54add8b7d98cb7063ee098 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 8 Jul 2024 10:39:15 -0400 Subject: [PATCH 13/14] More post merge fixups Signed-off-by: Matt Lord --- .../vreplication/vreplication_test_env.go | 4 +-- go/vt/vtctl/workflow/server.go | 25 ++++++++++--------- go/vt/vtctl/workflow/server_test.go | 8 +++--- go/vt/vtctl/workflow/traffic_switcher.go | 2 -- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index 238242f0e65..c62d871380d 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -17,9 +17,9 @@ limitations under the License. 
package vreplication var dryRunResultsSwitchWritesCustomerShard = []string{ - "Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [PRIMARY]", "Lock keyspace product", "Lock keyspace customer", + "Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [PRIMARY]", "/Stop writes on keyspace product for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]: [keyspace:product;shard:0;position:", "Wait for vreplication on stopped streams to catchup for up to 30s", "Create reverse vreplication workflow p2c_reverse", @@ -36,8 +36,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{ } var dryRunResultsReadCustomerShard = []string{ - "Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [RDONLY,REPLICA]", "Lock keyspace product", + "Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [RDONLY,REPLICA]", "Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]", "Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated", "Serving VSchema will be rebuilt for the customer keyspace", diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 06ebce25b78..5b6c3f05343 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3280,12 +3280,6 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc ksLockTTL = DefaultTimeout } - // Remove mirror rules for the specified tablet types. 
- if err := sw.mirrorTableTraffic(ctx, roTabletTypes, 0); err != nil { - return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types", - ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) - } - // For reads, locking the source keyspace is sufficient. ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTTL(ksLockTTL)) if lockErr != nil { @@ -3302,6 +3296,12 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc return nil } + // Remove mirror rules for the specified tablet types. + if err := sw.mirrorTableTraffic(ctx, roTabletTypes, 0); err != nil { + return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types", + ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) + } + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { switch { case ts.IsMultiTenantMigration(): @@ -3387,6 +3387,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // existing streams, waiting for replication to catch up, and initializing // the target sequences -- to be sure the lock is not lost. ksLockTTL := waitTimeout * 3 + // Need to lock both source and target keyspaces. ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTTL)) if lockErr != nil { @@ -3394,12 +3395,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } defer sourceUnlock(&err) - // Remove mirror rules for the primary tablet type. 
- if err := sw.mirrorTableTraffic(ctx, []topodata.TabletType{topodatapb.TabletType_PRIMARY}, 0); err != nil { - return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for primary tablet type", - ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) - } - if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites", topo.WithTTL(ksLockTTL)) if lockErr != nil { @@ -3421,6 +3416,12 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return nil } + // Remove mirror rules for the primary tablet type. + if err := sw.mirrorTableTraffic(ctx, []topodata.TabletType{topodatapb.TabletType_PRIMARY}, 0); err != nil { + return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for primary tablet type", + ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) + } + // Find out if the target is using any sequence tables for auto_increment // value generation. If so, then we'll need to ensure that they are // initialized properly before allowing new writes on the target. 
diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index f2b9f6b2496..c67d45bb9e6 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -689,14 +689,14 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { DryRun: true, }, want: []string{ - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", sourceKeyspaceName, targetKeyspaceName), fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", sourceKeyspaceName, targetKeyspaceName), fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, targetKeyspaceName), fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName), - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [PRIMARY]", sourceKeyspaceName, targetKeyspaceName), fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [PRIMARY]", sourceKeyspaceName, targetKeyspaceName), fmt.Sprintf("Stop writes on keyspace %s for tables [%s]: [keyspace:%s;shard:-80;position:%s,keyspace:%s;shard:80-;position:%s]", sourceKeyspaceName, tablesStr, sourceKeyspaceName, position, sourceKeyspaceName, position), "Wait for vreplication on stopped streams to catchup for up to 30s", @@ -730,14 +730,14 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { DryRun: true, }, want: []string{ - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", targetKeyspaceName, sourceKeyspaceName), fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Mirroring 0.00 
percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", targetKeyspaceName, sourceKeyspaceName), fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, targetKeyspaceName), fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName), - fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [PRIMARY]", targetKeyspaceName, sourceKeyspaceName), fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [PRIMARY]", targetKeyspaceName, sourceKeyspaceName), fmt.Sprintf("Stop writes on keyspace %s for tables [%s]: [keyspace:%s;shard:-80;position:%s,keyspace:%s;shard:80-;position:%s]", targetKeyspaceName, tablesStr, targetKeyspaceName, position, targetKeyspaceName, position), "Wait for vreplication on stopped streams to catchup for up to 30s", diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 35930bc3e97..c9d9952ef8f 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -1747,8 +1747,6 @@ func (ts *trafficSwitcher) IsMultiTenantMigration() bool { } func (ts *trafficSwitcher) mirrorTableTraffic(ctx context.Context, types []topodatapb.TabletType, percent float32) error { - log.Infof("mirrorTableTraffic") - mrs, err := topotools.GetMirrorRules(ctx, ts.TopoServer()) if err != nil { return err From 8de719a542967378ad632634e172cd5d7bf7fbfd Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 8 Jul 2024 14:19:23 -0400 Subject: [PATCH 14/14] Address review comment Signed-off-by: Matt Lord --- go/vt/topo/locks.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/go/vt/topo/locks.go b/go/vt/topo/locks.go index 
6abe3d363e7..f46e5f06e4b 100644 --- a/go/vt/topo/locks.go +++ b/go/vt/topo/locks.go @@ -130,21 +130,21 @@ var locksKey locksKeyType type LockType int const ( - Blocking LockType = iota - NonBlocking - Named + // Blocking is the default lock type when no other valid type + // is specified. + Blocking LockType = iota + NonBlocking // Uses TryLock + Named // Uses LockName ) func (lt LockType) String() string { switch lt { - case Blocking: - return "blocking" case NonBlocking: return "non blocking" case Named: return "named" default: - return "unknown" + return "blocking" } }