Skip to content

Commit

Permalink
Add named lock support
Browse files Browse the repository at this point in the history
And use that for VReplication workflows when coordination is
necessary, such as between the VReplication engine and the VDiff
engine.

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jun 25, 2024
1 parent 048d4d2 commit e2e58c9
Show file tree
Hide file tree
Showing 18 changed files with 239 additions and 58 deletions.
8 changes: 8 additions & 0 deletions go/vt/topo/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ type Conn interface {
// Returns ErrInterrupted if ctx is canceled.
Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error)

// LockName is similar to `Lock` but the difference is that it does not require
// the path to exist and have children in order to lock it. This is because with
// named locks you are NOT locking an actual topo entity such as a Keyspace record.
// Because this lock is not blocking any Vitess operations OTHER than another
// caller that is trying to get the same named lock, there is a static 24 hour
// TTL on them to ensure that they are eventually cleaned up.
LockName(ctx context.Context, dirPath, contents string) (LockDescriptor, error)

// TryLock takes lock on the given directory with a fail-fast approach.
// It is similar to `Lock` but the difference is it attempts to acquire the lock
// if it is likely to succeed. If there is already a lock on given path, then unlike `Lock`
Expand Down
17 changes: 10 additions & 7 deletions go/vt/topo/consultopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD
return nil, convertError(err, dirPath)
}

return s.lock(ctx, dirPath, contents)
return s.lock(ctx, dirPath, contents, s.lockTTL)
}

// LockName implements topo.Conn. It delegates straight to s.lock, passing
// topo.NamedLockTTL (formatted as a duration string) as the TTL instead of
// the server's default s.lockTTL.
func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
	namedTTL := topo.NamedLockTTL.String()
	return s.lock(ctx, dirPath, contents, namedTTL)
}

// TryLock is part of the topo.Conn interface.
Expand All @@ -74,28 +79,26 @@ func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.Lo
}

// everything is good let's acquire the lock.
return s.lock(ctx, dirPath, contents)
return s.lock(ctx, dirPath, contents, s.lockTTL)
}

// lock is the shared implementation used by Lock, LockName, and TryLock.
func (s *Server) lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
func (s *Server) lock(ctx context.Context, dirPath, contents, ttl string) (topo.LockDescriptor, error) {
lockPath := path.Join(s.root, dirPath, locksFilename)

lockOpts := &api.LockOptions{
Key: lockPath,
Value: []byte(contents),
SessionOpts: &api.SessionEntry{
Name: api.DefaultLockSessionName,
TTL: api.DefaultLockSessionTTL,
TTL: ttl,
},
}
lockOpts.SessionOpts.Checks = s.lockChecks
if s.lockDelay > 0 {
lockOpts.SessionOpts.LockDelay = s.lockDelay
}
if s.lockTTL != "" {
lockOpts.SessionOpts.TTL = s.lockTTL
}
lockOpts.SessionOpts.TTL = ttl
// Build the lock structure.
l, err := s.client.LockOpts(lockOpts)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/consultopo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type Server struct {
locks map[string]*lockInstance

lockChecks []string
lockTTL string
lockTTL string // This is the default used for all locks
lockDelay time.Duration
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/etcd2topo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (mp *etcdLeaderParticipation) WaitForLeadership() (context.Context, error)

// Try to get the primaryship, by getting a lock.
var err error
ld, err = mp.s.lock(lockCtx, electionPath, mp.id)
ld, err = mp.s.lock(lockCtx, electionPath, mp.id, leaseTTL)
if err != nil {
// It can be that we were interrupted.
return nil, err
Expand Down
13 changes: 9 additions & 4 deletions go/vt/topo/etcd2topo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.Lo
}

// everything is good let's acquire the lock.
return s.lock(ctx, dirPath, contents)
return s.lock(ctx, dirPath, contents, leaseTTL)
}

// Lock is part of the topo.Conn interface.
Expand All @@ -168,15 +168,20 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD
return nil, convertError(err, dirPath)
}

