From 7c6a1c0bec049bbf795985f7928f20511f9f7d9c Mon Sep 17 00:00:00 2001 From: jiweixiao <43225348@qq.com> Date: Sun, 25 Aug 2024 10:50:28 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9C=A8SQL=E5=AE=A1?= =?UTF-8?q?=E6=A0=B8=E9=98=B6=E6=AE=B5=E8=87=AA=E5=8A=A8=E8=AF=86=E5=88=AB?= =?UTF-8?q?=E5=B9=B6=E5=90=88=E5=B9=B6=E7=9B=B8=E5=90=8C=E8=A1=A8=E7=9A=84?= =?UTF-8?q?alter=20table=E8=AF=AD=E5=8F=A5=E7=9A=84=E5=8A=9F=E8=83=BD=20(#?= =?UTF-8?q?669)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 增加在SQL审核阶段自动识别并合并相同表的alter table语句的功能 --- config/config.go | 1 + session/inception_result.go | 9 ++- session/session.go | 12 ++++ session/session_inception.go | 127 +++++++++++++++++++++++++++++++++-- session/tidb.go | 3 +- 5 files changed, 145 insertions(+), 7 deletions(-) diff --git a/config/config.go b/config/config.go index 84ce2fedd..de15bb05d 100644 --- a/config/config.go +++ b/config/config.go @@ -221,6 +221,7 @@ type Binlog struct { // Inc is the inception section of the config. type Inc struct { + AlterAutoMerge bool `toml:"alter_auto_merge" json:"alter_auto_merge"` BackupHost string `toml:"backup_host" json:"backup_host"` // 远程备份库信息 BackupPassword string `toml:"backup_password" json:"backup_password"` BackupPort uint `toml:"backup_port" json:"backup_port"` diff --git a/session/inception_result.go b/session/inception_result.go index 1d4a14746..ec8abed94 100644 --- a/session/inception_result.go +++ b/session/inception_result.go @@ -97,6 +97,9 @@ type Record struct { // delete多表时,默认delete后第一个表为主表,其余表才会记录到该处 // 仅在发现多表操作时,初始化该参数 MultiTables map[string]*TableInfo + + // 判断该语句是否是需要被合并的(只有 alter table, create index, drop index三种语句需要被合并),不需要为0,已经被合并过的SQL会被设置为-1,需要的数字为对应的合并后的SQL的行号 + NeedMerge int } func (r *Record) appendWarningMessage(msg string) { @@ -297,7 +300,7 @@ func NewRecordSets() *MyRecordSets { fieldCount: 0, } - rc.fields = make([]*ast.ResultField, 12) + rc.fields = make([]*ast.ResultField, 13) // 序号 rc.CreateFiled("order_id", mysql.TypeLong) @@ -321,6 +324,8 @@ func NewRecordSets() *MyRecordSets { rc.CreateFiled("sqlsha1", mysql.TypeString) // 备份用时 rc.CreateFiled("backup_time", mysql.TypeString) + // 判断该语句是否是需要被合并的(只有 alter table, create index, drop index三种语句需要被合并),不需要为0,已经被合并过的SQL会被设置为-1,需要的数字为对应的合并后的SQL的行号 + rc.CreateFiled("needMerge", mysql.TypeTiny) t.rc = rc return t @@ -394,6 +399,8 @@ func (s *MyRecordSets) setFields(r *Record) { row[11].SetString(r.BackupCostTime) } + row[12].SetValue(r.NeedMerge) + s.rc.data[s.rc.count] = row s.rc.count++ } diff --git a/session/session.go b/session/session.go index d8de3f24f..80227fef1 100644 --- a/session/session.go +++ b/session/session.go @@ -135,7 +135,19 @@ func (h *StmtHistory) Count() int { return len(h.history) } +// jwx added +type alterTableInfo struct { + Name string + alterStmtList []ast.AlterTableStmt + mergedSql string + recordSetsPosList []int // 记录当前语句在s.recordSets里的位置,用于修改needMerge字段 +} + type session struct { + + //jwx added + alterTableInfoList []alterTableInfo + // processInfo is used by ShowProcess(), and should be modified atomically. processInfo atomic.Value txn TxnState diff --git a/session/session_inception.go b/session/session_inception.go index bb7c0805f..007cac74b 100644 --- a/session/session_inception.go +++ b/session/session_inception.go @@ -318,6 +318,36 @@ func (s *session) executeInc(ctx context.Context, sql string) (recordSets []sqle s.initDisableTypes() continue case *ast.InceptionCommitStmt: + /******* jwx added 将对同一个表的多条alter语句合并成一条 ******/ + if s.inc.AlterAutoMerge { + for _, info := range s.alterTableInfoList { + if len(info.alterStmtList) >= 2 { + merged := info.alterStmtList[0] + for seq, alterStmt := range info.alterStmtList { + if seq > 0 { + merged.Specs = append(merged.Specs, alterStmt.Specs...) + } + } + var builder strings.Builder + _ = merged.Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &builder)) + info.mergedSql = builder.String() + mergedRecord := &Record{ + Sql: info.mergedSql, + Buf: new(bytes.Buffer), + Type: &merged, + Stage: StageCheck, + ErrorMessage: "MERGED", + NeedMerge: -1, + } + s.recordSets.Append(mergedRecord) + for _, pos := range info.recordSetsPosList { + s.recordSets.records[pos].NeedMerge = s.recordSets.SeqNo + } + } + + } + } + /****************/ if !s.haveBegin { s.appendErrorMsg("Must start as begin statement.") @@ -606,7 +636,7 @@ func (s *session) processCommand(ctx context.Context, stmtNode ast.StmtNode, case *ast.CreateTableStmt: s.checkCreateTable(node, currentSql) case *ast.AlterTableStmt: - s.checkAlterTable(node, currentSql) + s.checkAlterTable(node, currentSql, false) case *ast.DropTableStmt: s.checkDropTable(node, currentSql) case *ast.RenameTableStmt: @@ -629,11 +659,24 @@ func (s *session) processCommand(ctx context.Context, stmtNode ast.StmtNode, if node.KeyType == ast.IndexKeyTypeFullText { tp = ast.ConstraintFulltext } - s.checkCreateIndex(node.Table, node.IndexName, - node.IndexColNames, node.IndexOption, nil, node.Unique, tp) + if !s.inc.AlterAutoMerge { // jwx added + s.checkCreateIndex(node.Table, node.IndexName, + node.IndexColNames, node.IndexOption, nil, node.Unique, tp) + } else { + alter := s.convertCreateIndexToAlterTable(node) + s.checkAlterTable(alter, node.Text(), true) + s.checkCreateIndex(node.Table, node.IndexName, + node.IndexColNames, node.IndexOption, nil, node.Unique, tp) + } case *ast.DropIndexStmt: - s.checkDropIndex(node, currentSql) + if !s.inc.AlterAutoMerge { // jwx added + s.checkDropIndex(node, currentSql) + } else { + alter := s.convertDropIndexToAlterTable(node) + s.checkAlterTable(alter, node.Text(), true) + s.checkDropIndex(node, currentSql) + } case *ast.CreateViewStmt: s.checkCreateView(node, currentSql) @@ -3294,7 +3337,7 @@ func (s *session) checkTableCharsetCollation(character, collation string) { } } -func (s *session) checkAlterTable(node *ast.AlterTableStmt, sql string) { +func (s *session) checkAlterTable(node *ast.AlterTableStmt, sql string, mergeOnly bool) { log.Debug("checkAlterTable") if node.Table.Schema.O == "" { @@ -3310,6 +3353,34 @@ func (s *session) checkAlterTable(node *ast.AlterTableStmt, sql string) { return } + /*********** jwx added **********/ + if s.inc.AlterAutoMerge { + tableNameInString := fmt.Sprintf("%s.%s", node.Table.Schema.O, node.Table.Name.O) + var found bool = false + var seq int = 0 + for j, i := range s.alterTableInfoList { + if tableNameInString == i.Name { + found = true + seq = j + break + } + } + if found { + s.alterTableInfoList[seq].alterStmtList = append(s.alterTableInfoList[seq].alterStmtList, *node) + s.alterTableInfoList[seq].recordSetsPosList = append(s.alterTableInfoList[seq].recordSetsPosList, s.recordSets.SeqNo) + } else { + var info alterTableInfo = alterTableInfo{Name: tableNameInString} + info.alterStmtList = append(info.alterStmtList, *node) + info.recordSetsPosList = append(info.recordSetsPosList, s.recordSets.SeqNo) + s.alterTableInfoList = append(s.alterTableInfoList, info) + } + + if mergeOnly { + return + } + } + /******************************/ + table.AlterCount += 1 if table.AlterCount > 1 { @@ -5508,6 +5579,52 @@ func (s *session) checkAddConstraint(t *TableInfo, c *ast.AlterTableSpec) { } } +func (s *session) convertCreateIndexToAlterTable(node *ast.CreateIndexStmt) *ast.AlterTableStmt { + log.Debug("convertCreateIndexToAlterTable") + var alter *ast.AlterTableStmt = &ast.AlterTableStmt{Specs: []*ast.AlterTableSpec{}} + var spec *ast.AlterTableSpec = &ast.AlterTableSpec{Tp: ast.AlterTableAddConstraint, Constraint: &ast.Constraint{}} + spec.IfNotExists = node.IfNotExists + spec.Constraint.Name = node.IndexName + if node.Unique { + spec.Constraint.Tp = ast.ConstraintUniq + } else { + spec.Constraint.Tp = ast.ConstraintIndex + } + spec.Constraint.Keys = node.IndexColNames + spec.Constraint.Option = node.IndexOption + if node.LockAlg != nil { + spec.LockType = node.LockAlg.LockTp + spec.Algorithm = node.LockAlg.AlgorithmTp + } else { + spec.LockType = 0 + spec.Algorithm = 0 + } + spec.Partition = node.Partition + alter.SetText(node.Text()) + alter.Table = node.Table + alter.Specs = append(alter.Specs, spec) + return alter +} + +func (s *session) convertDropIndexToAlterTable(node *ast.DropIndexStmt) *ast.AlterTableStmt { + log.Debug("convertDropIndexToAlterTable") + var alter *ast.AlterTableStmt = &ast.AlterTableStmt{Specs: []*ast.AlterTableSpec{}} + var spec *ast.AlterTableSpec = &ast.AlterTableSpec{Tp: ast.AlterTableDropIndex} + spec.IfExists = node.IfExists + spec.Name = node.IndexName + if node.LockAlg != nil { + spec.LockType = node.LockAlg.LockTp + spec.Algorithm = node.LockAlg.AlgorithmTp + } else { + spec.LockType = 0 + spec.Algorithm = 0 + } + alter.SetText(node.Text()) + alter.Table = node.Table + alter.Specs = append(alter.Specs, spec) + return alter +} + func (s *session) checkDBExists(db string, reportNotExists bool) bool { if db == "" { diff --git a/session/tidb.go b/session/tidb.go index 9df1c734e..78ecff67d 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -238,7 +238,8 @@ func RegisterStore(name string, driver kv.Driver) error { // session.Open() but with the dbname cut off. // Examples: // goleveldb://relative/path -// boltdb:///absolute/path + +// boltdb:///absolute/path // // The engine should be registered before creating storage. func NewStore(path string) (kv.Storage, error) {