diff --git a/domain/plan_replayer_dump.go b/domain/plan_replayer_dump.go index 1281f1cb87c69..4eeea99061da8 100644 --- a/domain/plan_replayer_dump.go +++ b/domain/plan_replayer_dump.go @@ -71,7 +71,40 @@ type tableNameExtractor struct { err error } +<<<<<<< HEAD:domain/plan_replayer_dump.go func (tne *tableNameExtractor) Enter(in ast.Node) (ast.Node, bool) { +======= +func (tne *tableNameExtractor) getTablesAndViews() (map[tableNamePair]struct{}, error) { + r := make(map[tableNamePair]struct{}) + for tablePair := range tne.names { + if tablePair.IsView { + r[tablePair] = struct{}{} + continue + } + // remove cte in table names + _, ok := tne.cteNames[tablePair.TableName] + if !ok { + r[tablePair] = struct{}{} + } + // if the table has a foreign key, we need to add the referenced table to the list + tblInfo, err := tne.is.TableByName(tne.ctx, model.NewCIStr(tablePair.DBName), model.NewCIStr(tablePair.TableName)) + if err != nil { + return nil, err + } + for _, fk := range tblInfo.Meta().ForeignKeys { + key := tableNamePair{ + DBName: fk.RefSchema.L, + TableName: fk.RefTable.L, + IsView: false, + } + r[key] = struct{}{} + } + } + return r, nil +} + +func (*tableNameExtractor) Enter(in ast.Node) (ast.Node, bool) { +>>>>>>> 1eb0c8c1235 (domain: fix play replay dump cannot save the table in the foreign key's reference (#56512)):pkg/domain/plan_replayer_dump.go if _, ok := in.(*ast.TableName); ok { return in, true } @@ -543,6 +576,7 @@ func extractTableNames(ctx context.Context, sctx sessionctx.Context, if tableExtractor.err != nil { return nil, tableExtractor.err } +<<<<<<< HEAD:domain/plan_replayer_dump.go r := make(map[tableNamePair]struct{}) for tablePair := range tableExtractor.names { if tablePair.IsView { @@ -556,6 +590,9 @@ func extractTableNames(ctx context.Context, sctx sessionctx.Context, } } return r, nil +======= + return tableExtractor.getTablesAndViews() +>>>>>>> 1eb0c8c1235 (domain: fix play replay dump cannot save the table in the foreign key's reference (#56512)):pkg/domain/plan_replayer_dump.go } func getStatsForTable(do *Domain, tblJSONStats map[int64]*handle.JSONTable, pair tableNamePair) (*handle.JSONTable, error) { diff --git a/pkg/domain/extract.go b/pkg/domain/extract.go new file mode 100644 index 0000000000000..7f0032de93923 --- /dev/null +++ b/pkg/domain/extract.go @@ -0,0 +1,530 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package domain + +import ( + "archive/zip" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "math/rand" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + "github.com/BurntSushi/toml" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/logutil" + "go.uber.org/zap" +) + +const ( + // ExtractMetaFile indicates meta file for extract + ExtractMetaFile = "extract_meta.txt" +) + +const ( + // ExtractTaskType indicates type of extract task + ExtractTaskType = "taskType" + // ExtractPlanTaskSkipStats indicates skip stats for extract plan task + ExtractPlanTaskSkipStats = "SkipStats" +) + +// ExtractType indicates type +type ExtractType uint8 + +const ( + // ExtractPlanType indicates extract plan task + ExtractPlanType ExtractType = iota +) + +func taskTypeToString(t ExtractType) string { + if t == ExtractPlanType { + return "Plan" + } + return "Unknown" +} + +// ExtractHandle handles the extractWorker to run extract the information task like Plan or any others. +// extractHandle will provide 2 mode for extractWorker: +// 1. submit a background extract task, the response will be returned after the task is started to be solved +// 2. submit a task and wait until the task is solved, the result will be returned to the response. +type ExtractHandle struct { + worker *extractWorker +} + +// newExtractHandler new extract handler +func newExtractHandler(ctx context.Context, sctxs []sessionctx.Context) *ExtractHandle { + h := &ExtractHandle{} + h.worker = newExtractWorker(ctx, sctxs[0], false) + return h +} + +// ExtractTask extract tasks +func (h *ExtractHandle) ExtractTask(ctx context.Context, task *ExtractTask) (string, error) { + // TODO: support background job later + if task.IsBackgroundJob { + return "", nil + } + return h.worker.extractTask(ctx, task) +} + +type extractWorker struct { + ctx context.Context + sctx sessionctx.Context + isBackgroundWorker bool + sync.Mutex +} + +// ExtractTask indicates task +type ExtractTask struct { + ExtractType ExtractType + IsBackgroundJob bool + + // Param for Extract Plan + SkipStats bool + UseHistoryView bool + + // variables for plan task type + Begin time.Time + End time.Time +} + +// NewExtractPlanTask returns extract plan task +func NewExtractPlanTask(begin, end time.Time) *ExtractTask { + return &ExtractTask{ + Begin: begin, + End: end, + ExtractType: ExtractPlanType, + } +} + +func newExtractWorker( + ctx context.Context, + sctx sessionctx.Context, + isBackgroundWorker bool, +) *extractWorker { + return &extractWorker{ + ctx: ctx, + sctx: sctx, + isBackgroundWorker: isBackgroundWorker, + } +} + +func (w *extractWorker) extractTask(ctx context.Context, task *ExtractTask) (string, error) { + if task.ExtractType == ExtractPlanType { + return w.extractPlanTask(ctx, task) + } + return "", errors.New("unknown extract task") +} + +func (w *extractWorker) extractPlanTask(ctx context.Context, task *ExtractTask) (string, error) { + if task.UseHistoryView && !config.GetGlobalConfig().Instance.StmtSummaryEnablePersistent { + return "", errors.New("tidb_stmt_summary_enable_persistent should be enabled for extract task") + } + records, err := w.collectRecords(ctx, task) + if err != nil { + logutil.BgLogger().Error("collect stmt summary records failed for extract plan task", 
zap.Error(err)) + return "", err + } + p, err := w.packageExtractPlanRecords(ctx, records) + if err != nil { + logutil.BgLogger().Error("package stmt summary records failed for extract plan task", zap.Error(err)) + return "", err + } + return w.dumpExtractPlanPackage(task, p) +} + +func (w *extractWorker) collectRecords(ctx context.Context, task *ExtractTask) (map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, error) { + w.Lock() + defer w.Unlock() + exec := w.sctx.GetRestrictedSQLExecutor() + ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) + sourceTable := "STATEMENTS_SUMMARY_HISTORY" + if !task.UseHistoryView { + sourceTable = "STATEMENTS_SUMMARY" + } + rows, _, err := exec.ExecRestrictedSQL(ctx1, nil, fmt.Sprintf("SELECT STMT_TYPE, DIGEST, PLAN_DIGEST,QUERY_SAMPLE_TEXT, BINARY_PLAN, TABLE_NAMES, SAMPLE_USER FROM INFORMATION_SCHEMA.%s WHERE SUMMARY_END_TIME > '%s' AND SUMMARY_BEGIN_TIME < '%s'", + sourceTable, task.Begin.Format(types.TimeFormat), task.End.Format(types.TimeFormat))) + if err != nil { + return nil, err + } + collectMap := make(map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, 0) + for _, row := range rows { + record := &stmtSummaryHistoryRecord{} + record.stmtType = row.GetString(0) + record.digest = row.GetString(1) + record.planDigest = row.GetString(2) + record.sql = row.GetString(3) + record.binaryPlan = row.GetString(4) + tableNames := row.GetString(5) + key := stmtSummaryHistoryKey{ + digest: record.digest, + planDigest: record.planDigest, + } + record.userName = row.GetString(6) + record.tables = make([]tableNamePair, 0) + setRecord, err := w.handleTableNames(tableNames, record) + if err != nil { + return nil, err + } + if setRecord && checkRecordValid(record) { + collectMap[key] = record + } + } + return collectMap, nil +} + +func (w *extractWorker) handleTableNames(tableNames string, record *stmtSummaryHistoryRecord) (bool, error) { + is := GetDomain(w.sctx).InfoSchema() + for _, t := range strings.Split(tableNames, ",") { + names := strings.Split(t, ".") + if len(names) != 2 { + return false, nil + } + dbName := names[0] + tblName := names[1] + record.schemaName = dbName + // skip internal schema record + switch strings.ToLower(record.schemaName) { + case util.PerformanceSchemaName.L, util.InformationSchemaName.L, util.MetricSchemaName.L, "mysql": + return false, nil + } + exists := is.TableExists(model.NewCIStr(dbName), model.NewCIStr(tblName)) + if !exists { + return false, nil + } + t, err := is.TableByName(w.ctx, model.NewCIStr(dbName), model.NewCIStr(tblName)) + if err != nil { + return false, err + } + record.tables = append(record.tables, tableNamePair{DBName: dbName, TableName: tblName, IsView: t.Meta().IsView()}) + } + return true, nil +} + +func checkRecordValid(r *stmtSummaryHistoryRecord) bool { + if r.stmtType != "Select" { + return false + } + if r.schemaName == "" { + return false + } + if r.planDigest == "" { + return false + } + return true +} + +func (w *extractWorker) packageExtractPlanRecords(ctx context.Context, records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord) (*extractPlanPackage, error) { + p := &extractPlanPackage{} + p.records = records + p.tables = make(map[tableNamePair]struct{}, 0) + for _, record := range records { + // skip the sql which has been cut off + if strings.Contains(record.sql, "(len:") { + record.skip = true + continue + } + plan, err := w.decodeBinaryPlan(ctx, record.binaryPlan) + if err != nil { + return nil, err + } + record.plan = plan + for _, tbl := range record.tables { + p.tables[tbl] = 
struct{}{} + } + } + if err := w.handleIsView(ctx, p); err != nil { + return nil, err + } + return p, nil +} + +func (w *extractWorker) handleIsView(ctx context.Context, p *extractPlanPackage) error { + is := GetDomain(w.sctx).InfoSchema() + tne := &tableNameExtractor{ + ctx: ctx, + executor: w.sctx.GetRestrictedSQLExecutor(), + is: is, + curDB: model.NewCIStr(""), + names: make(map[tableNamePair]struct{}), + cteNames: make(map[string]struct{}), + } + for v := range p.tables { + if v.IsView { + v, err := is.TableByName(w.ctx, model.NewCIStr(v.DBName), model.NewCIStr(v.TableName)) + if err != nil { + return err + } + sql := v.Meta().View.SelectStmt + node, err := tne.executor.ParseWithParams(tne.ctx, sql) + if err != nil { + return err + } + node.Accept(tne) + } + } + if tne.err != nil { + return tne.err + } + r, err := tne.getTablesAndViews() + if err != nil { + return err + } + for t := range r { + p.tables[t] = struct{}{} + } + return nil +} + +func (w *extractWorker) decodeBinaryPlan(ctx context.Context, bPlan string) (string, error) { + exec := w.sctx.GetRestrictedSQLExecutor() + ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) + rows, _, err := exec.ExecRestrictedSQL(ctx1, nil, fmt.Sprintf("SELECT tidb_decode_binary_plan('%s')", bPlan)) + if err != nil { + return "", err + } + plan := rows[0].GetString(0) + return strings.Trim(plan, "\n"), nil +} + +// dumpExtractPlanPackage will dump the information about sqls collected in stmt_summary_history +// The files will be organized into the following format: +/* + |-extract_meta.txt + |-meta.txt + |-config.toml + |-variables.toml + |-bindings.sql + |-schema + | |-schema_meta.txt + | |-db1.table1.schema.txt + | |-db2.table2.schema.txt + | |-.... + |-view + | |-db1.view1.view.txt + | |-db2.view2.view.txt + | |-.... + |-stats + | |-stats1.json + | |-stats2.json + | |-.... + |-table_tiflash_replica.txt + |-sql + | |-digest1.sql + | |-digest2.sql + | |-.... + |-skippedSQLs + | |-digest1.sql + | |-... 
+*/ +func (w *extractWorker) dumpExtractPlanPackage(task *ExtractTask, p *extractPlanPackage) (name string, err error) { + f, name, err := GenerateExtractFile() + if err != nil { + return "", err + } + zw := zip.NewWriter(f) + defer func() { + if err != nil { + logutil.BgLogger().Error("dump extract plan task failed", zap.Error(err)) + } + if err1 := zw.Close(); err1 != nil { + logutil.BgLogger().Warn("close zip file failed", zap.String("file", name), zap.Error(err)) + } + if err1 := f.Close(); err1 != nil { + logutil.BgLogger().Warn("close file failed", zap.String("file", name), zap.Error(err)) + } + }() + + // Dump config + if err = dumpConfig(zw); err != nil { + return "", err + } + // Dump meta + if err = dumpMeta(zw); err != nil { + return "", err + } + // dump extract plan task meta + if err = dumpExtractMeta(task, zw); err != nil { + return "", err + } + // Dump Schema and View + if err = dumpSchemas(w.sctx, zw, p.tables); err != nil { + return "", err + } + // Dump tables tiflash replicas + if err = dumpTiFlashReplica(w.sctx, zw, p.tables); err != nil { + return "", err + } + // Dump variables + if err = dumpVariables(w.sctx, w.sctx.GetSessionVars(), zw); err != nil { + return "", err + } + // Dump global bindings + if err = dumpGlobalBindings(w.sctx, zw); err != nil { + return "", err + } + // Dump stats + if !task.SkipStats { + if _, err = dumpStats(zw, p.tables, GetDomain(w.sctx), 0); err != nil { + return "", err + } + } + // Dump sqls and plan + if err = dumpSQLRecords(p.records, zw); err != nil { + return "", err + } + return name, nil +} + +func dumpSQLRecords(records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, zw *zip.Writer) error { + for key, record := range records { + if record.skip { + err := dumpSQLRecord(record, fmt.Sprintf("skippedSQLs/%v.json", key.digest), zw) + if err != nil { + return err + } + } else { + err := dumpSQLRecord(record, fmt.Sprintf("SQLs/%v.json", key.digest), zw) + if err != nil { + return err + } + } + } + return nil +} + +type singleSQLRecord struct { + Schema string `json:"schema"` + Plan string `json:"plan"` + SQL string `json:"sql"` + Digest string `json:"digest"` + BinaryPlan string `json:"binaryPlan"` + UserName string `json:"userName"` +} + +// dumpSQLRecord dumps sql records into one file for each record, the format is in json. 
+func dumpSQLRecord(record *stmtSummaryHistoryRecord, path string, zw *zip.Writer) error { + zf, err := zw.Create(path) + if err != nil { + return err + } + singleSQLRecord := &singleSQLRecord{ + Schema: record.schemaName, + Plan: record.plan, + SQL: record.sql, + Digest: record.digest, + BinaryPlan: record.binaryPlan, + UserName: record.userName, + } + content, err := json.Marshal(singleSQLRecord) + if err != nil { + return err + } + _, err = zf.Write(content) + if err != nil { + return err + } + return nil +} + +func dumpExtractMeta(task *ExtractTask, zw *zip.Writer) error { + cf, err := zw.Create(ExtractMetaFile) + if err != nil { + return errors.AddStack(err) + } + varMap := make(map[string]string) + varMap[ExtractTaskType] = taskTypeToString(task.ExtractType) + if task.ExtractType == ExtractPlanType { + varMap[ExtractPlanTaskSkipStats] = strconv.FormatBool(task.SkipStats) + } + + if err := toml.NewEncoder(cf).Encode(varMap); err != nil { + return errors.AddStack(err) + } + return nil +} + +type extractPlanPackage struct { + tables map[tableNamePair]struct{} + records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord +} + +type stmtSummaryHistoryKey struct { + digest string + planDigest string +} + +type stmtSummaryHistoryRecord struct { + stmtType string + schemaName string + tables []tableNamePair + digest string + planDigest string + sql string + binaryPlan string + userName string + + plan string + skip bool +} + +// GenerateExtractFile generates extract stmt file +func GenerateExtractFile() (*os.File, string, error) { + path := GetExtractTaskDirName() + err := os.MkdirAll(path, os.ModePerm) + if err != nil { + return nil, "", errors.AddStack(err) + } + fileName, err := generateExtractStmtFile() + if err != nil { + return nil, "", errors.AddStack(err) + } + zf, err := os.Create(filepath.Join(path, fileName)) + if err != nil { + return nil, "", errors.AddStack(err) + } + return zf, fileName, err +} + +func generateExtractStmtFile() (string, error) { + // Generate key and create zip file + time := time.Now().UnixNano() + b := make([]byte, 16) + //nolint: gosec + _, err := rand.Read(b) + if err != nil { + return "", err + } + key := base64.URLEncoding.EncodeToString(b) + return fmt.Sprintf("extract_%v_%v.zip", key, time), nil +} + +// GetExtractTaskDirName get extract dir name +func GetExtractTaskDirName() string { + tidbLogDir := filepath.Dir(config.GetGlobalConfig().Log.File.Filename) + return filepath.Join(tidbLogDir, "extract") +} diff --git a/pkg/server/handler/optimizor/BUILD.bazel b/pkg/server/handler/optimizor/BUILD.bazel new file mode 100644 index 0000000000000..f1281d3456b1e --- /dev/null +++ b/pkg/server/handler/optimizor/BUILD.bazel @@ -0,0 +1,75 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "optimizor", + srcs = [ + "optimize_trace.go", + "plan_replayer.go", + "statistics_handler.go", + ], + importpath = "github.com/pingcap/tidb/pkg/server/handler/optimizor", + visibility = ["//visibility:public"], + deps = [ + "//pkg/domain", + "//pkg/domain/infosync", + "//pkg/infoschema", + "//pkg/meta/model", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/server/handler", + "//pkg/session", + "//pkg/sessionctx/variable", + "//pkg/statistics/handle", + "//pkg/statistics/handle/util", + "//pkg/table", + "//pkg/types", + "//pkg/util", + "//pkg/util/logutil", + "//pkg/util/replayer", + "@com_github_burntsushi_toml//:toml", + "@com_github_gorilla_mux//:mux", + "@com_github_pingcap_errors//:errors", + 
"@com_github_tikv_client_go_v2//oracle", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "optimizor_test", + timeout = "short", + srcs = [ + "main_test.go", + "optimize_trace_test.go", + "plan_replayer_test.go", + "statistics_handler_test.go", + ], + flaky = True, + shard_count = 6, + deps = [ + ":optimizor", + "//pkg/config", + "//pkg/domain", + "//pkg/kv", + "//pkg/metrics", + "//pkg/parser/model", + "//pkg/server", + "//pkg/server/internal/testserverclient", + "//pkg/server/internal/testutil", + "//pkg/server/internal/util", + "//pkg/session", + "//pkg/statistics/handle/util", + "//pkg/store/mockstore/unistore", + "//pkg/testkit", + "//pkg/testkit/testsetup", + "//pkg/util/replayer", + "//pkg/util/topsql/state", + "@com_github_burntsushi_toml//:toml", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_gorilla_mux//:mux", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//tikv", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/pkg/server/handler/optimizor/plan_replayer_test.go b/pkg/server/handler/optimizor/plan_replayer_test.go new file mode 100644 index 0000000000000..5ddbc1e4baed1 --- /dev/null +++ b/pkg/server/handler/optimizor/plan_replayer_test.go @@ -0,0 +1,628 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package optimizor_test + +import ( + "archive/zip" + "bytes" + "context" + "database/sql" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "slices" + "strconv" + "strings" + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/go-sql-driver/mysql" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/server" + "github.com/pingcap/tidb/pkg/server/internal/testserverclient" + "github.com/pingcap/tidb/pkg/server/internal/testutil" + "github.com/pingcap/tidb/pkg/server/internal/util" + "github.com/pingcap/tidb/pkg/session" + util2 "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/replayer" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" +) + +var expectedFilesInReplayer = []string{ + "config.toml", + "debug_trace/debug_trace0.json", + "explain.txt", + "global_bindings.sql", + "meta.txt", + "schema/planreplayer.t.schema.txt", + "schema/schema_meta.txt", + "session_bindings.sql", + "sql/sql0.sql", + "sql_meta.toml", + "stats/planreplayer.t.json", + "statsMem/planreplayer.t.txt", + "table_tiflash_replica.txt", + "variables.toml", +} + +var expectedFilesInReplayerForCapture = []string{ + "config.toml", + "debug_trace/debug_trace0.json", + "explain/sql.txt", + "global_bindings.sql", + "meta.txt", + "schema/planreplayer.t.schema.txt", + "schema/schema_meta.txt", + "session_bindings.sql", + "sql/sql0.sql", + "sql_meta.toml", + "stats/planreplayer.t.json", + "statsMem/planreplayer.t.txt", + "table_tiflash_replica.txt", + "variables.toml", +} + +func prepareServerAndClientForTest(t *testing.T, store kv.Storage, dom *domain.Domain) (srv *server.Server, client *testserverclient.TestServerClient) { + driver := server.NewTiDBDriver(store) + client = testserverclient.NewTestServerClient() + + cfg := util.NewTestConfig() + cfg.Port = client.Port + cfg.Status.StatusPort = client.StatusPort + cfg.Status.ReportStatus = true + + srv, err := server.NewServer(cfg, driver) + srv.SetDomain(dom) + require.NoError(t, err) + go func() { + err := srv.Run(nil) + require.NoError(t, err) + }() + <-server.RunInGoTestChan + client.Port = testutil.GetPortFromTCPAddr(srv.ListenAddr()) + client.StatusPort = testutil.GetPortFromTCPAddr(srv.StatusListenerAddr()) + client.WaitUntilServerOnline() + return +} + +func TestDumpPlanReplayerAPI(t *testing.T) { + store := testkit.CreateMockStore(t) + dom, err := session.GetDomain(store) + require.NoError(t, err) + // 1. setup and prepare plan replayer files by manual command and capture + server, client := prepareServerAndClientForTest(t, store, dom) + defer server.Close() + + filename, fileNameFromCapture := prepareData4PlanReplayer(t, client, dom) + defer os.RemoveAll(replayer.GetPlanReplayerDirName()) + + // 2. check the contents of the plan replayer zip files. + + var filesInReplayer []string + collectFileNameAndAssertFileSize := func(f *zip.File) { + // collect file name + filesInReplayer = append(filesInReplayer, f.Name) + // except for {global,session}_bindings.sql and table_tiflash_replica.txt, the file should not be empty + if !strings.Contains(f.Name, "table_tiflash_replica.txt") && + !strings.Contains(f.Name, "bindings.sql") { + require.NotZero(t, f.UncompressedSize64, f.Name) + } + } + + // 2-1. 
check the plan replayer file from manual command + resp0, err := client.FetchStatus(filepath.Join("/plan_replayer/dump/", filename)) + require.NoError(t, err) + defer func() { + require.NoError(t, resp0.Body.Close()) + }() + body, err := io.ReadAll(resp0.Body) + require.NoError(t, err) + forEachFileInZipBytes(t, body, collectFileNameAndAssertFileSize) + slices.Sort(filesInReplayer) + require.Equal(t, expectedFilesInReplayer, filesInReplayer) + + // 2-2. check the plan replayer file from capture + resp1, err := client.FetchStatus(filepath.Join("/plan_replayer/dump/", fileNameFromCapture)) + require.NoError(t, err) + defer func() { + require.NoError(t, resp1.Body.Close()) + }() + body, err = io.ReadAll(resp1.Body) + require.NoError(t, err) + filesInReplayer = filesInReplayer[:0] + forEachFileInZipBytes(t, body, collectFileNameAndAssertFileSize) + slices.Sort(filesInReplayer) + require.Equal(t, expectedFilesInReplayerForCapture, filesInReplayer) + + // 3. check plan replayer load + + // 3-1. write the plan replayer file from manual command to a file + path := "/tmp/plan_replayer.zip" + fp, err := os.Create(path) + require.NoError(t, err) + require.NotNil(t, fp) + defer func() { + require.NoError(t, fp.Close()) + require.NoError(t, os.Remove(path)) + }() + + _, err = io.Copy(fp, bytes.NewReader(body)) + require.NoError(t, err) + require.NoError(t, fp.Sync()) + + // 3-2. connect to tidb and use PLAN REPLAYER LOAD to load this file + db, err := sql.Open("mysql", client.GetDSN(func(config *mysql.Config) { + config.AllowAllFiles = true + })) + require.NoError(t, err, "Error connecting") + defer func() { + err := db.Close() + require.NoError(t, err) + }() + tk := testkit.NewDBTestKit(t, db) + + tk.MustExec("use planReplayer") + tk.MustExec("drop table planReplayer.t") + tk.MustExec(`plan replayer load "/tmp/plan_replayer.zip"`) + + // 3-3. assert that the count and modify count in the stats is as expected + rows := tk.MustQuery("show stats_meta") + require.True(t, rows.Next(), "unexpected data") + var dbName, tableName string + var modifyCount, count int64 + var other any + err = rows.Scan(&dbName, &tableName, &other, &other, &modifyCount, &count, &other) + require.NoError(t, err) + require.Equal(t, "planReplayer", dbName) + require.Equal(t, "t", tableName) + require.Equal(t, int64(4), modifyCount) + require.Equal(t, int64(8), count) +} + +// prepareData4PlanReplayer trigger tidb to dump 2 plan replayer files, +// one by manual command, the other by capture, and return the filenames. 
+func prepareData4PlanReplayer(t *testing.T, client *testserverclient.TestServerClient, dom *domain.Domain) (string, string) { + h := dom.StatsHandle() + replayerHandle := dom.GetPlanReplayerHandle() + db, err := sql.Open("mysql", client.GetDSN()) + require.NoError(t, err, "Error connecting") + defer func() { + err := db.Close() + require.NoError(t, err) + }() + tk := testkit.NewDBTestKit(t, db) + + tk.MustExec("create database planReplayer") + tk.MustExec("use planReplayer") + tk.MustExec("create table t(a int)") + err = h.HandleDDLEvent(<-h.DDLEventCh()) + require.NoError(t, err) + tk.MustExec("insert into t values(1), (2), (3), (4)") + require.NoError(t, h.DumpStatsDeltaToKV(true)) + tk.MustExec("analyze table t") + tk.MustExec("insert into t values(5), (6), (7), (8)") + require.NoError(t, h.DumpStatsDeltaToKV(true)) + rows := tk.MustQuery("plan replayer dump explain select * from t") + require.True(t, rows.Next(), "unexpected data") + var filename string + require.NoError(t, rows.Scan(&filename)) + require.NoError(t, rows.Close()) + rows = tk.MustQuery("select @@tidb_last_plan_replayer_token") + require.True(t, rows.Next(), "unexpected data") + var filename2 string + require.NoError(t, rows.Scan(&filename2)) + require.NoError(t, rows.Close()) + require.Equal(t, filename, filename2) + + tk.MustExec("plan replayer capture 'e5796985ccafe2f71126ed6c0ac939ffa015a8c0744a24b7aee6d587103fd2f7' '*'") + tk.MustQuery("select * from t") + task := replayerHandle.DrainTask() + require.NotNil(t, task) + worker := replayerHandle.GetWorker() + require.True(t, worker.HandleTask(task)) + rows = tk.MustQuery("select token from mysql.plan_replayer_status where length(sql_digest) > 0") + require.True(t, rows.Next(), "unexpected data") + var filename3 string + require.NoError(t, rows.Scan(&filename3)) + require.NoError(t, rows.Close()) + + return filename, filename3 +} + +func TestIssue56458(t *testing.T) { + store := testkit.CreateMockStore(t) + dom, err := session.GetDomain(store) + require.NoError(t, err) + // 1. setup and prepare plan replayer files by manual command and capture + server, client := prepareServerAndClientForTest(t, store, dom) + defer server.Close() + + filename := prepareData4Issue56458(t, client, dom) + defer os.RemoveAll(replayer.GetPlanReplayerDirName()) + + // 2. check the contents of the plan replayer zip files. + var filesInReplayer []string + collectFileNameAndAssertFileSize := func(f *zip.File) { + // collect file name + filesInReplayer = append(filesInReplayer, f.Name) + } + + // 2-1. 
check the plan replayer file from manual command + resp0, err := client.FetchStatus(filepath.Join("/plan_replayer/dump/", filename)) + require.NoError(t, err) + defer func() { + require.NoError(t, resp0.Body.Close()) + }() + body, err := io.ReadAll(resp0.Body) + require.NoError(t, err) + forEachFileInZipBytes(t, body, collectFileNameAndAssertFileSize) + slices.Sort(filesInReplayer) + require.Equal(t, []string{ + "config.toml", + "debug_trace/debug_trace0.json", + "explain.txt", + "global_bindings.sql", + "meta.txt", + "schema/planreplayer.t.schema.txt", + "schema/planreplayer.v.schema.txt", + "schema/schema_meta.txt", + "session_bindings.sql", + "sql/sql0.sql", + "sql_meta.toml", + "stats/planreplayer.t.json", + "stats/planreplayer.v.json", + "statsMem/planreplayer.t.txt", + "statsMem/planreplayer.v.txt", + "table_tiflash_replica.txt", + "variables.toml", + }, filesInReplayer) +} + +func TestIssue43192(t *testing.T) { + store := testkit.CreateMockStore(t) + dom, err := session.GetDomain(store) + require.NoError(t, err) + // 1. setup and prepare plan replayer files by manual command and capture + server, client := prepareServerAndClientForTest(t, store, dom) + defer server.Close() + + filename := prepareData4Issue43192(t, client, dom) + defer os.RemoveAll(replayer.GetPlanReplayerDirName()) + + // 2. check the contents of the plan replayer zip files. + var filesInReplayer []string + collectFileNameAndAssertFileSize := func(f *zip.File) { + // collect file name + filesInReplayer = append(filesInReplayer, f.Name) + } + + // 2-1. check the plan replayer file from manual command + resp0, err := client.FetchStatus(filepath.Join("/plan_replayer/dump/", filename)) + require.NoError(t, err) + defer func() { + require.NoError(t, resp0.Body.Close()) + }() + body, err := io.ReadAll(resp0.Body) + require.NoError(t, err) + forEachFileInZipBytes(t, body, collectFileNameAndAssertFileSize) + slices.Sort(filesInReplayer) + require.Equal(t, expectedFilesInReplayer, filesInReplayer) + + // 3. check plan replayer load + // 3-1. write the plan replayer file from manual command to a file + path := "/tmp/plan_replayer.zip" + fp, err := os.Create(path) + require.NoError(t, err) + require.NotNil(t, fp) + defer func() { + require.NoError(t, fp.Close()) + require.NoError(t, os.Remove(path)) + }() + + _, err = io.Copy(fp, bytes.NewReader(body)) + require.NoError(t, err) + require.NoError(t, fp.Sync()) + + // 3-2. connect to tidb and use PLAN REPLAYER LOAD to load this file + db, err := sql.Open("mysql", client.GetDSN(func(config *mysql.Config) { + config.AllowAllFiles = true + })) + require.NoError(t, err, "Error connecting") + defer func() { + err := db.Close() + require.NoError(t, err) + }() + tk := testkit.NewDBTestKit(t, db) + tk.MustExec("use planReplayer") + tk.MustExec("drop table planReplayer.t") + tk.MustExec(`plan replayer load "/tmp/plan_replayer.zip"`) + + // 3-3. 
check whether binding takes effect + tk.MustExec(`select a, b from t where a in (1, 2, 3)`) + rows := tk.MustQuery("select @@last_plan_from_binding") + require.True(t, rows.Next(), "unexpected data") + var count int64 + err = rows.Scan(&count) + require.NoError(t, err) + require.Equal(t, int64(1), count) +} + +func prepareData4Issue43192(t *testing.T, client *testserverclient.TestServerClient, dom *domain.Domain) string { + h := dom.StatsHandle() + db, err := sql.Open("mysql", client.GetDSN()) + require.NoError(t, err, "Error connecting") + defer func() { + err := db.Close() + require.NoError(t, err) + }() + tk := testkit.NewDBTestKit(t, db) + + tk.MustExec("create database planReplayer") + tk.MustExec("use planReplayer") + tk.MustExec("create table t(a int, b int, INDEX ia (a), INDEX ib (b));") + err = h.HandleDDLEvent(<-h.DDLEventCh()) + require.NoError(t, err) + tk.MustExec("create global binding for select a, b from t where a in (1, 2, 3) using select a, b from t use index (ib) where a in (1, 2, 3)") + rows := tk.MustQuery("plan replayer dump explain select a, b from t where a in (1, 2, 3)") + require.True(t, rows.Next(), "unexpected data") + var filename string + require.NoError(t, rows.Scan(&filename)) + require.NoError(t, rows.Close()) + rows = tk.MustQuery("select @@tidb_last_plan_replayer_token") + require.True(t, rows.Next(), "unexpected data") + return filename +} + +func prepareData4Issue56458(t *testing.T, client *testserverclient.TestServerClient, dom *domain.Domain) string { + h := dom.StatsHandle() + db, err := sql.Open("mysql", client.GetDSN()) + require.NoError(t, err, "Error connecting") + defer func() { + err := db.Close() + require.NoError(t, err) + }() + tk := testkit.NewDBTestKit(t, db) + + tk.MustExec("create database planReplayer") + tk.MustExec("use planReplayer") + tk.MustExec("CREATE TABLE v(id INT PRIMARY KEY AUTO_INCREMENT);") + tk.MustExec("create table t(a int, b int, INDEX ia (a), INDEX ib (b), author_id int, FOREIGN KEY (author_id) REFERENCES v(id) ON DELETE CASCADE);") + err = h.HandleDDLEvent(<-h.DDLEventCh()) + require.NoError(t, err) + tk.MustExec("create global binding for select a, b from t where a in (1, 2, 3) using select a, b from t use index (ib) where a in (1, 2, 3)") + rows := tk.MustQuery("plan replayer dump explain select a, b from t where a in (1, 2, 3)") + require.True(t, rows.Next(), "unexpected data") + var filename string + require.NoError(t, rows.Scan(&filename)) + require.NoError(t, rows.Close()) + rows = tk.MustQuery("select @@tidb_last_plan_replayer_token") + require.True(t, rows.Next(), "unexpected data") + return filename +} + +func forEachFileInZipBytes(t *testing.T, b []byte, fn func(file *zip.File)) { + br := bytes.NewReader(b) + z, err := zip.NewReader(br, int64(len(b))) + require.NoError(t, err) + for _, f := range z.File { + fn(f) + } +} + +func fetchZipFromPlanReplayerAPI(t *testing.T, client *testserverclient.TestServerClient, filename string) *zip.Reader { + resp0, err := client.FetchStatus(filepath.Join("/plan_replayer/dump/", filename)) + require.NoError(t, err) + defer func() { + require.NoError(t, resp0.Body.Close()) + }() + body, err := io.ReadAll(resp0.Body) + require.NoError(t, err) + b := bytes.NewReader(body) + z, err := zip.NewReader(b, int64(len(body))) + require.NoError(t, err) + return z +} + +func getInfoFromPlanReplayerZip( + t *testing.T, + z *zip.Reader, +) ( + jsonTbls []*util2.JSONTable, + metas []map[string]string, + errMsgs []string, +) { + for _, zipFile := range z.File { + if 
strings.HasPrefix(zipFile.Name, "stats/") { + jsonTbl := &util2.JSONTable{} + r, err := zipFile.Open() + require.NoError(t, err) + //nolint: all_revive + defer func() { + require.NoError(t, r.Close()) + }() + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(r) + require.NoError(t, err) + err = json.Unmarshal(buf.Bytes(), jsonTbl) + require.NoError(t, err) + + jsonTbls = append(jsonTbls, jsonTbl) + } else if zipFile.Name == "sql_meta.toml" { + meta := make(map[string]string) + r, err := zipFile.Open() + require.NoError(t, err) + //nolint: all_revive + defer func() { + require.NoError(t, r.Close()) + }() + _, err = toml.NewDecoder(r).Decode(&meta) + require.NoError(t, err) + + metas = append(metas, meta) + } else if zipFile.Name == "errors.txt" { + r, err := zipFile.Open() + require.NoError(t, err) + //nolint: all_revive + defer func() { + require.NoError(t, r.Close()) + }() + content, err := io.ReadAll(r) + require.NoError(t, err) + errMsgs = strings.Split(string(content), "\n") + } + } + return +} + +func TestDumpPlanReplayerAPIWithHistoryStats(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats")) + }() + store := testkit.CreateMockStore(t) + dom, err := session.GetDomain(store) + require.NoError(t, err) + server, client := prepareServerAndClientForTest(t, store, dom) + defer server.Close() + statsHandle := dom.StatsHandle() + hsWorker := dom.GetHistoricalStatsWorker() + + // 1. prepare test data + + // time1, ts1: before everything starts + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 1") + defer tk.MustExec("set global tidb_enable_historical_stats = 0") + time1 := time.Now() + ts1 := oracle.GoTimeToTS(time1) + + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int, c int, index ia(a))") + is := dom.InfoSchema() + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tblInfo := tbl.Meta() + + // 1-1. first insert and first analyze, trigger first dump history stats + tk.MustExec("insert into t value(1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("analyze table t with 1 samplerate") + tblID := hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, statsHandle) + require.NoError(t, err) + + // time2, stats1: after first analyze + time2 := time.Now() + ts2 := oracle.GoTimeToTS(time2) + stats1, err := statsHandle.DumpStatsToJSON("test", tblInfo, nil, true) + require.NoError(t, err) + + // 1-2. second insert and second analyze, trigger second dump history stats + tk.MustExec("insert into t value(4,4,4), (5,5,5), (6,6,6)") + tk.MustExec("analyze table t with 1 samplerate") + tblID = hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, statsHandle) + require.NoError(t, err) + + // time3, stats2: after second analyze + time3 := time.Now() + ts3 := oracle.GoTimeToTS(time3) + stats2, err := statsHandle.DumpStatsToJSON("test", tblInfo, nil, true) + require.NoError(t, err) + + // 2. get the plan replayer and assert + + template := "plan replayer dump with stats as of timestamp '%s' explain %s" + query := "select * from t where a > 1" + + // 2-1. 
specify time1 to get the plan replayer + filename1 := tk.MustQuery( + fmt.Sprintf(template, strconv.FormatUint(ts1, 10), query), + ).Rows()[0][0].(string) + zip1 := fetchZipFromPlanReplayerAPI(t, client, filename1) + jsonTbls1, metas1, errMsg1 := getInfoFromPlanReplayerZip(t, zip1) + + // the TS is recorded in the plan replayer, and it's the same as the TS we calculated above + require.Len(t, metas1, 1) + require.Contains(t, metas1[0], "historicalStatsTS") + tsInReplayerMeta1, err := strconv.ParseUint(metas1[0]["historicalStatsTS"], 10, 64) + require.NoError(t, err) + require.Equal(t, ts1, tsInReplayerMeta1) + + // the result is the same as stats2, and IsHistoricalStats is false. + require.Len(t, jsonTbls1, 1) + require.False(t, jsonTbls1[0].IsHistoricalStats) + require.Equal(t, jsonTbls1[0], stats2) + + // because we failed to get historical stats, there's an error message. + require.Equal(t, []string{"Historical stats for test.t are unavailable, fallback to latest stats", ""}, errMsg1) + + // 2-2. specify time2 to get the plan replayer + filename2 := tk.MustQuery( + fmt.Sprintf(template, time2.Format("2006-01-02 15:04:05.000000"), query), + ).Rows()[0][0].(string) + zip2 := fetchZipFromPlanReplayerAPI(t, client, filename2) + jsonTbls2, metas2, errMsg2 := getInfoFromPlanReplayerZip(t, zip2) + + // the TS is recorded in the plan replayer, and it's the same as the TS we calculated above + require.Len(t, metas2, 1) + require.Contains(t, metas2[0], "historicalStatsTS") + tsInReplayerMeta2, err := strconv.ParseUint(metas2[0]["historicalStatsTS"], 10, 64) + require.NoError(t, err) + require.Equal(t, ts2, tsInReplayerMeta2) + + // the result is the same as stats1, and IsHistoricalStats is true. + require.Len(t, jsonTbls2, 1) + require.True(t, jsonTbls2[0].IsHistoricalStats) + jsonTbls2[0].IsHistoricalStats = false + require.Equal(t, jsonTbls2[0], stats1) + + // succeeded to get historical stats, there should be no error message. + require.Empty(t, errMsg2) + + // 2-3. specify time3 to get the plan replayer + filename3 := tk.MustQuery( + fmt.Sprintf(template, time3.Format("2006-01-02T15:04:05.000000Z07:00"), query), + ).Rows()[0][0].(string) + zip3 := fetchZipFromPlanReplayerAPI(t, client, filename3) + jsonTbls3, metas3, errMsg3 := getInfoFromPlanReplayerZip(t, zip3) + + // the TS is recorded in the plan replayer, and it's the same as the TS we calculated above + require.Len(t, metas3, 1) + require.Contains(t, metas3[0], "historicalStatsTS") + tsInReplayerMeta3, err := strconv.ParseUint(metas3[0]["historicalStatsTS"], 10, 64) + require.NoError(t, err) + require.Equal(t, ts3, tsInReplayerMeta3) + + // the result is the same as stats2, and IsHistoricalStats is true. + require.Len(t, jsonTbls3, 1) + require.True(t, jsonTbls3[0].IsHistoricalStats) + jsonTbls3[0].IsHistoricalStats = false + require.Equal(t, jsonTbls3[0], stats2) + + // succeeded to get historical stats, there should be no error message. + require.Empty(t, errMsg3) + + // 3. remove the plan replayer files generated during the test + gcHandler := dom.GetDumpFileGCChecker() + gcHandler.GCDumpFiles(0, 0) +}
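
The core of this change is the foreign-key expansion in getTablesAndViews: when the plan replayer collects the tables referenced by a query, it now also records every table that those tables reference through a foreign key, so PLAN REPLAYER LOAD can recreate the schema without a dangling REFERENCES clause. The standalone Go sketch below restates that step outside TiDB; the names used here (tableName, fkRefs, expandWithFKReferences) are illustrative only and do not exist in the repository.

// A minimal, self-contained sketch (not the TiDB API) of the idea behind
// getTablesAndViews in this patch: when building the set of tables to dump,
// every table that declares a foreign key also pulls in the table it
// references, so the dump can be loaded without a missing-referenced-table
// error. Types and function names are hypothetical.
package main

import "fmt"

type tableName struct {
	db, tbl string
	isView  bool
}

// fkRefs maps a table to the tables referenced by its foreign keys.
// In TiDB this information comes from the table's meta (ForeignKeys).
type fkRefs map[tableName][]tableName

// expandWithFKReferences returns the set of tables to dump: the input tables
// (minus CTE names, which are not real tables) plus every table referenced
// by a foreign key of an input table.
func expandWithFKReferences(in map[tableName]struct{}, cteNames map[string]struct{}, refs fkRefs) map[tableName]struct{} {
	out := make(map[tableName]struct{}, len(in))
	for t := range in {
		if t.isView {
			out[t] = struct{}{}
			continue
		}
		if _, isCTE := cteNames[t.tbl]; isCTE {
			continue // a CTE appears as a TableName node but has no schema to dump
		}
		out[t] = struct{}{}
		for _, ref := range refs[t] {
			out[ref] = struct{}{} // the FK-referenced parent table must be dumped too
		}
	}
	return out
}

func main() {
	t := tableName{db: "planreplayer", tbl: "t"}
	v := tableName{db: "planreplayer", tbl: "v"}
	got := expandWithFKReferences(
		map[tableName]struct{}{t: {}},
		map[string]struct{}{},
		fkRefs{t: {v}}, // t has FOREIGN KEY (author_id) REFERENCES v(id)
	)
	fmt.Println(len(got)) // 2: both t and its FK-referenced table v are included
}

This is the behavior TestIssue56458 above asserts: dumping a query that only touches planreplayer.t still produces schema/planreplayer.v.schema.txt and stats/planreplayer.v.json for the foreign-key-referenced table v.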