Skip to content

Commit

Permalink
test: Add unit tests for vtctl/workflow
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Jan 23, 2025
1 parent d1aa2f4 commit cd6db9d
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 8 deletions.
31 changes: 23 additions & 8 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ type testTMClient struct {
createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse
readVReplicationWorkflowRequests map[uint32]*readVReplicationWorkflowRequestResponse
updateVReplicationWorklowsRequests map[uint32]*tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest
applySchemaRequests map[uint32]*applySchemaRequestResponse
applySchemaRequests map[uint32][]*applySchemaRequestResponse
primaryPositions map[uint32]string
vdiffRequests map[uint32]*vdiffRequestResponse
refreshStateErrors map[uint32]error
Expand All @@ -296,7 +296,7 @@ func newTestTMClient(env *testEnv) *testTMClient {
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
readVReplicationWorkflowRequests: make(map[uint32]*readVReplicationWorkflowRequestResponse),
updateVReplicationWorklowsRequests: make(map[uint32]*tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest),
applySchemaRequests: make(map[uint32]*applySchemaRequestResponse),
applySchemaRequests: make(map[uint32][]*applySchemaRequestResponse),
readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse),
primaryPositions: make(map[uint32]string),
vdiffRequests: make(map[uint32]*vdiffRequestResponse),
Expand Down Expand Up @@ -398,10 +398,9 @@ func (tmc *testTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Table
schemaDefn := &tabletmanagerdatapb.SchemaDefinition{}
for _, table := range req.Tables {
if table == "/.*/" {
// Special case of all tables in keyspace.
for key, tableDefn := range tmc.schema {
for key, schemaDefinition := range tmc.schema {
if strings.HasPrefix(key, tablet.Keyspace+".") {
schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...)
schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, schemaDefinition.TableDefinitions...)
}
}
break
Expand All @@ -414,6 +413,12 @@ func (tmc *testTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Table
}
schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...)
}
for key, schemaDefinition := range tmc.schema {
if strings.HasPrefix(key, tablet.Keyspace) {
schemaDefn.DatabaseSchema = schemaDefinition.DatabaseSchema
break
}
}
return schemaDefn, nil
}

Expand Down Expand Up @@ -508,22 +513,28 @@ func (tmc *testTMClient) expectApplySchemaRequest(tabletID uint32, req *applySch
defer tmc.mu.Unlock()

if tmc.applySchemaRequests == nil {
tmc.applySchemaRequests = make(map[uint32]*applySchemaRequestResponse)
tmc.applySchemaRequests = make(map[uint32][]*applySchemaRequestResponse)
}

tmc.applySchemaRequests[tabletID] = req
tmc.applySchemaRequests[tabletID] = append(tmc.applySchemaRequests[tabletID], req)
}

// Note: ONLY breaks up change.SQL into individual statements and executes it. Does NOT fully implement ApplySchema.
func (tmc *testTMClient) ApplySchema(ctx context.Context, tablet *topodatapb.Tablet, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if expect, ok := tmc.applySchemaRequests[tablet.Alias.Uid]; ok {
if requests, ok := tmc.applySchemaRequests[tablet.Alias.Uid]; ok {
if len(requests) == 0 {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ApplySchema request on tablet %s: got %+v",
topoproto.TabletAliasString(tablet.Alias), change)
}
expect := requests[0]
if !reflect.DeepEqual(change, expect.change) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ApplySchema request on tablet %s: got %+v, want %+v",
topoproto.TabletAliasString(tablet.Alias), change, expect.change)
}
tmc.applySchemaRequests[tablet.Alias.Uid] = tmc.applySchemaRequests[tablet.Alias.Uid][1:]
return expect.res, expect.err
}

Expand Down Expand Up @@ -779,6 +790,10 @@ func (tmc *testTMClient) getVReplicationWorkflowsResponse(key string) *tabletman
return resp
}

