From cd94eff7f259d275a3e9aa0bd488d48434f581af Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 19 Jan 2025 07:48:21 +0200 Subject: [PATCH] Online DDL forced cut-over: terminate transactions holding metadata locks on table (#17535) Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/mysql/capabilities/capability.go | 41 ++++----- go/mysql/capabilities/capability_test.go | 19 +++++ go/mysql/flavor_test.go | 9 ++ .../scheduler/onlineddl_scheduler_test.go | 83 +++++++++++++++++++ go/vt/vttablet/onlineddl/executor.go | 55 ++++++------ go/vt/vttablet/onlineddl/schema.go | 9 ++ 6 files changed, 173 insertions(+), 43 deletions(-) diff --git a/go/mysql/capabilities/capability.go b/go/mysql/capabilities/capability.go index 4015059e686..eac25585089 100644 --- a/go/mysql/capabilities/capability.go +++ b/go/mysql/capabilities/capability.go @@ -31,25 +31,26 @@ var ( type FlavorCapability int const ( - NoneFlavorCapability FlavorCapability = iota // default placeholder - FastDropTableFlavorCapability // supported in MySQL 8.0.23 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-23.html - TransactionalGtidExecutedFlavorCapability // - InstantDDLFlavorCapability // ALGORITHM=INSTANT general support - InstantAddLastColumnFlavorCapability // - InstantAddDropVirtualColumnFlavorCapability // - InstantAddDropColumnFlavorCapability // Adding/dropping column in any position/ordinal. 
- InstantChangeColumnDefaultFlavorCapability // - InstantExpandEnumCapability // - InstantChangeColumnVisibilityCapability // - MySQLUpgradeInServerFlavorCapability // - DynamicRedoLogCapacityFlavorCapability // supported in MySQL 8.0.30 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-30.html - DisableRedoLogFlavorCapability // supported in MySQL 8.0.21 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-21.html - CheckConstraintsCapability // supported in MySQL 8.0.16 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-16.html - PerformanceSchemaDataLocksTableCapability // supported in MySQL 8.0.1 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-1.html - InstantDDLXtrabackupCapability // Supported in 8.0.32 and above, solving a MySQL-vs-Xtrabackup bug starting 8.0.29 - ReplicaTerminologyCapability // Supported in 8.0.26 and above, using SHOW REPLICA STATUS and all variations. - BinaryLogStatus // Supported in 8.2.0 and above, uses SHOW BINARY LOG STATUS - RestrictFKOnNonStandardKey // Supported in 8.4.0 and above, restricts usage of non-standard indexes for foreign keys. + NoneFlavorCapability FlavorCapability = iota // default placeholder + FastDropTableFlavorCapability // supported in MySQL 8.0.23 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-23.html + TransactionalGtidExecutedFlavorCapability // + InstantDDLFlavorCapability // ALGORITHM=INSTANT general support + InstantAddLastColumnFlavorCapability // + InstantAddDropVirtualColumnFlavorCapability // + InstantAddDropColumnFlavorCapability // Adding/dropping column in any position/ordinal. 
+ InstantChangeColumnDefaultFlavorCapability // + InstantExpandEnumCapability // + InstantChangeColumnVisibilityCapability // + MySQLUpgradeInServerFlavorCapability // + DynamicRedoLogCapacityFlavorCapability // supported in MySQL 8.0.30 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-30.html + DisableRedoLogFlavorCapability // supported in MySQL 8.0.21 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-21.html + CheckConstraintsCapability // supported in MySQL 8.0.16 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-16.html + PerformanceSchemaDataLocksTableCapability // supported in MySQL 8.0.1 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-1.html + PerformanceSchemaMetadataLocksTableCapability // supported in MySQL 8.0.2 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-2.html + InstantDDLXtrabackupCapability // Supported in 8.0.32 and above, solving a MySQL-vs-Xtrabackup bug starting 8.0.29 + ReplicaTerminologyCapability // Supported in 8.0.26 and above, using SHOW REPLICA STATUS and all variations. + BinaryLogStatus // Supported in 8.2.0 and above, uses SHOW BINARY LOG STATUS + RestrictFKOnNonStandardKey // Supported in 8.4.0 and above, restricts usage of non-standard indexes for foreign keys. 
) type CapableOf func(capability FlavorCapability) (bool, error) @@ -97,6 +98,8 @@ func MySQLVersionHasCapability(serverVersion string, capability FlavorCapability return atLeast(8, 0, 0) case PerformanceSchemaDataLocksTableCapability: return atLeast(8, 0, 1) + case PerformanceSchemaMetadataLocksTableCapability: + return atLeast(8, 0, 2) case MySQLUpgradeInServerFlavorCapability: return atLeast(8, 0, 16) case CheckConstraintsCapability: diff --git a/go/mysql/capabilities/capability_test.go b/go/mysql/capabilities/capability_test.go index aeb18bed22e..cf5e693840e 100644 --- a/go/mysql/capabilities/capability_test.go +++ b/go/mysql/capabilities/capability_test.go @@ -218,6 +218,25 @@ func TestMySQLVersionCapableOf(t *testing.T) { version: "8.0.20", capability: PerformanceSchemaDataLocksTableCapability, isCapable: true, + }, { + version: "5.7.38", + capability: PerformanceSchemaMetadataLocksTableCapability, + isCapable: false, + }, + { + version: "8.0", + capability: PerformanceSchemaMetadataLocksTableCapability, + isCapable: false, + }, + { + version: "8.0.1", + capability: PerformanceSchemaMetadataLocksTableCapability, + isCapable: false, + }, + { + version: "8.0.2", + capability: PerformanceSchemaMetadataLocksTableCapability, + isCapable: true, }, { version: "8.0.29", diff --git a/go/mysql/flavor_test.go b/go/mysql/flavor_test.go index 3d584b8293b..219b9803933 100644 --- a/go/mysql/flavor_test.go +++ b/go/mysql/flavor_test.go @@ -102,6 +102,15 @@ func TestServerVersionCapableOf(t *testing.T) { version: "8.0.20", capability: capabilities.PerformanceSchemaDataLocksTableCapability, isCapable: true, + }, { + version: "5.7.38", + capability: capabilities.PerformanceSchemaMetadataLocksTableCapability, + isCapable: false, + }, + { + version: "8.0.20", + capability: capabilities.PerformanceSchemaMetadataLocksTableCapability, + isCapable: true, }, { // Some ridiculous version diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go 
b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 5f6423b2556..8547431ddd3 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -714,6 +714,89 @@ func testScheduler(t *testing.T) { } }) }) + t.Run("force_cutover mdl", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime*5) + defer cancel() + + t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion", "vtgate", "", "", true)) // skip wait + + t.Run("wait for t1 running", func(t *testing.T) { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + }) + t.Run("wait for t1 ready to complete", func(t *testing.T) { + // Waiting for 'running', above, is not enough. We want to give vreplication a chance to start running, or else + // we attempt the cut-over too early. Specifically in this test, we're going to lock the table (LOCK TABLES ... WRITE), which, + // if vreplication does not get the chance to start, will prevent it from doing anything at all. + // ready_to_complete is a great signal for us that vreplication is healthy and up to date. 
+ waitForReadyToComplete(t, t1uuid, true) + }) + + conn, err := primaryTablet.VttabletProcess.TabletConn(keyspaceName, true) + require.NoError(t, err) + defer conn.Close() + + unlockTables := func() error { + _, err := conn.ExecuteFetch("unlock tables", 0, false) + return err + } + t.Run("locking table", func(t *testing.T) { + _, err := conn.ExecuteFetch("lock tables t1_test write", 0, false) + require.NoError(t, err) + }) + defer unlockTables() + t.Run("injecting heartbeats asynchronously", func(t *testing.T) { + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.OnlineDDLName, nil) + select { + case <-ticker.C: + case <-ctx.Done(): + return + } + } + }() + }) + t.Run("check no force_cutover", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + forceCutOver := row.AsInt64("force_cutover", 0) + assert.Equal(t, int64(0), forceCutOver) // disabled + } + }) + t.Run("attempt to complete", func(t *testing.T) { + onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true) + }) + t.Run("cut-over fail due to timeout", func(t *testing.T) { + waitForMessage(t, t1uuid, "(errno 3024) (sqlstate HY000): Query execution was interrupted, maximum statement execution time exceeded") + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusRunning) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) + }) + t.Run("force_cutover", func(t *testing.T) { + onlineddl.CheckForceMigrationCutOver(t, &vtParams, shards, t1uuid, true) + }) + t.Run("check force_cutover", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + 
for _, row := range rs.Named().Rows { + forceCutOver := row.AsInt64("force_cutover", 0) + assert.Equal(t, int64(1), forceCutOver) // enabled + } + }) + t.Run("expect completion", func(t *testing.T) { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("expect unlock failure", func(t *testing.T) { + err := unlockTables() + assert.ErrorContains(t, err, "broken pipe") + }) + }) } t.Run("ALTER both tables non-concurrent", func(t *testing.T) { t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index ce81a2dd516..76c7af7fc2e 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -838,34 +838,41 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa } } capableOf := mysql.ServerVersionCapableOf(conn.ServerVersion) - capable, err := capableOf(capabilities.PerformanceSchemaDataLocksTableCapability) - if err != nil { - return err - } - if capable { - { - // Kill connections that have open transactions locking the table. These potentially (probably?) are not - // actively running a query on our table. They're doing other things while holding locks on our table. 
- query, err := sqlparser.ParseAndBind(sqlProcessWithLocksOnTable, sqltypes.StringBindVariable(tableName)) - if err != nil { - return err - } - rs, err := conn.Conn.ExecuteFetch(query, -1, true) + terminateTransactions := func(capability capabilities.FlavorCapability, query string, column string, description string) error { + capable, err := capableOf(capability) + if err != nil { + return err + } + if !capable { + return nil + } + query, err = sqlparser.ParseAndBind(query, sqltypes.StringBindVariable(tableName)) + if err != nil { + return err + } + rs, err := conn.Conn.ExecuteFetch(query, -1, true) + if err != nil { + return vterrors.Wrapf(err, "finding transactions locking table `%s` %s", tableName, description) + } + log.Infof("terminateTransactions: found %v transactions locking table `%s` %s", len(rs.Rows), tableName, description) + for _, row := range rs.Named().Rows { + threadId := row.AsInt64(column, 0) + log.Infof("terminateTransactions: killing connection %v with transaction locking table `%s` %s", threadId, tableName, description) + killConnection := fmt.Sprintf("KILL %d", threadId) + _, err = conn.Conn.ExecuteFetch(killConnection, 1, false) if err != nil { - return vterrors.Wrapf(err, "finding transactions locking table") - } - log.Infof("killTableLockHoldersAndAccessors: found %v locking transactions", len(rs.Rows)) - for _, row := range rs.Named().Rows { - threadId := row.AsInt64("trx_mysql_thread_id", 0) - log.Infof("killTableLockHoldersAndAccessors: killing connection %v with transaction on table", threadId) - killConnection := fmt.Sprintf("KILL %d", threadId) - _, err = conn.Conn.ExecuteFetch(killConnection, 1, false) - if err != nil { - log.Errorf("Unable to kill the connection %d: %v", threadId, err) - } + log.Errorf("terminateTransactions: unable to kill the connection %d locking table `%s` %s: %v", threadId, tableName, description, err) } } + return nil + } + if err := 
terminateTransactions(capabilities.PerformanceSchemaDataLocksTableCapability, sqlProcessWithLocksOnTable, "trx_mysql_thread_id", "data"); err != nil { + return err } + if err := terminateTransactions(capabilities.PerformanceSchemaMetadataLocksTableCapability, sqlProcessWithMetadataLocksOnTable, "processlist_id", "metadata"); err != nil { + return err + } + return nil } diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 6ba0217519c..6c0bff1086f 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -567,6 +567,15 @@ const ( where data_locks.OBJECT_SCHEMA=database() AND data_locks.OBJECT_NAME=%a ` + sqlProcessWithMetadataLocksOnTable = ` + SELECT + DISTINCT threads.processlist_id + from + performance_schema.metadata_locks + join performance_schema.threads on (metadata_locks.OWNER_THREAD_ID=threads.THREAD_ID) + where + metadata_locks.OBJECT_SCHEMA=database() AND metadata_locks.OBJECT_NAME=%a + ` ) var (