Skip to content

Commit

Permalink
Online DDL forced cut-over: terminate transactions holding metadata l…
Browse files Browse the repository at this point in the history
…ocks on table (#17535)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Jan 19, 2025
1 parent eaaa206 commit cd94eff
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 43 deletions.
41 changes: 22 additions & 19 deletions go/mysql/capabilities/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,26 @@ var (
type FlavorCapability int

const (
NoneFlavorCapability FlavorCapability = iota // default placeholder
FastDropTableFlavorCapability // supported in MySQL 8.0.23 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-23.html
TransactionalGtidExecutedFlavorCapability //
InstantDDLFlavorCapability // ALGORITHM=INSTANT general support
InstantAddLastColumnFlavorCapability //
InstantAddDropVirtualColumnFlavorCapability //
InstantAddDropColumnFlavorCapability // Adding/dropping column in any position/ordinal.
InstantChangeColumnDefaultFlavorCapability //
InstantExpandEnumCapability //
InstantChangeColumnVisibilityCapability //
MySQLUpgradeInServerFlavorCapability //
DynamicRedoLogCapacityFlavorCapability // supported in MySQL 8.0.30 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-30.html
DisableRedoLogFlavorCapability // supported in MySQL 8.0.21 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-21.html
CheckConstraintsCapability // supported in MySQL 8.0.16 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-16.html
PerformanceSchemaDataLocksTableCapability // supported in MySQL 8.0.1 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-1.html
InstantDDLXtrabackupCapability // Supported in 8.0.32 and above, solving a MySQL-vs-Xtrabackup bug starting 8.0.29
ReplicaTerminologyCapability // Supported in 8.0.26 and above, using SHOW REPLICA STATUS and all variations.
BinaryLogStatus // Supported in 8.2.0 and above, uses SHOW BINARY LOG STATUS
RestrictFKOnNonStandardKey // Supported in 8.4.0 and above, restricts usage of non-standard indexes for foreign keys.
NoneFlavorCapability FlavorCapability = iota // default placeholder
FastDropTableFlavorCapability // supported in MySQL 8.0.23 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-23.html
TransactionalGtidExecutedFlavorCapability //
InstantDDLFlavorCapability // ALGORITHM=INSTANT general support
InstantAddLastColumnFlavorCapability //
InstantAddDropVirtualColumnFlavorCapability //
InstantAddDropColumnFlavorCapability // Adding/dropping column in any position/ordinal.
InstantChangeColumnDefaultFlavorCapability //
InstantExpandEnumCapability //
InstantChangeColumnVisibilityCapability //
MySQLUpgradeInServerFlavorCapability //
DynamicRedoLogCapacityFlavorCapability // supported in MySQL 8.0.30 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-30.html
DisableRedoLogFlavorCapability // supported in MySQL 8.0.21 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-21.html
CheckConstraintsCapability // supported in MySQL 8.0.16 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-16.html
PerformanceSchemaDataLocksTableCapability // supported in MySQL 8.0.1 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-1.html
PerformanceSchemaMetadataLocksTableCapability // supported in MySQL 8.0.2 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-2.html
InstantDDLXtrabackupCapability // Supported in 8.0.32 and above, solving a MySQL-vs-Xtrabackup bug starting 8.0.29
ReplicaTerminologyCapability // Supported in 8.0.26 and above, using SHOW REPLICA STATUS and all variations.
BinaryLogStatus // Supported in 8.2.0 and above, uses SHOW BINARY LOG STATUS
RestrictFKOnNonStandardKey // Supported in 8.4.0 and above, restricts usage of non-standard indexes for foreign keys.
)

type CapableOf func(capability FlavorCapability) (bool, error)
Expand Down Expand Up @@ -97,6 +98,8 @@ func MySQLVersionHasCapability(serverVersion string, capability FlavorCapability
return atLeast(8, 0, 0)
case PerformanceSchemaDataLocksTableCapability:
return atLeast(8, 0, 1)
case PerformanceSchemaMetadataLocksTableCapability:
return atLeast(8, 0, 2)
case MySQLUpgradeInServerFlavorCapability:
return atLeast(8, 0, 16)
case CheckConstraintsCapability:
Expand Down
19 changes: 19 additions & 0 deletions go/mysql/capabilities/capability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,25 @@ func TestMySQLVersionCapableOf(t *testing.T) {
version: "8.0.20",
capability: PerformanceSchemaDataLocksTableCapability,
isCapable: true,
}, {
version: "5.7.38",
capability: PerformanceSchemaMetadataLocksTableCapability,
isCapable: false,
},
{
version: "8.0",
capability: PerformanceSchemaMetadataLocksTableCapability,
isCapable: false,
},
{
version: "8.0.1",
capability: PerformanceSchemaMetadataLocksTableCapability,
isCapable: false,
},
{
version: "8.0.2",
capability: PerformanceSchemaMetadataLocksTableCapability,
isCapable: true,
},
{
version: "8.0.29",
Expand Down
9 changes: 9 additions & 0 deletions go/mysql/flavor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ func TestServerVersionCapableOf(t *testing.T) {
version: "8.0.20",
capability: capabilities.PerformanceSchemaDataLocksTableCapability,
isCapable: true,
}, {
version: "5.7.38",
capability: capabilities.PerformanceSchemaMetadataLocksTableCapability,
isCapable: false,
},
{
version: "8.0.20",
capability: capabilities.PerformanceSchemaMetadataLocksTableCapability,
isCapable: true,
},
{
// Some ridiculous version
Expand Down
83 changes: 83 additions & 0 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,89 @@ func testScheduler(t *testing.T) {
}
})
})
t.Run("force_cutover mdl", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime*5)
defer cancel()

t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion", "vtgate", "", "", true)) // skip wait

t.Run("wait for t1 running", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
})
t.Run("wait for t1 ready to complete", func(t *testing.T) {
// Waiting for 'running', above, is not enough. We want to let vreplication a chance to start running, or else
// we attempt the cut-over too early. Specifically in this test, we're going to lock rows FOR UPDATE, which,
// if vreplication does not get the chance to start, will prevent it from doing anything at all.
// ready_to_complete is a great signal for us that vreplication is healthy and up to date.
waitForReadyToComplete(t, t1uuid, true)
})

