Skip to content

Commit

Permalink
Auto renew KS lock on traffic switch and vdiff init
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jun 20, 2024
1 parent 4a7ad80 commit 0962e47
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 23 deletions.
1 change: 0 additions & 1 deletion go/vt/topo/etcd2topo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"path"

"github.com/spf13/pflag"

"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"

Expand Down
28 changes: 28 additions & 0 deletions go/vt/topo/keyspace_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package topo
import (
"context"
"path"
"time"

"vitess.io/vitess/go/vt/vterrors"
)

type keyspaceLock struct {
Expand Down Expand Up @@ -56,3 +59,28 @@ func CheckKeyspaceLocked(ctx context.Context, keyspace string) error {
keyspace: keyspace,
})
}

// AutoRenewKeyspaceLockLease will renew the keyspace lock lease every renewTime
// in a goroutine until the maxTime is reached -- exiting early if the context
// is cancelled, the done channel is closed, or an error is encountered.
func AutoRenewKeyspaceLockLease(ctx context.Context, keyspace string, renewTime, maxTime time.Duration, doneCh <-chan struct{}, errCh chan error) {
go func() {
ksLock := &keyspaceLock{keyspace: keyspace}
renewAttempts := int(maxTime.Seconds() / renewTime.Seconds())
for i := 0; i < renewAttempts; i++ {
time.Sleep(renewTime)
select {
case <-ctx.Done():
return
case <-doneCh:
return
default:
// Attempt to renew lease.
if err := checkLocked(ctx, ksLock); err != nil {
errCh <- vterrors.Wrapf(err, "failed to renew keyspace %s lock lease", keyspace)
return
}
}
}
}()
}
111 changes: 111 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ const (
rdonlyTabletSuffix = "@rdonly"
// Globally routable tables don't have a keyspace prefix.
globalTableQualifier = ""

maxKeyspaceLockLeaseTTL = 10 * time.Minute
)

var tabletTypeSuffixes = []string{primaryTabletSuffix, replicaTabletSuffix, rdonlyTabletSuffix}
Expand Down Expand Up @@ -3245,6 +3247,28 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
}
defer unlock(&err)

// Ensure that the leases are renewed and we're able to hold the locks during the
// traffic switching.
leaseDoneCh := make(chan struct{})
leaseErrCh := make(chan error, 1)
defer func() {
close(leaseDoneCh)
for range leaseErrCh { // Drain the channel
}
close(leaseErrCh)
}()
// Renew it every 5 seconds.
topo.AutoRenewKeyspaceLockLease(ctx, ts.SourceKeyspaceName(), time.Second*5, maxKeyspaceLockLeaseTTL, leaseDoneCh, leaseErrCh)
// Check for errors in our diff goroutine and the lock lease renewal goroutine.
haveLeaseError := func() error {
select {
case err := <-leaseErrCh:
return err
default:
return nil
}
}

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
switch {
case ts.IsMultiTenantMigration():
Expand All @@ -3263,6 +3287,9 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
}
return sw.logs(), nil
}
if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction)
if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil {
return handleError("failed to switch read traffic for the shards", err)
Expand Down Expand Up @@ -3326,6 +3353,29 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
defer targetUnlock(&err)
}

// Ensure that the leases are renewed and we're able to hold the locks during the
// traffic switching.
leaseDoneCh := make(chan struct{})
leaseErrCh := make(chan error, 2)
defer func() {
close(leaseDoneCh)
for range leaseErrCh { // Drain the channel
}
close(leaseErrCh)
}()
// Renew it every 5 seconds.
topo.AutoRenewKeyspaceLockLease(ctx, ts.SourceKeyspaceName(), time.Second*5, maxKeyspaceLockLeaseTTL, leaseDoneCh, leaseErrCh)
topo.AutoRenewKeyspaceLockLease(ctx, ts.TargetKeyspaceName(), time.Second*5, maxKeyspaceLockLeaseTTL, leaseDoneCh, leaseErrCh)
// Check for errors in our diff goroutine and the lock lease renewal goroutine.
haveLeaseError := func() error {
select {
case err := <-leaseErrCh:
return err
default:
return nil
}
}

// Find out if the target is using any sequence tables for auto_increment
// value generation. If so, then we'll need to ensure that they are
// initialized properly before allowing new writes on the target.
Expand All @@ -3335,18 +3385,27 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
if req.InitializeTargetSequences && ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables &&
ts.SourceKeyspaceSchema() != nil && ts.SourceKeyspaceSchema().Keyspace != nil &&
!ts.SourceKeyspaceSchema().Keyspace.Sharded {
if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
sequenceMetadata, err = ts.getTargetSequenceMetadata(ctx)
if err != nil {
return handleError(fmt.Sprintf("failed to get the sequence information in the %s keyspace", ts.TargetKeyspaceName()), err)
}
}

