Skip to content

Commit

Permalink
Explain the pipeline information of SQL physical plan (matrixorigin#1…
Browse files Browse the repository at this point in the history
…8676)

Explain the pipeline information of SQL physical plan

Approved by: @badboynt1, @daviszhen, @iamlinjunhong, @aunjgr, @ouyuanning, @heni02
  • Loading branch information
qingxinhome authored Sep 12, 2024
1 parent 303d13d commit e115c58
Show file tree
Hide file tree
Showing 19 changed files with 9,628 additions and 8,962 deletions.
2 changes: 1 addition & 1 deletion pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5298,7 +5298,7 @@ func determinePrivilegeSetOfStatement(stmt tree.Statement) *privilege {
kind = privilegeKindSpecial
special = specialTagAdmin
canExecInRestricted = true
case *tree.ExplainFor, *tree.ExplainAnalyze, *tree.ExplainStmt:
case *tree.ExplainFor, *tree.ExplainAnalyze, *tree.ExplainStmt, *tree.ExplainPhyPlan:
objType = objectTypeNone
kind = privilegeKindNone
case *tree.BeginTransaction, *tree.CommitTransaction, *tree.RollbackTransaction:
Expand Down
12 changes: 12 additions & 0 deletions pkg/frontend/computation_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package frontend

import (
"bytes"
"context"
"time"

Expand Down Expand Up @@ -56,6 +57,8 @@ type TxnComputationWrapper struct {
uuid uuid.UUID
//holds values of params in the PREPARE
paramVals []any

explainBuffer *bytes.Buffer
}

func InitTxnComputationWrapper(
Expand Down Expand Up @@ -283,6 +286,10 @@ func (cwft *TxnComputationWrapper) RecordExecPlan(ctx context.Context, phyPlan *
return nil
}

func (cwft *TxnComputationWrapper) SetExplainBuffer(buf *bytes.Buffer) {
cwft.explainBuffer = buf
}

func (cwft *TxnComputationWrapper) GetUUID() []byte {
return cwft.uuid[:]
}
Expand Down Expand Up @@ -439,6 +446,11 @@ func createCompile(
if _, ok := stmt.(*tree.ExplainAnalyze); ok {
fill = func(bat *batch.Batch) error { return nil }
}

if _, ok := stmt.(*tree.ExplainPhyPlan); ok {
fill = func(bat *batch.Batch) error { return nil }
}

err = retCompile.Compile(execCtx.reqCtx, plan, fill)
if err != nil {
return
Expand Down
42 changes: 41 additions & 1 deletion pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package frontend

import (
"bufio"
"bytes"
"context"
"encoding/binary"
Expand Down Expand Up @@ -1822,6 +1823,41 @@ func buildMoExplainQuery(execCtx *ExecCtx, explainColName string, buffer *explai
return err
}

func buildMoExplainPhyPlan(execCtx *ExecCtx, explainColName string, reader *bufio.Reader, session *Session, fill outputCallBackFunc) error {
bat := batch.New(true, []string{explainColName})
vs := make([][]byte, 0)
count := 0
for {
line, err := reader.ReadString('\n')
if err == io.EOF && len(line) > 0 {
vs = append(vs, []byte(strings.TrimSuffix(line, "\n")))
count++
break
}
if err != nil {
return moerr.NewInvalidInputf(execCtx.reqCtx, "Error when read explain phyplan buffer: %s", err.Error())
}

vs = append(vs, []byte(strings.TrimSuffix(line, "\n")))
count++
}

vs = vs[:count]
vec := vector.NewVec(types.T_varchar.ToType())
defer vec.Free(session.GetMemPool())
vector.AppendBytesList(vec, vs, nil, session.GetMemPool())
bat.Vecs[0] = vec
bat.SetRowCount(count)

err := fill(session, execCtx, bat)
if err != nil {
return err
}
// to trigger save result meta
err = fill(session, execCtx, nil)
return err
}

func buildPlan(reqCtx context.Context, ses FeSession, ctx plan2.CompilerContext, stmt tree.Statement) (*plan2.Plan, error) {
var ret *plan2.Plan
var err error
Expand Down Expand Up @@ -1895,7 +1931,7 @@ func buildPlan(reqCtx context.Context, ses FeSession, ctx plan2.CompilerContext,
*tree.Update, *tree.Delete, *tree.Insert,
*tree.ShowDatabases, *tree.ShowTables, *tree.ShowSequences, *tree.ShowColumns, *tree.ShowColumnNumber, *tree.ShowTableNumber,
*tree.ShowCreateDatabase, *tree.ShowCreateTable, *tree.ShowIndex,
*tree.ExplainStmt, *tree.ExplainAnalyze:
*tree.ExplainStmt, *tree.ExplainAnalyze, *tree.ExplainPhyPlan:
opt := plan2.NewBaseOptimizer(ctx)
optimized, err := opt.Optimize(stmt, isPrepareStmt)
if err != nil {
Expand Down Expand Up @@ -2594,6 +2630,8 @@ func executeStmt(ses *Session,
}
case *tree.ExplainAnalyze:
ses.SetData(nil)
case *tree.ExplainPhyPlan:
ses.SetData(nil)
case *tree.ShowTableStatus:
ses.SetShowStmtType(ShowTableStatus)
ses.SetData(nil)
Expand Down Expand Up @@ -2624,7 +2662,9 @@ func executeStmt(ses *Session,
analyzeModule := c.GetAnalyzeModule()
if analyzeModule != nil {
phyPlan = analyzeModule.GetPhyPlan()
execCtx.cw.SetExplainBuffer(analyzeModule.GetExplainPhyBuffer())
}

// Serialize the execution plan as json
_ = execCtx.cw.RecordExecPlan(execCtx.reqCtx, phyPlan)
c.Release()
Expand Down
65 changes: 65 additions & 0 deletions pkg/frontend/result_row_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package frontend

import (
"bufio"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -112,7 +113,49 @@ func executeResultRowStmt(ses *Session, execCtx *ExecCtx) (err error) {
if time.Since(runBegin) > time.Second {
ses.Infof(execCtx.reqCtx, "time of Exec.Run : %s", time.Since(runBegin).String())
}
//----------------------------------------------------------------------------------------------------------------------
case *tree.ExplainPhyPlan:
queryPlan := execCtx.cw.Plan()
txnHaveDDL := false
ws := ses.proc.GetTxnOperator().GetWorkspace()
if ws != nil {
txnHaveDDL = ws.GetHaveDDL()
}
explainColName := plan2.GetPhyPlanTitle(queryPlan.GetQuery(), txnHaveDDL)
colDefs, columns, err = GetExplainColumns(execCtx.reqCtx, explainColName)
if err != nil {
ses.Error(execCtx.reqCtx,
"Failed to get columns from ExplainColumns handler",
zap.Error(err))
return
}

ses.rs = &plan.ResultColDef{ResultCols: colDefs}

ses.EnterFPrint(FPResultRowStmtExplainAnalyze1)
defer ses.ExitFPrint(FPResultRowStmtExplainAnalyze1)
err = execCtx.resper.RespPreMeta(execCtx, columns)
if err != nil {
return
}

ses.EnterFPrint(FPResultRowStmtExplainAnalyze2)
defer ses.ExitFPrint(FPResultRowStmtExplainAnalyze2)
fPrintTxnOp := execCtx.ses.GetTxnHandler().GetTxn()
setFPrints(fPrintTxnOp, execCtx.ses.GetFPrints())
runBegin := time.Now()
/*
Step 1: Start
*/
if _, err = execCtx.runner.Run(0); err != nil {
return
}

// only log if run time is longer than 1s
if time.Since(runBegin) > time.Second {
ses.Infof(execCtx.reqCtx, "time of Exec.Run : %s", time.Since(runBegin).String())
}
//----------------------------------------------------------------------------------------------------------------------
default:
columns, err = execCtx.cw.GetColumns(execCtx.reqCtx)
if err != nil {
Expand Down Expand Up @@ -273,11 +316,33 @@ func (resper *MysqlResp) respStreamResultRow(ses *Session,
return
}

err = resper.mysqlRrWr.WriteEOFOrOK(0, checkMoreResultSet(ses.getStatusAfterTxnIsEnded(execCtx.reqCtx), execCtx.isLastStmt))
if err != nil {
return
}
//--------------------------------------------------------------------------------------------------------------
case *tree.ExplainPhyPlan:
queryPlan := execCtx.cw.Plan()
txnHaveDDL := false
ws := ses.proc.GetTxnOperator().GetWorkspace()
if ws != nil {
txnHaveDDL = ws.GetHaveDDL()
}
explainColName := plan2.GetPlanTitle(queryPlan.GetQuery(), txnHaveDDL)

txnCompileWrapper := execCtx.cw.(*TxnComputationWrapper)
reader := bufio.NewReader(txnCompileWrapper.explainBuffer)
err = buildMoExplainPhyPlan(execCtx, explainColName, reader, ses, getDataFromPipeline)
if err != nil {
return
}

err = resper.mysqlRrWr.WriteEOFOrOK(0, checkMoreResultSet(ses.getStatusAfterTxnIsEnded(execCtx.reqCtx), execCtx.isLastStmt))
if err != nil {
return
}

//--------------------------------------------------------------------------------------------------------------
default:
err = resper.mysqlRrWr.WriteEOFOrOK(0, checkMoreResultSet(ses.getStatusAfterTxnIsEnded(execCtx.reqCtx), execCtx.isLastStmt))
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/frontend/test/types_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/frontend/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package frontend

import (
"bytes"
"context"
"io"
"strings"
Expand Down Expand Up @@ -208,6 +209,8 @@ type ComputationWrapper interface {

RecordExecPlan(ctx context.Context, phyPlan *models.PhyPlan) error

SetExplainBuffer(buf *bytes.Buffer)

GetLoadTag() bool

GetServerStatus() uint16
Expand Down
Loading

0 comments on commit e115c58

Please sign in to comment.