Fail VTBackup early when replication or MySQL is failing #17356

Merged
12 commits merged on Jan 21, 2025
24 changes: 24 additions & 0 deletions go/cmd/vtbackup/cli/vtbackup.go
@@ -65,6 +65,11 @@ const (
phaseNameTakeNewBackup = "TakeNewBackup"
phaseStatusCatchupReplicationStalled = "Stalled"
phaseStatusCatchupReplicationStopped = "Stopped"

// We allow a maximum of 60 consecutive errors while waiting for the replication status before taking the new backup.
// Since we retry every second, this amounts to at least 1 minute of continuous errors before failing.
// It lets us ignore transient errors while still avoiding looping on repeated errors for over 60 seconds.
maximumErrorCountWhenWaitingForReplicationStatus = 60
)

var (
@@ -335,6 +340,14 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
if err != nil {
return fmt.Errorf("failed to initialize mysql config: %v", err)
}
ctx, cancelCtx := context.WithCancel(ctx)
backgroundCtx, cancelbackgroundCtx := context.WithCancel(backgroundCtx)
mysqld.OnFailure(func(err error) {
log.Warning("Cancelling the vtbackup context as MySQL has failed")
cancelCtx()
cancelbackgroundCtx()
})

initCtx, initCancel := context.WithTimeout(ctx, mysqlTimeout)
defer initCancel()
initMysqldAt := time.Now()
@@ -519,8 +532,14 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
statusErr error

waitStartTime = time.Now()

continuousErrorCount int
)
for {
if continuousErrorCount == maximumErrorCountWhenWaitingForReplicationStatus {
return fmt.Errorf("timeout waiting for replication status after %d errors", maximumErrorCountWhenWaitingForReplicationStatus)
}

select {
case <-ctx.Done():
return fmt.Errorf("error in replication catch up: %v", ctx.Err())
@@ -531,6 +550,7 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
status, statusErr = mysqld.ReplicationStatus(ctx)
if statusErr != nil {
log.Warningf("Error getting replication status: %v", statusErr)
continuousErrorCount++
continue
}
if status.Position.AtLeast(primaryPos) {
@@ -553,7 +573,11 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
if err := startReplication(ctx, mysqld, topoServer); err != nil {
log.Warningf("Failed to restart replication: %v", err)
}
continuousErrorCount++
} else {
// Replication is working if we got here, so reset the error count to zero.
// This prevents occasional transient errors from accumulating into a failure.
continuousErrorCount = 0
phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 0)
}
}
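The catch-up loop above now carries a consecutive-error budget: every failed replication-status poll increments a counter, any successful poll resets it, and the loop aborts only after 60 errors in a row (about a minute at one attempt per second). Below is a minimal, self-contained sketch of that pattern; the poll callback, function name, and timings are illustrative stand-ins rather than the vtbackup code itself.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// maxConsecutiveErrors mirrors the idea of the 60-error budget: roughly one
// minute of continuous failures at one attempt per second.
const maxConsecutiveErrors = 60

// waitWithErrorBudget polls until poll reports done, tolerating transient
// errors but giving up after maxConsecutiveErrors failures in a row.
func waitWithErrorBudget(ctx context.Context, poll func(context.Context) (bool, error)) error {
	consecutiveErrors := 0
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		if consecutiveErrors == maxConsecutiveErrors {
			return fmt.Errorf("giving up after %d consecutive errors", maxConsecutiveErrors)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			done, err := poll(ctx)
			if err != nil {
				consecutiveErrors++ // tolerate transient errors until the budget runs out
				continue
			}
			consecutiveErrors = 0 // any success resets the budget
			if done {
				return nil
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	// Simulated poll: fail twice, then report that replication has caught up.
	attempts := 0
	err := waitWithErrorBudget(ctx, func(context.Context) (bool, error) {
		attempts++
		if attempts < 3 {
			return false, errors.New("replication status not available yet")
		}
		return true, nil
	})
	fmt.Println("result:", err)
}
```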
80 changes: 69 additions & 11 deletions go/test/endtoend/backup/vtbackup/backup_only_test.go
@@ -47,6 +47,48 @@ var (
) Engine=InnoDB;`
)

func TestFailingReplication(t *testing.T) {
prepareCluster(t)

// Run the entire backup test
firstBackupTest(t, false)

// Insert one more row so the primary is ahead of the last backup.
_, err := primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test_failure')", keyspaceName, true)
require.NoError(t, err)

// Disable replication from the primary by removing the grants to 'vt_repl'.
_, err = primary.VttabletProcess.QueryTablet("REVOKE REPLICATION SLAVE ON *.* FROM 'vt_repl'@'%';", keyspaceName, true)
require.NoError(t, err)
_, err = primary.VttabletProcess.QueryTablet("FLUSH PRIVILEGES;", keyspaceName, true)
require.NoError(t, err)

// Take a backup with vtbackup: the process should fail entirely as it cannot replicate from the primary.
_, err = startVtBackup(t, false, false, false)
require.Error(t, err)

// Note how many backups we have right now.
backups, err := listBackups(shardKsName)
require.NoError(t, err)

// After 30 seconds, grant the replication permission back to 'vt_repl'.
// Until then, vtbackup will keep failing to replicate from the primary.
go func() {
<-time.After(30 * time.Second)
_, err = primary.VttabletProcess.QueryTablet("GRANT REPLICATION SLAVE ON *.* TO 'vt_repl'@'%';", keyspaceName, true)
require.NoError(t, err)
_, err = primary.VttabletProcess.QueryTablet("FLUSH PRIVILEGES;", keyspaceName, true)
require.NoError(t, err)
}()

// This will initially be stuck trying to replicate from the primary; once we re-grant the permission in
// the goroutine above, replication resumes and the backup completes successfully.
_ = vtBackup(t, false, false, false)
verifyBackupCount(t, shardKsName, len(backups)+1)

tearDown(t, true)
}

func TestTabletInitialBackup(t *testing.T) {
// Test Initial Backup Flow
// TestTabletInitialBackup will:
@@ -59,6 +101,15 @@ func TestTabletInitialBackup(t *testing.T) {
// - Bring up a second replica, and restore from the second backup
// - list the backups, remove them

prepareCluster(t)

// Run the entire backup test
firstBackupTest(t, true)

tearDown(t, true)
}

func prepareCluster(t *testing.T) {
waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2})

dataPointReader := vtBackup(t, true, false, false)
@@ -84,11 +135,6 @@
"TabletExternallyReparented", primary.Alias)
require.NoError(t, err)
restore(t, replica1, "replica", "SERVING")

// Run the entire backup test
firstBackupTest(t, "replica")

tearDown(t, true)
}

func TestTabletBackupOnly(t *testing.T) {
@@ -107,12 +153,12 @@ func TestTabletBackupOnly(t *testing.T) {
replica1.VttabletProcess.ServingStatus = "NOT_SERVING"

initTablets(t, true, true)
firstBackupTest(t, "replica")
firstBackupTest(t, true)

tearDown(t, false)
}

func firstBackupTest(t *testing.T, tabletType string) {
func firstBackupTest(t *testing.T, removeBackup bool) {
// Test First Backup flow.
//
// firstBackupTest will:
@@ -168,11 +214,13 @@ func firstBackupTest(t *testing.T, tabletType string) {
// check the new replica has the data
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2)

removeBackups(t)
verifyBackupCount(t, shardKsName, 0)
if removeBackup {
removeBackups(t)
verifyBackupCount(t, shardKsName, 0)
}
}

func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) *opentsdb.DataPointReader {
func startVtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) (*os.File, error) {
mysqlSocket, err := os.CreateTemp("", "vtbackup_test_mysql.sock")
require.NoError(t, err)
defer os.Remove(mysqlSocket.Name())
@@ -207,9 +255,19 @@ func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedo

log.Infof("starting backup tablet %s", time.Now())
err = localCluster.StartVtbackup(newInitDBFile, initialBackup, keyspaceName, shardName, cell, extraArgs...)
require.NoError(t, err)
if err != nil {
return nil, err
}

f, err := os.OpenFile(statsPath, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
return f, nil
}

func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) *opentsdb.DataPointReader {
f, err := startVtBackup(t, initialBackup, restartBeforeBackup, disableRedoLog)
require.NoError(t, err)
return opentsdb.NewDataPointReader(f)
}
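The helper refactor above splits vtBackup in two: startVtBackup returns any startup error so a test such as TestFailingReplication can assert that the backup process fails, while vtBackup keeps the original must-succeed behaviour for existing callers. The sketch below shows roughly how such a split can be used; launchBackup, startBackupProcess, and runBackupProcess are hypothetical stand-ins for the real helpers, which drive the vtbackup binary against the test cluster.

```go
package backuptest

import (
	"errors"
	"testing"

	"github.com/stretchr/testify/require"
)

// launchBackup is a stub for starting the vtbackup process; it fails when
// replication from the primary is not possible.
func launchBackup(replicationAllowed bool) error {
	if !replicationAllowed {
		return errors.New("cannot replicate from the primary: permission denied")
	}
	return nil
}

// startBackupProcess reports the error to the caller, like the new startVtBackup.
func startBackupProcess(t *testing.T, replicationAllowed bool) error {
	t.Helper()
	return launchBackup(replicationAllowed)
}

// runBackupProcess keeps the original must-succeed contract, like vtBackup.
func runBackupProcess(t *testing.T) {
	t.Helper()
	require.NoError(t, startBackupProcess(t, true))
}

// TestBackupFailsWithoutReplication exercises both paths: the failure is
// observable instead of aborting the test, and the success path still asserts.
func TestBackupFailsWithoutReplication(t *testing.T) {
	require.Error(t, startBackupProcess(t, false))
	runBackupProcess(t)
}
```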
18 changes: 15 additions & 3 deletions go/vt/mysqlctl/mysqld.go
@@ -118,9 +118,10 @@ type Mysqld struct {
capabilities capabilitySet

// mutex protects the fields below.
mutex sync.Mutex
onTermFuncs []func()
cancelWaitCmd chan struct{}
mutex sync.Mutex
onTermFuncs []func()
onFailureFuncs []func(error)
cancelWaitCmd chan struct{}

semiSyncType mysql.SemiSyncType
}
@@ -445,6 +446,11 @@ func (mysqld *Mysqld) startNoWait(cnf *Mycnf, mysqldArgs ...string) error {
for _, callback := range mysqld.onTermFuncs {
go callback()
}
if err != nil {
for _, failureFunc := range mysqld.onFailureFuncs {
go failureFunc(err)
}
}
mysqld.mutex.Unlock()
}
}(mysqld.cancelWaitCmd)
@@ -1247,6 +1253,12 @@ func (mysqld *Mysqld) OnTerm(f func()) {
mysqld.onTermFuncs = append(mysqld.onTermFuncs, f)
}

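// OnFailure registers a function to be called with the exit error if the managed mysqld process terminates abnormally.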
func (mysqld *Mysqld) OnFailure(f func(error)) {
mysqld.mutex.Lock()
defer mysqld.mutex.Unlock()
mysqld.onFailureFuncs = append(mysqld.onFailureFuncs, f)
}

func buildLdPaths() ([]string, error) {
vtMysqlRoot, err := vtenv.VtMysqlRoot()
if err != nil {
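The mysqld.go change adds an OnFailure hook alongside OnTerm: callbacks registered before the process starts are invoked with the exit error when mysqld dies unexpectedly, which is what lets vtbackup cancel its contexts above. Below is a minimal usage sketch, assuming the callback is registered up front as in the vtbackup change; fakeMysqld is a stand-in that only models the callback behaviour, not mysqlctl.Mysqld itself.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fakeMysqld stands in for mysqlctl.Mysqld; only the OnFailure registration
// behaviour matters here.
type fakeMysqld struct {
	onFailureFuncs []func(error)
}

// OnFailure registers a callback to run when the process exits with an error.
func (m *fakeMysqld) OnFailure(f func(error)) {
	m.onFailureFuncs = append(m.onFailureFuncs, f)
}

// fail simulates the wait goroutine noticing that mysqld died.
func (m *fakeMysqld) fail(err error) {
	for _, f := range m.onFailureFuncs {
		go f(err)
	}
}

func main() {
	mysqld := &fakeMysqld{}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Register the hook before starting any work, as the vtbackup change does.
	mysqld.OnFailure(func(err error) {
		fmt.Println("mysqld failed, cancelling dependent work:", err)
		cancel()
	})

	// Simulate a crash while a backup is in flight.
	go func() {
		time.Sleep(100 * time.Millisecond)
		mysqld.fail(errors.New("mysqld exited unexpectedly"))
	}()

	<-ctx.Done()
	fmt.Println("backup work stopped:", ctx.Err())
}
```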