return s.lock(ctx, dirPath, contents)
return s.lock(ctx, dirPath, contents, leaseTTL)
}

// LockName implements topo.Conn. It delegates to s.lock with the TTL set to
// topo.NamedLockTTL expressed in whole seconds, rather than leaseTTL.
func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
	ttlSeconds := int(topo.NamedLockTTL.Seconds())
	return s.lock(ctx, dirPath, contents, ttlSeconds)
}

// lock is used by Lock(), TryLock(), LockName(), and primary election.
func (s *Server) lock(ctx context.Context, nodePath, contents string) (topo.LockDescriptor, error) {
func (s *Server) lock(ctx context.Context, nodePath, contents string, ttl int) (topo.LockDescriptor, error) {
nodePath = path.Join(s.root, nodePath, locksPath)

// Get a lease, set its KeepAlive.
lease, err := s.cli.Grant(ctx, int64(leaseTTL))
lease, err := s.cli.Grant(ctx, int64(ttl))
if err != nil {
return nil, convertError(err, nodePath)
}
Expand Down
7 changes: 7 additions & 0 deletions go/vt/topo/faketopo/faketopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,13 @@ func (f *FakeConn) Lock(ctx context.Context, dirPath, contents string) (topo.Loc
return &fakeLockDescriptor{}, nil
}

// LockName implements the Conn interface. The fake ignores dirPath and
// contents: it takes f.mu for the duration of the call and always hands back
// a fresh fakeLockDescriptor with a nil error.
func (f *FakeConn) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	descriptor := &fakeLockDescriptor{}
	return descriptor, nil
}

// TryLock is part of the topo.Conn interface. Its implementation is same as Lock
func (f *FakeConn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
return f.Lock(ctx, dirPath, contents)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/keyspace_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (s *keyspaceLock) Path() string {
func (ts *Server) LockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) {
return ts.internalLock(ctx, &keyspaceLock{
keyspace: keyspace,
}, action, true)
}, action, Blocking)
}

// CheckKeyspaceLocked can be called on a context to make sure we have the lock
Expand Down
42 changes: 37 additions & 5 deletions go/vt/topo/locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ var (
RemoteOperationTimeout = 15 * time.Second
)

// NamedLockTTL is how long named locks are kept in the topo server.
// This ensures that orphaned named locks are not kept around.
const NamedLockTTL = 24 * time.Hour

