Skip to content

Commit

Permalink
simplify shuffle join scopes (matrixorigin#18403)
Browse files Browse the repository at this point in the history
builds huffle join scopes at compile time, remove the logic of building shuffle scopes in runtime.

Approved by: @ouyuanning
  • Loading branch information
badboynt1 authored Aug 29, 2024
1 parent b25c66e commit 720364f
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 48 deletions.
66 changes: 37 additions & 29 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -2209,7 +2209,7 @@ func (c *Compile) compileJoin(node, left, right *plan.Node, probeScopes, buildSc
panic("build scopes should be single parallel!")
}
//construct join build operator for tp join
buildScopes[0].setRootOperator(constructJoinBuildOperator(c, rs[0].RootOp, false, 1))
buildScopes[0].setRootOperator(constructJoinBuildOperator(c, rs[0].RootOp, 1))
buildScopes[0].IsEnd = true
rs[0].Magic = Merge
}
Expand Down Expand Up @@ -2301,6 +2301,15 @@ func (c *Compile) compileShuffleJoin(node, left, right *plan.Node, lefts, rights
panic(moerr.NewNYI(c.proc.Ctx, fmt.Sprintf("shuffle join do not support join type '%v'", node.JoinType)))
}

//construct shuffle build
for i := range shuffleJoins {
buildScope := shuffleJoins[i].PreScopes[0]
mergeOp := merge.NewArgument()
buildScope.setRootOperator(mergeOp)
buildOp := constructShuffleBuild(shuffleJoins[i].RootOp, c.proc)
buildScope.setRootOperator(buildOp)
}

return shuffleJoins
}

Expand Down Expand Up @@ -3513,25 +3522,36 @@ func (c *Compile) newShuffleJoinScopeList(left, right []*Scope, n *plan.Node) []

dop := plan2.GetShuffleDop(ncpu)
shuffleJoins := make([]*Scope, 0, len(c.cnList)*dop)
lnum := len(left)
sum := lnum + len(right)
shuffleBuilds := make([]*Scope, 0, len(c.cnList)*dop)

lenLeft := len(left)
lenRight := len(right)

