Skip to content

Commit

Permalink
Add e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 23, 2025
1 parent fbf4334 commit 4e8e109
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 9 deletions.
99 changes: 99 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,105 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
shardNames = append(shardNames, shardName)
}
testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)

// Confirm that switching writes works as expected in the face of
// vreplication lag (canSwitch() precheck) and when cancelling the
// switch due to replication failing to catch up in time.
t.Run("validate switch writes error handling", func(t *testing.T) {
productConn, err := productTab.TabletConn("product", true)
require.NoError(t, err)
defer productConn.Close()
customerConn1, err := customerTab1.TabletConn("customer", true)
require.NoError(t, err)
customerConn2, err := customerTab2.TabletConn("customer", true)
require.NoError(t, err)
startingTestRowID := 10000000
numTestRows := 100
addTestRows := func() {
for i := 0; i < numTestRows; i++ {
execQuery(t, productConn, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')", startingTestRowID+i))
}
}
deleteTestRows := func() {
execQuery(t, productConn, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID))
}
addIndex := func() {
for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
execQuery(t, customerConn, "set session sql_mode=''")
execQuery(t, customerConn, "alter table customer add unique index name_idx (name)")
}
}
dropIndex := func() {
for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
execQuery(t, customerConn, "alter table customer drop index name_idx")
}
}
lockTargetTable := func() {
for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
execQuery(t, customerConn, "lock table customer read")
}
}
unlockTargetTable := func() {
for _, customerConn := range []*mysql.Conn{customerConn1, customerConn2} {
execQuery(t, customerConn, "unlock tables")
}
}
cleanupTestData := func() {
dropIndex()
deleteTestRows()
}
restartWorkflow := func() {
// We have to restart the workflow again as the duplicate key error
// is a permanent/terminal one.
err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
require.NoError(t, err, "failed to start workflow: %v", err)
}
waitForTargetToCatchup := func() {
restartWorkflow()
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
waitForNoWorkflowLag(t, vc, targetKs, workflow)
}

// First let's test that the pre-checks work as expected.
// We ALTER the table on the customer (target) shard to
// add a unique index on the name field.
addIndex()
// Then we insert some test rows across both shards in the product
// (source) keyspace.
addTestRows()
// Now the workflow should go into the error state and the lag should
// start to climb. So we sleep for twice the max lag duration that we
// will set for the SwitchTraffic call.
lagDuration := 3 * time.Second
time.Sleep(lagDuration * 3)
out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
"SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String())
// It should fail in the canSwitch() precheck.
require.Error(t, err)
require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
workflow, lagDuration.String()), out)
cleanupTestData()
waitForTargetToCatchup()

// Now let's test that the cancel works by setting the command timeout
// to half the duration. Lock the customer table on the target tablets
// so that we cannot apply the INSERTs and catch up.
lockTargetTable()
addTestRows()
timeout := lagDuration * 2
// Use the default max-replication-lag-allowed value of 30s.
out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
// It should fail due to the command context timeout and we should
// successfully cancel.
require.Error(t, err)
require.Contains(t, out, "failed to sync up replication between the source and target")
unlockTargetTable()
deleteTestRows()
waitForTargetToCatchup()
})

// Now let's confirm that it now works as expected.
switchWrites(t, workflowType, ksWorkflow, false)

checkThatVDiffFails(t, targetKs, workflow)
Expand Down
8 changes: 3 additions & 5 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3069,16 +3069,14 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, maxAllowedR
if err != nil {
return "", err
}
if wf.MaxVReplicationLag > maxAllowedReplLagSecs {
return fmt.Sprintf(cannotSwitchHighLag, wf.MaxVReplicationLag, maxAllowedReplLagSecs), nil
}
for _, stream := range wf.ShardStreams {
for _, st := range stream.GetStreams() {
if st.Message == Frozen {
return cannotSwitchFrozen, nil
}
// If no new events have been replicated after the copy phase then it will be 0.
vreplLag := int64(getVReplicationTrxLag(st.TransactionTimestamp, st.TimeUpdated, binlogdatapb.VReplicationWorkflowState(binlogdatapb.VReplicationWorkflowState_value[st.State])))
if vreplLag > maxAllowedReplLagSecs {
return fmt.Sprintf(cannotSwitchHighLag, vreplLag, maxAllowedReplLagSecs), nil
}
switch st.State {
case binlogdatapb.VReplicationWorkflowState_Copying.String():
return cannotSwitchCopyIncomplete, nil
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtctl/workflow/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,11 +667,11 @@ func getVReplicationTrxLag(trxTs, updatedTs *vttimepb.Time, state binlogdatapb.V
if state == binlogdatapb.VReplicationWorkflowState_Copying {
return math.MaxInt64
}
if lastTransactionTime == 0 /* no new events after copy */ ||
lastUpdateTime > lastTransactionTime /* no recent transactions, so all caught up */ {

if state == binlogdatapb.VReplicationWorkflowState_Running && // We could be in the ERROR state
(lastTransactionTime == 0 /* No new events after copy */ ||
lastUpdateTime > lastTransactionTime /* No recent transactions, so all caught up */) {
lastTransactionTime = lastUpdateTime
}
now := time.Now().Unix() /* seconds since epoch */
now := time.Now().Unix() // Seconds since epoch
return float64(now - lastTransactionTime)
}

0 comments on commit 4e8e109

Please sign in to comment.