conn, err := primaryTablet.VttabletProcess.TabletConn(keyspaceName, true)
require.NoError(t, err)
defer conn.Close()

unlockTables := func() error {
_, err := conn.ExecuteFetch("unlock tables", 0, false)
return err
}
t.Run("locking table", func(t *testing.T) {
_, err := conn.ExecuteFetch("lock tables t1_test write", 0, false)
require.NoError(t, err)
})
defer unlockTables()
t.Run("injecting heartbeats asynchronously", func(t *testing.T) {
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.OnlineDDLName, nil)
select {
case <-ticker.C:
case <-ctx.Done():
return
}
}
}()
})
t.Run("check no force_cutover", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
forceCutOver := row.AsInt64("force_cutover", 0)
assert.Equal(t, int64(0), forceCutOver) // disabled
}
})
t.Run("attempt to complete", func(t *testing.T) {
onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
})
t.Run("cut-over fail due to timeout", func(t *testing.T) {
waitForMessage(t, t1uuid, "(errno 3024) (sqlstate HY000): Query execution was interrupted, maximum statement execution time exceeded")
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusRunning)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
})
t.Run("force_cutover", func(t *testing.T) {
onlineddl.CheckForceMigrationCutOver(t, &vtParams, shards, t1uuid, true)
})
t.Run("check force_cutover", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
forceCutOver := row.AsInt64("force_cutover", 0)
assert.Equal(t, int64(1), forceCutOver) // enabled
}
})
t.Run("expect completion", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
t.Run("expect unlock failure", func(t *testing.T) {
err := unlockTables()
assert.ErrorContains(t, err, "broken pipe")
})
})
}
t.Run("ALTER both tables non-concurrent", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait
Expand Down
55 changes: 31 additions & 24 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,34 +838,41 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
}
}
capableOf := mysql.ServerVersionCapableOf(conn.ServerVersion)
capable, err := capableOf(capabilities.PerformanceSchemaDataLocksTableCapability)
if err != nil {
return err
}
if capable {
{
// Kill connections that have open transactions locking the table. These potentially (probably?) are not
// actively running a query on our table. They're doing other things while holding locks on our table.
query, err := sqlparser.ParseAndBind(sqlProcessWithLocksOnTable, sqltypes.StringBindVariable(tableName))
if err != nil {
return err
}
rs, err := conn.Conn.ExecuteFetch(query, -1, true)
terminateTransactions := func(capability capabilities.FlavorCapability, query string, column string, description string) error {
capable, err := capableOf(capability)
if err != nil {
return err
}
if !capable {
return nil
}
query, err = sqlparser.ParseAndBind(query, sqltypes.StringBindVariable(tableName))
if err != nil {
return err
}
rs, err := conn.Conn.ExecuteFetch(query, -1, true)
if err != nil {
return vterrors.Wrapf(err, "finding transactions locking table `%s` %s", tableName, description)
}
log.Infof("terminateTransactions: found %v transactions locking table `%s` %s", len(rs.Rows), tableName, description)
for _, row := range rs.Named().Rows {
threadId := row.AsInt64(column, 0)
log.Infof("terminateTransactions: killing connection %v with transaction locking table `%s` %s", threadId, tableName, description)
killConnection := fmt.Sprintf("KILL %d", threadId)
_, err = conn.Conn.ExecuteFetch(killConnection, 1, false)
if err != nil {
return vterrors.Wrapf(err, "finding transactions locking table")
}
log.Infof("killTableLockHoldersAndAccessors: found %v locking transactions", len(rs.Rows))
for _, row := range rs.Named().Rows {
threadId := row.AsInt64("trx_mysql_thread_id", 0)
log.Infof("killTableLockHoldersAndAccessors: killing connection %v with transaction on table", threadId)
killConnection := fmt.Sprintf("KILL %d", threadId)
_, err = conn.Conn.ExecuteFetch(killConnection, 1, false)
if err != nil {
log.Errorf("Unable to kill the connection %d: %v", threadId, err)
}
log.Errorf("terminateTransactions: unable to kill the connection %d locking table `%s` %s: %v", threadId, tableName, description, err)
}
}
return nil
}
if err := terminateTransactions(capabilities.PerformanceSchemaDataLocksTableCapability, sqlProcessWithLocksOnTable, "trx_mysql_thread_id", "data"); err != nil {
return err
}
if err := terminateTransactions(capabilities.PerformanceSchemaMetadataLocksTableCapability, sqlProcessWithMetadataLocksOnTable, "processlist_id", "metadata"); err != nil {
return err
}

return nil
}

Expand Down
9 changes: 9 additions & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,15 @@ const (
where
data_locks.OBJECT_SCHEMA=database() AND data_locks.OBJECT_NAME=%a
`
sqlProcessWithMetadataLocksOnTable = `
SELECT
DISTINCT threads.processlist_id
from
performance_schema.metadata_locks
join performance_schema.threads on (metadata_locks.OWNER_THREAD_ID=threads.THREAD_ID)
where
metadata_locks.OBJECT_SCHEMA=database() AND metadata_locks.OBJECT_NAME=%a
`
)

var (
Expand Down

0 comments on commit cd94eff

Please sign in to comment.