shuffleIdx := 0
for _, cn := range c.cnList {
ss := make([]*Scope, dop)
for i := range ss {
ss[i] = newScope(Remote)
ss[i].IsJoin = true
ss[i].NodeInfo.Addr = cn.Addr
ss[i].NodeInfo.Mcpu = 1
ss[i].Proc = c.proc.NewNoContextChildProc(sum)
ss[i].BuildIdx = lnum
joins := make([]*Scope, dop)
builds := make([]*Scope, dop)
for i := range joins {
joins[i] = newScope(Remote)
joins[i].IsJoin = true
joins[i].NodeInfo.Addr = cn.Addr
joins[i].NodeInfo.Mcpu = 1
joins[i].Proc = c.proc.NewNoContextChildProc(lenLeft)
builds[i] = newScope(Remote)
builds[i].NodeInfo = joins[i].NodeInfo
builds[i].Proc = c.proc.NewNoContextChildProc(lenRight)
joins[i].PreScopes = []*Scope{builds[i]}
shuffleIdx++
ss[i].ShuffleIdx = shuffleIdx
for _, rr := range ss[i].Proc.Reg.MergeReceivers {
joins[i].ShuffleIdx = shuffleIdx
for _, rr := range joins[i].Proc.Reg.MergeReceivers {
rr.Ch = make(chan *process.RegisterMessage, shuffleChannelBufferSize)
}
for _, rr := range builds[i].Proc.Reg.MergeReceivers {
rr.Ch = make(chan *process.RegisterMessage, shuffleChannelBufferSize)
}
}
shuffleJoins = append(shuffleJoins, ss...)
shuffleJoins = append(shuffleJoins, joins...)
shuffleBuilds = append(shuffleBuilds, builds...)
}

currentFirstFlag := c.anal.isFirst
Expand All @@ -3542,18 +3562,12 @@ func (c *Compile) newShuffleJoinScopeList(left, right []*Scope, n *plan.Node) []
scp.setRootOperator(constructDispatch(i, shuffleJoins, scp.NodeInfo.Addr, n, true, scp.NodeInfo.Mcpu))
scp.IsEnd = true

appended := false
for _, js := range shuffleJoins {
if isSameCN(js.NodeInfo.Addr, scp.NodeInfo.Addr) {
js.PreScopes = append(js.PreScopes, scp)
appended = true
break
}
}
if !appended {
c.proc.Errorf(c.proc.Ctx, "no same addr scope to append left scopes")
shuffleJoins[0].PreScopes = append(shuffleJoins[0].PreScopes, scp)
}
}

c.anal.isFirst = currentFirstFlag
Expand All @@ -3562,21 +3576,15 @@ func (c *Compile) newShuffleJoinScopeList(left, right []*Scope, n *plan.Node) []
shuffleOp.SetIdx(c.anal.curNodeIdx)
scp.setRootOperator(shuffleOp)

scp.setRootOperator(constructDispatch(i+lnum, shuffleJoins, scp.NodeInfo.Addr, n, false, scp.NodeInfo.Mcpu))
scp.setRootOperator(constructDispatch(i, shuffleBuilds, scp.NodeInfo.Addr, n, false, scp.NodeInfo.Mcpu))
scp.IsEnd = true

appended := false
for _, js := range shuffleJoins {
for _, js := range shuffleBuilds {
if isSameCN(js.NodeInfo.Addr, scp.NodeInfo.Addr) {
js.PreScopes = append(js.PreScopes, scp)
appended = true
break
}
}
if !appended {
c.proc.Errorf(c.proc.Ctx, "no same addr scope to append right scopes")
shuffleJoins[0].PreScopes = append(shuffleJoins[0].PreScopes, scp)
}
}
return shuffleJoins
}
Expand Down Expand Up @@ -3608,7 +3616,7 @@ func (c *Compile) newJoinBuildScope(s *Scope, mcpu int32) *Scope {
mergeOp.SetIdx(c.anal.curNodeIdx)
mergeOp.SetIsFirst(c.anal.isFirst)
rs.setRootOperator(mergeOp)
rs.setRootOperator(constructJoinBuildOperator(c, vm.GetLeafOpParent(nil, (s.RootOp)), s.ShuffleIdx > 0, mcpu))
rs.setRootOperator(constructJoinBuildOperator(c, vm.GetLeafOpParent(nil, s.RootOp), mcpu))

rs.IsEnd = true

Expand Down
8 changes: 1 addition & 7 deletions pkg/sql/compile/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,7 +1576,7 @@ func constructLoopJoin(n *plan.Node, typs []types.Type, proc *process.Process, j
return arg
}

func constructJoinBuildOperator(c *Compile, op vm.Operator, isShuffle bool, mcpu int32) vm.Operator {
func constructJoinBuildOperator(c *Compile, op vm.Operator, mcpu int32) vm.Operator {
switch op.OpType() {
case vm.IndexJoin:
indexJoin := op.(*indexjoin.IndexJoin)
Expand All @@ -1588,12 +1588,6 @@ func constructJoinBuildOperator(c *Compile, op vm.Operator, isShuffle bool, mcpu
ret.SetIsFirst(true)
return ret
default:
if isShuffle {
res := constructShuffleBuild(op, c.proc)
res.SetIdx(op.GetOperatorBase().GetIdx())
res.SetIsFirst(true)
return res
}
res := constructHashBuild(op, c.proc, mcpu)
res.SetIdx(op.GetOperatorBase().GetIdx())
res.SetIsFirst(true)
Expand Down
12 changes: 0 additions & 12 deletions pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,19 +451,7 @@ func (s *Scope) ParallelRun(c *Compile) (err error) {
// buildJoinParallelRun deal one case of scope.ParallelRun.
// this function will create a pipeline to run a join in parallel.
func buildJoinParallelRun(s *Scope, c *Compile) (*Scope, error) {
if c.IsTpQuery() {
//tp query build scope in compile time, not runtime
return s, nil
}
mcpu := s.NodeInfo.Mcpu

if s.ShuffleIdx > 0 { //shuffle join
buildScope := c.newJoinBuildScope(s, 1)
s.PreScopes = append(s.PreScopes, buildScope)
s.Proc.Reg.MergeReceivers = s.Proc.Reg.MergeReceivers[:s.BuildIdx]
return s, nil
}

if mcpu <= 1 { // broadcast join with no parallel
return s, nil
}
Expand Down

0 comments on commit 720364f

Please sign in to comment.