func (tmc *testTMClient) ReloadSchema(ctx context.Context, tablet *topodatapb.Tablet, waitPosition string) error {
return nil
}

//
// Utility / helper functions.
//
Expand Down
82 changes: 82 additions & 0 deletions go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -2306,3 +2308,83 @@ func TestWorkflowStatus(t *testing.T) {
assert.Equal(t, float32(50), stateTable1.RowsPercentage)
assert.Equal(t, float32(50), stateTable2.RowsPercentage)
}

func TestDeleteShard(t *testing.T) {
ctx := context.Background()

sourceKeyspace := &testKeyspace{"source_keyspace", []string{"-"}}
targetKeyspace := &testKeyspace{"target_keyspace", []string{"-"}}

te := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace)
defer te.close()

// Verify that shard exists.
si, err := te.ts.GetShard(ctx, targetKeyspace.KeyspaceName, targetKeyspace.ShardNames[0])
require.NoError(t, err)
require.NotNil(t, si)

// Expect to fail if recursive is false.
err = te.ws.DeleteShard(ctx, targetKeyspace.KeyspaceName, targetKeyspace.ShardNames[0], false, true)
assert.ErrorContains(t, err, "shard target_keyspace/- still has 1 tablets in cell")

// Should not throw error if given keyspace or shard is invalid.
err = te.ws.DeleteShard(ctx, "invalid_keyspace", "-", false, true)
assert.NoError(t, err)

// Successful shard delete.
err = te.ws.DeleteShard(ctx, targetKeyspace.KeyspaceName, targetKeyspace.ShardNames[0], true, true)
assert.NoError(t, err)

// Check if the shard was deleted.
_, err = te.ts.GetShard(ctx, targetKeyspace.KeyspaceName, targetKeyspace.ShardNames[0])
assert.ErrorContains(t, err, "node doesn't exist")
}

func TestCopySchemaShard(t *testing.T) {
ctx := context.Background()

sourceKeyspace := &testKeyspace{"source_keyspace", []string{"-"}}
targetKeyspace := &testKeyspace{"target_keyspace", []string{"-"}}

te := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace)
defer te.close()

sqlSchema := `create table t1(id bigint(20) unsigned auto_increment, msg varchar(64), primary key (id)) Engine=InnoDB;`
te.tmc.schema[fmt.Sprintf("%s.t1", sourceKeyspace.KeyspaceName)] = &tabletmanagerdatapb.SchemaDefinition{
DatabaseSchema: "CREATE DATABASE {{.DatabaseName}}",
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: "t1",
Schema: sqlSchema,
Columns: []string{
"id",
"msg",
},
Type: tmutils.TableBaseTable,
},
},
}

// Expect queries on target shards
te.tmc.expectApplySchemaRequest(200, &applySchemaRequestResponse{
change: &tmutils.SchemaChange{
SQL: "CREATE DATABASE `vt_target_keyspace`",
Force: false,
AllowReplication: true,
SQLMode: vreplication.SQLMode,
},
})
te.tmc.expectApplySchemaRequest(200, &applySchemaRequestResponse{
change: &tmutils.SchemaChange{
SQL: sqlSchema,
Force: false,
AllowReplication: true,
SQLMode: vreplication.SQLMode,
},
})

sourceTablet := te.tablets[sourceKeyspace.KeyspaceName][100]
err := te.ws.CopySchemaShard(ctx, sourceTablet.Alias, []string{"/.*/"}, nil, false, targetKeyspace.KeyspaceName, "-", 1*time.Second, true)
assert.NoError(t, err)
assert.Empty(t, te.tmc.applySchemaRequests[200])
}
121 changes: 121 additions & 0 deletions go/vt/vtctl/workflow/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -903,3 +904,123 @@ func TestAddParticipatingTablesToKeyspace(t *testing.T) {
assert.Len(t, vs.Tables["t1"].ColumnVindexes, 2)
assert.Len(t, vs.Tables["t2"].ColumnVindexes, 1)
}

