From cd6db9d839f8a2cda20e693896e6d8507fd76ab1 Mon Sep 17 00:00:00 2001 From: Noble Mittal Date: Thu, 23 Jan 2025 18:19:48 +0530 Subject: [PATCH] test: Add unit tests for vtctl/workflow Signed-off-by: Noble Mittal --- go/vt/vtctl/workflow/framework_test.go | 31 +++-- go/vt/vtctl/workflow/server_test.go | 82 ++++++++++++ go/vt/vtctl/workflow/traffic_switcher_test.go | 121 ++++++++++++++++++ go/vt/vtctl/workflow/utils_test.go | 76 +++++++++++ 4 files changed, 302 insertions(+), 8 deletions(-) diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index a2c1b2ef8e3..fad48e31e0c 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -272,7 +272,7 @@ type testTMClient struct { createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse readVReplicationWorkflowRequests map[uint32]*readVReplicationWorkflowRequestResponse updateVReplicationWorklowsRequests map[uint32]*tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest - applySchemaRequests map[uint32]*applySchemaRequestResponse + applySchemaRequests map[uint32][]*applySchemaRequestResponse primaryPositions map[uint32]string vdiffRequests map[uint32]*vdiffRequestResponse refreshStateErrors map[uint32]error @@ -296,7 +296,7 @@ func newTestTMClient(env *testEnv) *testTMClient { createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse), readVReplicationWorkflowRequests: make(map[uint32]*readVReplicationWorkflowRequestResponse), updateVReplicationWorklowsRequests: make(map[uint32]*tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest), - applySchemaRequests: make(map[uint32]*applySchemaRequestResponse), + applySchemaRequests: make(map[uint32][]*applySchemaRequestResponse), readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse), primaryPositions: make(map[uint32]string), vdiffRequests: make(map[uint32]*vdiffRequestResponse), @@ -398,10 +398,9 @@ func (tmc *testTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Table schemaDefn := &tabletmanagerdatapb.SchemaDefinition{} for _, table := range req.Tables { if table == "/.*/" { - // Special case of all tables in keyspace. - for key, tableDefn := range tmc.schema { + for key, schemaDefinition := range tmc.schema { if strings.HasPrefix(key, tablet.Keyspace+".") { - schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...) + schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, schemaDefinition.TableDefinitions...) } } break @@ -414,6 +413,12 @@ func (tmc *testTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Table } schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...) } + for key, schemaDefinition := range tmc.schema { + if strings.HasPrefix(key, tablet.Keyspace) { + schemaDefn.DatabaseSchema = schemaDefinition.DatabaseSchema + break + } + } return schemaDefn, nil } @@ -508,10 +513,10 @@ func (tmc *testTMClient) expectApplySchemaRequest(tabletID uint32, req *applySch defer tmc.mu.Unlock() if tmc.applySchemaRequests == nil { - tmc.applySchemaRequests = make(map[uint32]*applySchemaRequestResponse) + tmc.applySchemaRequests = make(map[uint32][]*applySchemaRequestResponse) } - tmc.applySchemaRequests[tabletID] = req + tmc.applySchemaRequests[tabletID] = append(tmc.applySchemaRequests[tabletID], req) } // Note: ONLY breaks up change.SQL into individual statements and executes it. Does NOT fully implement ApplySchema. @@ -519,11 +524,17 @@ func (tmc *testTMClient) ApplySchema(ctx context.Context, tablet *topodatapb.Tab tmc.mu.Lock() defer tmc.mu.Unlock() - if expect, ok := tmc.applySchemaRequests[tablet.Alias.Uid]; ok { + if requests, ok := tmc.applySchemaRequests[tablet.Alias.Uid]; ok { + if len(requests) == 0 { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ApplySchema request on tablet %s: got %+v", + topoproto.TabletAliasString(tablet.Alias), change) + } + expect := requests[0] if !reflect.DeepEqual(change, expect.change) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ApplySchema request on tablet %s: got %+v, want %+v", topoproto.TabletAliasString(tablet.Alias), change, expect.change) } + tmc.applySchemaRequests[tablet.Alias.Uid] = tmc.applySchemaRequests[tablet.Alias.Uid][1:] return expect.res, expect.err } @@ -779,6 +790,10 @@ func (tmc *testTMClient) getVReplicationWorkflowsResponse(key string) *tabletman return resp } +func (tmc *testTMClient) ReloadSchema(ctx context.Context, tablet *topodatapb.Tablet, waitPosition string) error { + return nil +} + // // Utility / helper functions. // diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 8bb4a06f23a..57041570d4b 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -34,10 +34,12 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtenv" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tmclient" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -2306,3 +2308,83 @@ func TestWorkflowStatus(t *testing.T) { assert.Equal(t, float32(50), stateTable1.RowsPercentage) assert.Equal(t, float32(50), stateTable2.RowsPercentage) } + +func TestDeleteShard(t *testing.T) { + ctx := context.Background() + + sourceKeyspace := &testKeyspace{"source_keyspace", []string{"-"}} + targetKeyspace := &testKeyspace{"target_keyspace", []string{"-"}} + + te := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer te.close() + + // Verify that shard exists. + si, err := te.ts.GetShard(ctx, targetKeyspace.KeyspaceName, targetKeyspace.ShardNames[0]) + require.NoError(t, err) + require.NotNil(t, si) + + // Expect to fail if recursive is false. + err = te.ws.DeleteShard(ctx, targetKeyspace.KeyspaceName, targetKeyspace.ShardNames[0], false, true) + assert.ErrorContains(t, err, "shard target_keyspace/- still has 1 tablets in cell") + + // Should not throw error if given keyspace or shard is invalid. + err = te.ws.DeleteShard(ctx, "invalid_keyspace", "-", false, true) + assert.NoError(t, err) + + // Successful shard delete. + err = te.ws.DeleteShard(ctx, targetKeyspace.KeyspaceName, targetKeyspace.ShardNames[0], true, true) + assert.NoError(t, err) + + // Check if the shard was deleted. + _, err = te.ts.GetShard(ctx, targetKeyspace.KeyspaceName, targetKeyspace.ShardNames[0]) + assert.ErrorContains(t, err, "node doesn't exist") +} + +func TestCopySchemaShard(t *testing.T) { + ctx := context.Background() + + sourceKeyspace := &testKeyspace{"source_keyspace", []string{"-"}} + targetKeyspace := &testKeyspace{"target_keyspace", []string{"-"}} + + te := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer te.close() + + sqlSchema := `create table t1(id bigint(20) unsigned auto_increment, msg varchar(64), primary key (id)) Engine=InnoDB;` + te.tmc.schema[fmt.Sprintf("%s.t1", sourceKeyspace.KeyspaceName)] = &tabletmanagerdatapb.SchemaDefinition{ + DatabaseSchema: "CREATE DATABASE {{.DatabaseName}}", + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: "t1", + Schema: sqlSchema, + Columns: []string{ + "id", + "msg", + }, + Type: tmutils.TableBaseTable, + }, + }, + } + + // Expect queries on target shards + te.tmc.expectApplySchemaRequest(200, &applySchemaRequestResponse{ + change: &tmutils.SchemaChange{ + SQL: "CREATE DATABASE `vt_target_keyspace`", + Force: false, + AllowReplication: true, + SQLMode: vreplication.SQLMode, + }, + }) + te.tmc.expectApplySchemaRequest(200, &applySchemaRequestResponse{ + change: &tmutils.SchemaChange{ + SQL: sqlSchema, + Force: false, + AllowReplication: true, + SQLMode: vreplication.SQLMode, + }, + }) + + sourceTablet := te.tablets[sourceKeyspace.KeyspaceName][100] + err := te.ws.CopySchemaShard(ctx, sourceTablet.Alias, []string{"/.*/"}, nil, false, targetKeyspace.KeyspaceName, "-", 1*time.Second, true) + assert.NoError(t, err) + assert.Empty(t, te.tmc.applySchemaRequests[200]) +} diff --git a/go/vt/vtctl/workflow/traffic_switcher_test.go b/go/vt/vtctl/workflow/traffic_switcher_test.go index b06c95b6c16..3c170d2cba1 100644 --- a/go/vt/vtctl/workflow/traffic_switcher_test.go +++ b/go/vt/vtctl/workflow/traffic_switcher_test.go @@ -30,6 +30,7 @@ import ( "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/proto/vschema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" @@ -903,3 +904,123 @@ func TestAddParticipatingTablesToKeyspace(t *testing.T) { assert.Len(t, vs.Tables["t1"].ColumnVindexes, 2) assert.Len(t, vs.Tables["t2"].ColumnVindexes, 1) } + +func TestCancelMigration_TABLES(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + tableName := "t1" + + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"0"}, + } + + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + tableName: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: tableName, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), + }, + }, + }, + } + + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + env.tmc.schema = schema + + ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspace.KeyspaceName, workflowName) + require.NoError(t, err) + + sm, err := BuildStreamMigrator(ctx, ts, false, sqlparser.NewTestParser()) + require.NoError(t, err) + + env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running', message='' where db_name='vt_targetks' and workflow='wf1'", &sqltypes.Result{}) + env.tmc.expectVRQuery(100, "delete from _vt.vreplication where db_name = 'vt_sourceks' and workflow = 'wf1_reverse'", &sqltypes.Result{}) + + ctx, _, err = env.ts.LockKeyspace(ctx, targetKeyspace.KeyspaceName, "test") + require.NoError(t, err) + + ctx, _, err = env.ts.LockKeyspace(ctx, sourceKeyspace.KeyspaceName, "test") + require.NoError(t, err) + + err = topo.CheckKeyspaceLocked(ctx, ts.targetKeyspace) + require.NoError(t, err) + + err = topo.CheckKeyspaceLocked(ctx, ts.sourceKeyspace) + require.NoError(t, err) + + ts.cancelMigration(ctx, sm) + + // Expect the queries to be cleared + assert.Empty(t, env.tmc.vrQueries[100]) + assert.Empty(t, env.tmc.vrQueries[200]) +} + +func TestCancelMigration_SHARDS(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + tableName := "t1" + + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"0"}, + } + + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + tableName: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: tableName, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), + }, + }, + }, + } + + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + env.tmc.schema = schema + + ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspace.KeyspaceName, workflowName) + require.NoError(t, err) + ts.migrationType = binlogdata.MigrationType_SHARDS + + sm, err := BuildStreamMigrator(ctx, ts, false, sqlparser.NewTestParser()) + require.NoError(t, err) + + env.tmc.expectVRQuery(100, "update /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ _vt.vreplication set state='Running', stop_pos=null, message='' where db_name='vt_sourceks' and workflow != 'wf1_reverse'", &sqltypes.Result{}) + env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running', message='' where db_name='vt_targetks' and workflow='wf1'", &sqltypes.Result{}) + env.tmc.expectVRQuery(100, "delete from _vt.vreplication where db_name = 'vt_sourceks' and workflow = 'wf1_reverse'", &sqltypes.Result{}) + + ctx, _, err = env.ts.LockKeyspace(ctx, targetKeyspace.KeyspaceName, "test") + require.NoError(t, err) + + ctx, _, err = env.ts.LockKeyspace(ctx, sourceKeyspace.KeyspaceName, "test") + require.NoError(t, err) + + err = topo.CheckKeyspaceLocked(ctx, ts.targetKeyspace) + require.NoError(t, err) + + err = topo.CheckKeyspaceLocked(ctx, ts.sourceKeyspace) + require.NoError(t, err) + + ts.cancelMigration(ctx, sm) + + // Expect the queries to be cleared + assert.Empty(t, env.tmc.vrQueries[100]) + assert.Empty(t, env.tmc.vrQueries[200]) +} diff --git a/go/vt/vtctl/workflow/utils_test.go b/go/vt/vtctl/workflow/utils_test.go index 8458cf60995..94760939fa0 100644 --- a/go/vt/vtctl/workflow/utils_test.go +++ b/go/vt/vtctl/workflow/utils_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/testfiles" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -22,6 +23,7 @@ import ( "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" "vitess.io/vitess/go/vt/proto/vtctldata" ) @@ -281,3 +283,77 @@ func TestValidateSourceTablesExist(t *testing.T) { }) } } + +func TestLegacyBuildTargets(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + tableName := "t1" + + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"-80", "80-"}, + } + + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + tableName: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: tableName, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), + }, + }, + }, + } + + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + env.tmc.schema = schema + + result1 := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types|workflow_type|workflow_sub_type|defer_secondary_keys", + "int64|varchar|varchar|varchar|varchar|int64|int64|int64"), + "1|keyspace:\"source\" shard:\"-80\" filter:{rules:{match:\"t1\"} rules:{match:\"t2\"}}||||0|0|0", + ) + result2 := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types|workflow_type|workflow_sub_type|defer_secondary_keys", + "int64|varchar|varchar|varchar|varchar|int64|int64|int64"), + "1|keyspace:\"source\" shard:\"80-\" filter:{rules:{match:\"t1\"} rules:{match:\"t2\"}}||||0|0|0", + "2|keyspace:\"source\" shard:\"80-\" filter:{rules:{match:\"t3\"} rules:{match:\"t4\"}}||||0|0|0", + ) + env.tmc.expectVRQuery(200, "select id, source, message, cell, tablet_types, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where workflow='wf1' and db_name='vt_targetks'", result1) + env.tmc.expectVRQuery(210, "select id, source, message, cell, tablet_types, workflow_type, workflow_sub_type, defer_secondary_keys from _vt.vreplication where workflow='wf1' and db_name='vt_targetks'", result2) + + ti, err := LegacyBuildTargets(ctx, env.ts, env.tmc, targetKeyspace.KeyspaceName, workflowName, targetKeyspace.ShardNames) + require.NoError(t, err) + // Expect 2 targets as there are 2 target shards. + assert.Len(t, ti.Targets, 2) + + assert.NotNil(t, ti.Targets["-80"]) + assert.NotNil(t, ti.Targets["80-"]) + + t1 := ti.Targets["-80"] + t2 := ti.Targets["80-"] + assert.Len(t, t1.Sources, 1) + assert.Len(t, t2.Sources, 2) + assert.Len(t, t1.Sources[1].Filter.Rules, 2) + + assert.Equal(t, t1.Sources[1].Filter.Rules[0].Match, "t1") + assert.Equal(t, t1.Sources[1].Filter.Rules[1].Match, "t2") + assert.Equal(t, t1.Sources[1].Shard, "-80") + + assert.Len(t, t2.Sources[1].Filter.Rules, 2) + assert.Len(t, t2.Sources[2].Filter.Rules, 2) + + assert.Equal(t, t2.Sources[1].Shard, "80-") + assert.Equal(t, t2.Sources[2].Shard, "80-") + assert.Equal(t, t2.Sources[1].Filter.Rules[0].Match, "t1") + assert.Equal(t, t2.Sources[1].Filter.Rules[1].Match, "t2") + assert.Equal(t, t2.Sources[2].Filter.Rules[0].Match, "t3") + assert.Equal(t, t2.Sources[2].Filter.Rules[1].Match, "t4") +}