// Lock describes a long-running lock on a keyspace or a shard.
// It needs to be public as we JSON-serialize it.
type Lock struct {
Expand Down Expand Up @@ -120,6 +124,28 @@ type locksKeyType int

var locksKey locksKeyType

// LockType identifies which kind of topo lock a caller is requesting.
type LockType int

const (
	// Blocking is the zero value of LockType.
	Blocking LockType = iota
	// NonBlocking is the fail-fast variant.
	NonBlocking
	// Named is used for locks on an opaque name.
	Named
)

// String returns the human-readable name of the lock type, or "unknown" for
// any value outside the declared constants.
func (lt LockType) String() string {
	// Indexed by the constant's value so the lookup is a plain array access.
	names := [...]string{
		Blocking:    "blocking",
		NonBlocking: "non blocking",
		Named:       "named",
	}
	if lt < Blocking || int(lt) >= len(names) {
		return "unknown"
	}
	return names[lt]
}

// iTopoLock is the interface for knowing the resource that is being locked.
// It allows for better controlling nuances for different lock types and log messages.
type iTopoLock interface {
Expand All @@ -129,7 +155,7 @@ type iTopoLock interface {
}

// perform the topo lock operation
func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, isBlocking bool) (LockDescriptor, error) {
func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, lockType LockType) (LockDescriptor, error) {
log.Infof("Locking %v %v for action %v", lt.Type(), lt.ResourceName(), l.Action)

ctx, cancel := context.WithTimeout(ctx, LockTimeout)
Expand All @@ -143,10 +169,16 @@ func (l *Lock) lock(ctx context.Context, ts *Server, lt iTopoLock, isBlocking bo
if err != nil {
return nil, err
}
if isBlocking {
switch lockType {
case Blocking:
return ts.globalCell.Lock(ctx, lt.Path(), j)
case NonBlocking:
return ts.globalCell.TryLock(ctx, lt.Path(), j)
case Named:
return ts.globalCell.LockName(ctx, lt.Path(), j)
default:
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unknown lock type %s", lockType)
}
return ts.globalCell.TryLock(ctx, lt.Path(), j)
}

// unlock unlocks a previously locked key.
Expand Down Expand Up @@ -174,7 +206,7 @@ func (l *Lock) unlock(ctx context.Context, lt iTopoLock, lockDescriptor LockDesc
return lockDescriptor.Unlock(ctx)
}

func (ts *Server) internalLock(ctx context.Context, lt iTopoLock, action string, isBlocking bool) (context.Context, func(*error), error) {
func (ts *Server) internalLock(ctx context.Context, lt iTopoLock, action string, lockType LockType) (context.Context, func(*error), error) {
i, ok := ctx.Value(locksKey).(*locksInfo)
if !ok {
i = &locksInfo{
Expand All @@ -191,7 +223,7 @@ func (ts *Server) internalLock(ctx context.Context, lt iTopoLock, action string,

// lock it
l := newLock(action)
lockDescriptor, err := l.lock(ctx, ts, lt, isBlocking)
lockDescriptor, err := l.lock(ctx, ts, lt, lockType)
if err != nil {
return nil, nil, err
}
Expand Down
16 changes: 13 additions & 3 deletions go/vt/topo/memorytopo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,16 @@ func (c *Conn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDes
return nil, err
}

return c.lock(ctx, dirPath, contents)
return c.lock(ctx, dirPath, contents, false)
}

// LockName implements topo.Conn. It calls c.lock directly with the named
// flag set to true.
func (c *Conn) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
	const named = true
	return c.lock(ctx, dirPath, contents, named)
}

// lock is the shared implementation used by Lock and LockName.
func (c *Conn) lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
func (c *Conn) lock(ctx context.Context, dirPath, contents string, named bool) (topo.LockDescriptor, error) {
for {
if err := c.dial(ctx); err != nil {
return nil, err
Expand All @@ -82,7 +87,12 @@ func (c *Conn) lock(ctx context.Context, dirPath, contents string) (topo.LockDes
return nil, c.factory.err
}

n := c.factory.nodeByPath(c.cell, dirPath)
var n *node
if named {
n = c.factory.getOrCreatePath(c.cell, dirPath)
} else {
n = c.factory.nodeByPath(c.cell, dirPath)
}
if n == nil {
c.factory.mu.Unlock()
return nil, topo.NewError(topo.NoNode, dirPath)
Expand Down
58 changes: 58 additions & 0 deletions go/vt/topo/named_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topo

import (
"context"
"path"
)

// namedLock is the iTopoLock implementation for a lock that is identified
// only by an opaque name.
type namedLock struct {
	name string
}

// Compile-time check that *namedLock satisfies iTopoLock.
var _ iTopoLock = (*namedLock)(nil)

// Type returns the lock type label, which is always "named".
func (nl *namedLock) Type() string {
	const lockTypeLabel = "named"
	return lockTypeLabel
}

// ResourceName returns the name being locked.
func (nl *namedLock) ResourceName() string {
	return nl.name
}

// Path returns the topo path for the lock: the name joined under
// NamedLocksPath.
func (nl *namedLock) Path() string {
	lockPath := path.Join(NamedLocksPath, nl.name)
	return lockPath
}

// LockName locks the given opaque identifier using the Named lock type.
// It returns:
//   - a context with a locksInfo structure for future reference.
//   - an unlock method
//   - an error if anything failed.
func (ts *Server) LockName(ctx context.Context, name, action string) (context.Context, func(*error), error) {
	target := &namedLock{name: name}
	return ts.internalLock(ctx, target, action, Named)
}

// CheckNamedLocked can be called on a context to make sure we have the lock
// for a given opaque identifier.
func CheckNameLocked(ctx context.Context, name string) error {
return checkLocked(ctx, &namedLock{
name: name,
})
}
2 changes: 1 addition & 1 deletion go/vt/topo/routing_rules_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *routingRules) Path() string {

// LockRoutingRules acquires a lock for routing rules.
func (ts *Server) LockRoutingRules(ctx context.Context, action string) (context.Context, func(*error), error) {
return ts.internalLock(ctx, &routingRules{}, action, true)
return ts.internalLock(ctx, &routingRules{}, action, Blocking)
}

// CheckRoutingRulesLocked checks if a lock for routing rules is still possessed.
Expand Down
1 change: 1 addition & 0 deletions go/vt/topo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const (
ExternalClusterVitess = "vitess"
RoutingRulesPath = "routing_rules"
KeyspaceRoutingRulesPath = "keyspace"
NamedLocksPath = "internal/named_locks"
)

// Factory is a factory interface to create Conn objects.
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/shard_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (ts *Server) LockShard(ctx context.Context, keyspace, shard, action string)
return ts.internalLock(ctx, &shardLock{
keyspace: keyspace,
shard: shard,
}, action, true)
}, action, Blocking)
}

// TryLockShard will lock the shard, and return:
Expand All @@ -85,7 +85,7 @@ func (ts *Server) TryLockShard(ctx context.Context, keyspace, shard, action stri
return ts.internalLock(ctx, &shardLock{
keyspace: keyspace,
shard: shard,
}, action, false)
}, action, NonBlocking)
}

// CheckShardLocked can be called on a context to make sure we have the lock
Expand Down
20 changes: 15 additions & 5 deletions go/vt/topo/stats_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,21 @@ func (st *StatsConn) Delete(ctx context.Context, filePath string, version Versio

// Lock is part of the Conn interface
func (st *StatsConn) Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) {
return st.internalLock(ctx, dirPath, contents, true)
return st.internalLock(ctx, dirPath, contents, Blocking)
}

// LockName is part of the Conn interface. It routes the request through
// internalLock with the Named lock type.
func (st *StatsConn) LockName(ctx context.Context, dirPath, contents string) (LockDescriptor, error) {
	const lockType = Named
	return st.internalLock(ctx, dirPath, contents, lockType)
}

// TryLock is part of the topo.Conn interface. Its implementation is same as Lock
func (st *StatsConn) TryLock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) {
return st.internalLock(ctx, dirPath, contents, false)
return st.internalLock(ctx, dirPath, contents, NonBlocking)
}

// internalLock records timing and error stats for a lock request and dispatches it to the underlying conn based on lockType.
func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, isBlocking bool) (LockDescriptor, error) {
func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, lockType LockType) (LockDescriptor, error) {
statsKey := []string{"Lock", st.cell}
if st.readOnly {
return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, readOnlyErrorStrFormat, statsKey[0], dirPath)
Expand All @@ -177,10 +182,15 @@ func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string,
defer topoStatsConnTimings.Record(statsKey, startTime)
var res LockDescriptor
var err error
if isBlocking {
switch lockType {
case Blocking:
res, err = st.conn.Lock(ctx, dirPath, contents)
} else {
case NonBlocking:
res, err = st.conn.TryLock(ctx, dirPath, contents)
case Named:
res, err = st.conn.LockName(ctx, dirPath, contents)
default:
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unknown lock type %s", lockType)
}
if err != nil {
topoStatsConnErrors.Add(statsKey, int64(1))
Expand Down
Loading

0 comments on commit e2e58c9

Please sign in to comment.