Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: use new topo named locks and TTL override for workflow coordination #16260

Merged
merged 15 commits into from
Jul 10, 2024
Merged
69 changes: 69 additions & 0 deletions go/test/endtoend/topotest/etcd2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ectd2
import (
"context"
"flag"
"fmt"
"os"
"testing"
"time"
Expand Down Expand Up @@ -201,6 +202,74 @@ func TestKeyspaceLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestLockingWithTTL verifies the TTL override for keyspace locks: once more
// time than the requested TTL has elapsed, the lock must no longer be reported
// as held.
func TestLockingWithTTL(t *testing.T) {
	const lockTTL = 1 * time.Second

	// Connect to the global topo server used by the test cluster.
	ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
	require.NoError(t, err)

	// Take the keyspace lock, overriding the default TTL with a short one.
	lockCtx, unlock, err := ts.LockKeyspace(context.Background(), KeyspaceName, "TestLockingWithTTL", topo.WithTTL(lockTTL))
	require.NoError(t, err)
	defer unlock(&err)

	// Sleep for twice the TTL so that the lock is guaranteed to have outlived
	// it, then confirm that CheckKeyspaceLocked reports an error.
	time.Sleep(2 * lockTTL)
	err = topo.CheckKeyspaceLocked(lockCtx, KeyspaceName)
	require.Error(t, err)
}

// TestNamedLocking tests that named locking works as intended:
//   - a held named lock cannot be re-acquired from the context that holds it,
//   - CheckNameLocked succeeds while the lock is held and fails after unlock,
//   - a second caller blocks until the first caller releases the lock.
func TestNamedLocking(t *testing.T) {
	// Create topo server connection.
	ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
	require.NoError(t, err)

	ctx := context.Background()
	lockName := "TestNamedLocking"
	action := "Testing"

	// Acquire a named lock.
	ctx, unlock, err := ts.LockName(ctx, lockName, action)
	require.NoError(t, err)

	// Check that we can't reacquire it from the same context.
	_, _, err = ts.LockName(ctx, lockName, action)
	require.ErrorContains(t, err, fmt.Sprintf("lock for named %s is already held", lockName))

	// Check that CheckNameLocked doesn't return an error as we should still be
	// holding the lock.
	err = topo.CheckNameLocked(ctx, lockName)
	require.NoError(t, err)

	// We'll now try to acquire the lock from a different goroutine.
	// NOTE(review): secondCallerAcquired is a plain bool shared between
	// goroutines because topoutils.WaitForBoolValue takes a *bool; confirm
	// whether that helper synchronizes access.
	secondCallerAcquired := false
	go func() {
		_, unlock, err := ts.LockName(context.Background(), lockName, action)
		if err != nil {
			// Report with t.Errorf rather than require: FailNow must only be
			// called from the goroutine running the test. Also bail out
			// before deferring unlock, which is not usable when the lock
			// attempt failed.
			t.Errorf("second caller failed to acquire named lock %s: %v", lockName, err)
			return
		}
		defer unlock(&err)
		secondCallerAcquired = true
	}()

	// Wait for some time and ensure that the second attempt at acquiring the lock
	// is blocked.
	time.Sleep(100 * time.Millisecond)
	require.False(t, secondCallerAcquired)

	// Unlock the name and make sure that releasing the lock itself succeeded
	// before err is reused below.
	unlock(&err)
	require.NoError(t, err)
	// Check that we no longer have the named lock.
	err = topo.CheckNameLocked(ctx, lockName)
	require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName))

	// Wait to see that the second goroutine WAS now able to acquire the named lock.
	topoutils.WaitForBoolValue(t, &secondCallerAcquired, true)
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
Expand Down
16 changes: 16 additions & 0 deletions go/vt/topo/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package topo
import (
"context"
"sort"
"time"
)

// Conn defines the interface that must be implemented by topology
Expand Down Expand Up @@ -120,6 +121,21 @@ type Conn interface {
// Returns ErrInterrupted if ctx is canceled.
Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error)

// LockWithTTL is similar to `Lock` but the difference is that it allows
// you to override the global default TTL that is configured for the
// implementation (--topo_etcd_lease_ttl and --topo_consul_lock_session_ttl).
// Note: this is no different than `Lock` for ZooKeeper as it does not
// support lock TTLs and they exist until released or the session ends.
LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (LockDescriptor, error)

// LockName is similar to `Lock` but the difference is that it does not require
// the path to exist and have children in order to lock it. This is because with
// named locks you are NOT locking an actual topo entity such as a Keyspace record.
// Because this lock is not blocking any Vitess operations OTHER than another
// caller that is trying to get the same named lock, there is a static 24 hour
// TTL on them to ensure that they are eventually cleaned up.
LockName(ctx context.Context, dirPath, contents string) (LockDescriptor, error)

// TryLock takes lock on the given directory with a fail-fast approach.
// It is similar to `Lock` but the difference is it attempts to acquire the lock
// if it is likely to succeed. If there is already a lock on given path, then unlike `Lock`
Expand Down
40 changes: 34 additions & 6 deletions go/vt/topo/consultopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"path"
"time"

"github.com/hashicorp/consul/api"

Expand Down Expand Up @@ -49,7 +50,27 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD
return nil, convertError(err, dirPath)
}

return s.lock(ctx, dirPath, contents)
return s.lock(ctx, dirPath, contents, s.lockTTL)
}

// LockWithTTL is part of the topo.Conn interface. It behaves like Lock but
// uses the caller supplied ttl, rendered as a duration string, for the lock's
// session instead of the server's default lockTTL.
func (s *Server) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (topo.LockDescriptor, error) {
	// Listing the directory is how we verify that it exists before locking it.
	_, err := s.ListDir(ctx, dirPath, false /*full*/)
	if err != nil {
		// Going through convertError is the simplest way to hand back the
		// right error codes (e.g. topo.ErrNoNode, topo.ErrInterrupted). Some
		// context may be lost along the way; logging the original error here
		// would be an option if that ever becomes a problem.
		return nil, convertError(err, dirPath)
	}

	sessionTTL := ttl.String()
	return s.lock(ctx, dirPath, contents, sessionTTL)
}

// LockName is part of the topo.Conn interface. Unlike Lock and LockWithTTL it
// does not list dirPath first, and it always uses topo.NamedLockTTL as the
// session TTL.
func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
	namedTTL := topo.NamedLockTTL.String()
	return s.lock(ctx, dirPath, contents, namedTTL)
}

// TryLock is part of the topo.Conn interface.
Expand All @@ -74,11 +95,11 @@ func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.Lo
}

// everything is good let's acquire the lock.
return s.lock(ctx, dirPath, contents)
return s.lock(ctx, dirPath, contents, s.lockTTL)
}

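// lock is the shared implementation used by Lock, LockWithTTL, LockName, and
// TryLock. A non-empty ttl overrides the session TTL used for this lock.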
func (s *Server) lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
func (s *Server) lock(ctx context.Context, dirPath, contents, ttl string) (topo.LockDescriptor, error) {
lockPath := path.Join(s.root, dirPath, locksFilename)

lockOpts := &api.LockOptions{
Expand All @@ -90,12 +111,19 @@ func (s *Server) lock(ctx context.Context, dirPath, contents string) (topo.LockD
},
}
lockOpts.SessionOpts.Checks = s.lockChecks
if s.lockDelay > 0 {
lockOpts.SessionOpts.LockDelay = s.lockDelay
}
if s.lockTTL != "" {
// Override the API default with the global default from
// --topo_consul_lock_session_ttl.
lockOpts.SessionOpts.TTL = s.lockTTL
}
if ttl != "" {
// Override the global default with the one provided by the
// caller.
lockOpts.SessionOpts.TTL = ttl
}
if s.lockDelay > 0 {
lockOpts.SessionOpts.LockDelay = s.lockDelay
}
// Build the lock structure.
l, err := s.client.LockOpts(lockOpts)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/consultopo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type Server struct {
locks map[string]*lockInstance

lockChecks []string
lockTTL string
lockTTL string // This is the default used for all locks
lockDelay time.Duration
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/etcd2topo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (mp *etcdLeaderParticipation) WaitForLeadership() (context.Context, error)

// Try to get the primaryship, by getting a lock.
var err error
ld, err = mp.s.lock(lockCtx, electionPath, mp.id)
ld, err = mp.s.lock(lockCtx, electionPath, mp.id, leaseTTL)
if err != nil {
// It can be that we were interrupted.
return nil, err
Expand Down
31 changes: 26 additions & 5 deletions go/vt/topo/etcd2topo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"path"
"time"

"github.com/spf13/pflag"

Expand All @@ -34,7 +35,7 @@ import (
)

var (
leaseTTL = 30
leaseTTL = 30 // This is the default used for all locks
)

func init() {
Expand Down Expand Up @@ -153,7 +154,7 @@ func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.Lo
}

// everything is good let's acquire the lock.
return s.lock(ctx, dirPath, contents)
return s.lock(ctx, dirPath, contents, leaseTTL)
}

// Lock is part of the topo.Conn interface.
Expand All @@ -168,15 +169,35 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD
return nil, convertError(err, dirPath)
}

return s.lock(ctx, dirPath, contents)
return s.lock(ctx, dirPath, contents, leaseTTL)
}

// LockWithTTL is part of the topo.Conn interface. It behaves like Lock but
// grants the lease with the caller supplied ttl instead of the default
// leaseTTL. The ttl is converted to whole seconds, so any fractional part is
// truncated.
func (s *Server) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (topo.LockDescriptor, error) {
	// Listing the directory is how we verify that it exists before locking it.
	_, err := s.ListDir(ctx, dirPath, false /*full*/)
	if err != nil {
		// Going through convertError is the simplest way to hand back the
		// right error codes (e.g. topo.ErrNoNode, topo.ErrInterrupted). Some
		// context may be lost along the way; logging the original error here
		// would be an option if that ever becomes a problem.
		return nil, convertError(err, dirPath)
	}

	ttlSeconds := int(ttl.Seconds())
	return s.lock(ctx, dirPath, contents, ttlSeconds)
}

// LockName is part of the topo.Conn interface. Unlike Lock and LockWithTTL it
// does not list dirPath first, and it always uses topo.NamedLockTTL (in whole
// seconds) as the lease TTL.
func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
	namedTTLSeconds := int(topo.NamedLockTTL.Seconds())
	return s.lock(ctx, dirPath, contents, namedTTLSeconds)
}

// lock is used by both Lock() and primary election.
func (s *Server) lock(ctx context.Context, nodePath, contents string) (topo.LockDescriptor, error) {
func (s *Server) lock(ctx context.Context, nodePath, contents string, ttl int) (topo.LockDescriptor, error) {
nodePath = path.Join(s.root, nodePath, locksPath)

// Get a lease, set its KeepAlive.
lease, err := s.cli.Grant(ctx, int64(leaseTTL))
lease, err := s.cli.Grant(ctx, int64(ttl))
if err != nil {
return nil, convertError(err, nodePath)
}
Expand Down
15 changes: 15 additions & 0 deletions go/vt/topo/faketopo/faketopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"strings"
"sync"
"time"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -291,6 +292,20 @@ func (f *FakeConn) Lock(ctx context.Context, dirPath, contents string) (topo.Loc
return &fakeLockDescriptor{}, nil
}

// LockWithTTL implements the Conn interface. The fake ignores the directory,
// contents and TTL: it takes the FakeConn mutex for the duration of the call
// and always returns a fresh fakeLockDescriptor with a nil error.
func (f *FakeConn) LockWithTTL(ctx context.Context, dirPath, contents string, _ time.Duration) (topo.LockDescriptor, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	descriptor := &fakeLockDescriptor{}
	return descriptor, nil
}

// LockName implements the Conn interface. The fake ignores the directory and
// contents: it takes the FakeConn mutex for the duration of the call and
// always returns a fresh fakeLockDescriptor with a nil error.
func (f *FakeConn) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	descriptor := &fakeLockDescriptor{}
	return descriptor, nil
}

// TryLock is part of the topo.Conn interface. Its implementation is same as Lock
func (f *FakeConn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
return f.Lock(ctx, dirPath, contents)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/keyspace_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func (s *keyspaceLock) Path() string {
// - a context with a locksInfo structure for future reference.
// - an unlock method
// - an error if anything failed.
func (ts *Server) LockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) {
func (ts *Server) LockKeyspace(ctx context.Context, keyspace, action string, opts ...LockOption) (context.Context, func(*error), error) {
return ts.internalLock(ctx, &keyspaceLock{
keyspace: keyspace,
}, action, true)
}, action, opts...)
}

// CheckKeyspaceLocked can be called on a context to make sure we have the lock
Expand Down
35 changes: 34 additions & 1 deletion go/vt/topo/keyspace_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package topo_test
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// TestTopoKeyspaceLock tests keyspace lock operations.
Expand Down Expand Up @@ -82,3 +84,34 @@ func TestTopoKeyspaceLock(t *testing.T) {
require.NoError(t, err)
defer unlock(&err)
}

// TestTopoKeyspaceLockWithTTL locks a keyspace in an in-memory topo using a
// custom TTL, checks that the lock is reported as held, and confirms that the
// factory recorded exactly one LockWithTTL call.
func TestTopoKeyspaceLockWithTTL(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ts, tsf := memorytopo.NewServerAndFactory(ctx, "zone1")
	defer ts.Close()

	// Use the test lock timeout for the duration of this test and put the
	// previous value back on the way out.
	savedLockTimeout := topo.LockTimeout
	topo.LockTimeout = testLockTimeout
	defer func() { topo.LockTimeout = savedLockTimeout }()

	const (
		keyspace = "ks1"
		lockTTL  = time.Second
	)
	err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})
	require.NoError(t, err)

	// The keyspace name doubles as the lock action.
	lockCtx, unlock, err := ts.LockKeyspace(ctx, keyspace, keyspace, topo.WithTTL(lockTTL))
	require.NoError(t, err)
	defer unlock(&err)

	err = topo.CheckKeyspaceLocked(lockCtx, keyspace)
	require.NoError(t, err)

	// Confirm the new stats.
	callStats := tsf.GetCallStats()
	require.NotNil(t, callStats)
	lockWithTTLCalls := callStats.Counts()["LockWithTTL"]
	require.Equal(t, int64(1), lockWithTTLCalls)
}
Loading
Loading