if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
// If no journals exist, sourceWorkflows will be initialized by sm.MigrateStreams.
journalsExist, sourceWorkflows, err := ts.checkJournals(ctx)
if err != nil {
return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
}
if !journalsExist {
if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
ts.Logger().Infof("No previous journals were found. Proceeding normally.")
sm, err := BuildStreamMigrator(ctx, ts, cancel, s.env.Parser())
if err != nil {
Expand All @@ -3364,12 +3423,18 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
// For intra-keyspace materialization streams that we migrate where the source and target are
// the keyspace being resharded, we wait for those to catchup in the stopStreams path before
// we actually stop them.
if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
ts.Logger().Infof("Stopping source writes")
if err := sw.stopSourceWrites(ctx); err != nil {
sw.cancelMigration(ctx, sm)
return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
}

if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
ts.Logger().Infof("Stopping streams")
// Use a shorter context for this since since when doing a Reshard, if there are intra-keyspace
// materializations then we have to wait for them to catchup before switching traffic for the
Expand All @@ -3388,6 +3453,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
}

if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
ts.Logger().Infof("Executing LOCK TABLES on source tables %d times", lockTablesCycles)
// Doing this twice with a pause in-between to catch any writes that may have raced in between
Expand All @@ -3403,24 +3471,36 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
}

if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
ts.Logger().Infof("Waiting for streams to catchup")
if err := sw.waitForCatchup(ctx, timeout); err != nil {
sw.cancelMigration(ctx, sm)
return handleError("failed to sync up replication between the source and target", err)
}

if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
ts.Logger().Infof("Migrating streams")
if err := sw.migrateStreams(ctx, sm); err != nil {
sw.cancelMigration(ctx, sm)
return handleError("failed to migrate the workflow streams", err)
}

if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
ts.Logger().Infof("Resetting sequences")
if err := sw.resetSequences(ctx); err != nil {
sw.cancelMigration(ctx, sm)
return handleError("failed to reset the sequences", err)
}

if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
ts.Logger().Infof("Creating reverse streams")
if err := sw.createReverseVReplication(ctx); err != nil {
sw.cancelMigration(ctx, sm)
Expand All @@ -3429,10 +3509,16 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit

// Initialize any target sequences, if there are any, before allowing new writes.
if req.InitializeTargetSequences && len(sequenceMetadata) > 0 {
if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
ts.Logger().Infof("Initializing target sequences")
// Writes are blocked so we can safely initialize the sequence tables but
// we also want to use a shorter timeout than the parent context.
// We use at most half of the overall timeout.
if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2)
defer cancel()
if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
Expand All @@ -3444,33 +3530,58 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
if cancel {
return handleError("invalid cancel", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "traffic switching has reached the point of no return, cannot cancel"))
}
if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
ts.Logger().Infof("Journals were found. Completing the left over steps.")
// Need to gather positions in case all journals were not created.
if err := ts.gatherPositions(ctx); err != nil {
return handleError("failed to gather replication positions", err)
}
}

if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
// This is the point of no return. Once a journal is created,
// traffic can be redirected to target shards.
if err := sw.createJournals(ctx, sourceWorkflows); err != nil {
return handleError("failed to create the journal", err)
}

if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
if err := sw.allowTargetWrites(ctx); err != nil {
return handleError(fmt.Sprintf("failed to allow writes in the %s keyspace", ts.TargetKeyspaceName()), err)
}

if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
if err := sw.changeRouting(ctx); err != nil {
return handleError("failed to update the routing rules", err)
}

if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
if err := sw.streamMigraterfinalize(ctx, ts, sourceWorkflows); err != nil {
return handleError("failed to finalize the traffic switch", err)
}

if req.EnableReverseReplication {
if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
if err := sw.startReverseVReplication(ctx); err != nil {
return handleError("failed to start the reverse workflow", err)
}
}

if leaseError := haveLeaseError(); leaseError != nil {
return handleError("lost keyspace lock lease", err)
}
if err := sw.freezeTargetVReplication(ctx); err != nil {
return handleError(fmt.Sprintf("failed to freeze the workflow in the %s keyspace", ts.TargetKeyspaceName()), err)
}
Expand Down
6 changes: 2 additions & 4 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ import (
"strings"
"sync"

querypb "vitess.io/vitess/go/vt/proto/query"

"vitess.io/vitess/go/vt/vtgate/vindexes"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/sets"
Expand All @@ -46,9 +42,11 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
Expand Down
Loading

0 comments on commit 0962e47

Please sign in to comment.