func TestCancelMigration_TABLES(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

workflowName := "wf1"
tableName := "t1"

sourceKeyspace := &testKeyspace{
KeyspaceName: "sourceks",
ShardNames: []string{"0"},
}
targetKeyspace := &testKeyspace{
KeyspaceName: "targetks",
ShardNames: []string{"0"},
}

schema := map[string]*tabletmanagerdatapb.SchemaDefinition{
tableName: {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: tableName,
Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName),
},
},
},
}

env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace)
defer env.close()
env.tmc.schema = schema

ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspace.KeyspaceName, workflowName)
require.NoError(t, err)

sm, err := BuildStreamMigrator(ctx, ts, false, sqlparser.NewTestParser())
require.NoError(t, err)

env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running', message='' where db_name='vt_targetks' and workflow='wf1'", &sqltypes.Result{})
env.tmc.expectVRQuery(100, "delete from _vt.vreplication where db_name = 'vt_sourceks' and workflow = 'wf1_reverse'", &sqltypes.Result{})

ctx, _, err = env.ts.LockKeyspace(ctx, targetKeyspace.KeyspaceName, "test")
require.NoError(t, err)

ctx, _, err = env.ts.LockKeyspace(ctx, sourceKeyspace.KeyspaceName, "test")
require.NoError(t, err)

err = topo.CheckKeyspaceLocked(ctx, ts.targetKeyspace)
require.NoError(t, err)

err = topo.CheckKeyspaceLocked(ctx, ts.sourceKeyspace)
require.NoError(t, err)

ts.cancelMigration(ctx, sm)

// Expect the queries to be cleared
assert.Empty(t, env.tmc.vrQueries[100])
assert.Empty(t, env.tmc.vrQueries[200])
}

func TestCancelMigration_SHARDS(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

workflowName := "wf1"
tableName := "t1"

sourceKeyspace := &testKeyspace{
KeyspaceName: "sourceks",
ShardNames: []string{"0"},
}
targetKeyspace := &testKeyspace{
KeyspaceName: "targetks",
ShardNames: []string{"0"},
}

schema := map[string]*tabletmanagerdatapb.SchemaDefinition{
tableName: {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: tableName,
Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName),
},
},
},
}

env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace)
defer env.close()
env.tmc.schema = schema

ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspace.KeyspaceName, workflowName)
require.NoError(t, err)
ts.migrationType = binlogdata.MigrationType_SHARDS

sm, err := BuildStreamMigrator(ctx, ts, false, sqlparser.NewTestParser())
require.NoError(t, err)

env.tmc.expectVRQuery(100, "update /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ _vt.vreplication set state='Running', stop_pos=null, message='' where db_name='vt_sourceks' and workflow != 'wf1_reverse'", &sqltypes.Result{})
env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running', message='' where db_name='vt_targetks' and workflow='wf1'", &sqltypes.Result{})
env.tmc.expectVRQuery(100, "delete from _vt.vreplication where db_name = 'vt_sourceks' and workflow = 'wf1_reverse'", &sqltypes.Result{})

ctx, _, err = env.ts.LockKeyspace(ctx, targetKeyspace.KeyspaceName, "test")
require.NoError(t, err)

ctx, _, err = env.ts.LockKeyspace(ctx, sourceKeyspace.KeyspaceName, "test")
require.NoError(t, err)

err = topo.CheckKeyspaceLocked(ctx, ts.targetKeyspace)
require.NoError(t, err)

err = topo.CheckKeyspaceLocked(ctx, ts.sourceKeyspace)
require.NoError(t, err)

ts.cancelMigration(ctx, sm)

// Expect the queries to be cleared
assert.Empty(t, env.tmc.vrQueries[100])
assert.Empty(t, env.tmc.vrQueries[200])
}
Loading

0 comments on commit cd6db9d

Please sign in to comment.