Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Count only partitions #659

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions sql/analyzer/process.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package analyzer

import (
"sync"

"gopkg.in/src-d/go-mysql-server.v0/sql"
"gopkg.in/src-d/go-mysql-server.v0/sql/plan"
)
Expand All @@ -18,7 +20,8 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {

processList := a.Catalog.ProcessList

var seen = make(map[string]struct{})
seenProcess := make(map[string]struct{})
seenPartition := &sync.Map{}
n, err := n.TransformUp(func(n sql.Node) (sql.Node, error) {
switch n := n.(type) {
case *plan.ResolvedTable:
Expand All @@ -28,7 +31,7 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
}

name := n.Table.Name()
if _, ok := seen[name]; ok {
if _, ok := seenProcess[name]; ok {
return n, nil
}

Expand All @@ -42,18 +45,22 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
}
processList.AddProgressItem(ctx.Pid(), name, total)

seen[name] = struct{}{}
seenProcess[name] = struct{}{}

notify := func() {
processList.UpdateProgress(ctx.Pid(), name, 1)
onUpdatePartitionProgress := func(key string) {
delta := int64(0)
if _, ok := seenPartition.LoadOrStore(key, struct{}{}); !ok {
delta = 1
}
processList.UpdateProgress(ctx.Pid(), name, delta)
}

var t sql.Table
switch table := n.Table.(type) {
case sql.IndexableTable:
t = plan.NewProcessIndexableTable(table, notify)
t = plan.NewProcessIndexableTable(table, onUpdatePartitionProgress)
default:
t = plan.NewProcessTable(table, notify)
t = plan.NewProcessTable(table, onUpdatePartitionProgress)
}

return plan.NewResolvedTable(t), nil
Expand Down Expand Up @@ -85,5 +92,7 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
return nil, err
}

return plan.NewQueryProcess(node, func() { processList.Done(ctx.Pid()) }), nil
return plan.NewQueryProcess(node, func(string) {
processList.Done(ctx.Pid())
}), nil
}
49 changes: 34 additions & 15 deletions sql/plan/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type QueryProcess struct {
}

// NotifyFunc is a function to notify about some event.
type NotifyFunc func()
type NotifyFunc func(key string)

// NewQueryProcess creates a new QueryProcess node.
func NewQueryProcess(node sql.Node, notify NotifyFunc) *QueryProcess {
Expand Down Expand Up @@ -52,7 +52,15 @@ func (p *QueryProcess) RowIter(ctx *sql.Context) (sql.RowIter, error) {
return nil, err
}

return &trackedRowIter{iter, p.Notify}, nil
var key string
if t, ok := p.Child.(sql.Partition); ok {
key = string(t.Key())
}
return &trackedRowIter{
key: key,
iter: iter,
notify: p.Notify,
}, nil
}

func (p *QueryProcess) String() string { return p.Child.String() }
Expand All @@ -63,7 +71,7 @@ func (p *QueryProcess) String() string { return p.Child.String() }
// partition is processed.
type ProcessIndexableTable struct {
sql.IndexableTable
Notify NotifyFunc
notify NotifyFunc
}

// NewProcessIndexableTable returns a new ProcessIndexableTable.
Expand All @@ -86,7 +94,7 @@ func (t *ProcessIndexableTable) IndexKeyValues(
return nil, err
}

return &trackedPartitionIndexKeyValueIter{iter, t.Notify}, nil
return &trackedPartitionIndexKeyValueIter{iter, t.notify}, nil
}

// PartitionRows implements the sql.Table interface.
Expand All @@ -96,7 +104,11 @@ func (t *ProcessIndexableTable) PartitionRows(ctx *sql.Context, p sql.Partition)
return nil, err
}

return &trackedRowIter{iter, t.Notify}, nil
return &trackedRowIter{
key: string(p.Key()),
iter: iter,
notify: t.notify,
}, nil
}

var _ sql.IndexableTable = (*ProcessIndexableTable)(nil)
Expand All @@ -106,7 +118,7 @@ var _ sql.IndexableTable = (*ProcessIndexableTable)(nil)
// is processed.
type ProcessTable struct {
sql.Table
Notify NotifyFunc
notify NotifyFunc
}

// NewProcessTable returns a new ProcessTable.
Expand All @@ -126,17 +138,22 @@ func (t *ProcessTable) PartitionRows(ctx *sql.Context, p sql.Partition) (sql.Row
return nil, err
}

return &trackedRowIter{iter, t.Notify}, nil
return &trackedRowIter{
key: string(p.Key()),
iter: iter,
notify: t.notify,
}, nil
}

type trackedRowIter struct {
key string
iter sql.RowIter
notify NotifyFunc
}

func (i *trackedRowIter) done() {
if i.notify != nil {
i.notify()
i.notify(i.key)
i.notify = nil
}
}
Expand Down Expand Up @@ -168,27 +185,29 @@ func (i *trackedPartitionIndexKeyValueIter) Next() (sql.Partition, sql.IndexKeyV
return nil, nil, err
}

return p, &trackedIndexKeyValueIter{iter, i.notify}, nil
return p, &trackedIndexKeyValueIter{
key: string(p.Key()),
iter: iter,
notify: i.notify,
}, nil
}

type trackedIndexKeyValueIter struct {
key string
iter sql.IndexKeyValueIter
notify NotifyFunc
}

func (i *trackedIndexKeyValueIter) done() {
if i.notify != nil {
i.notify()
i.notify(i.key)
i.notify = nil
}
}

func (i *trackedIndexKeyValueIter) Close() (err error) {
func (i *trackedIndexKeyValueIter) Close() error {
i.done()
if i.iter != nil {
err = i.iter.Close()
}
return err
return nil
}

func (i *trackedIndexKeyValueIter) Next() ([]interface{}, []byte, error) {
Expand Down
6 changes: 3 additions & 3 deletions sql/plan/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestQueryProcess(t *testing.T) {
},
NewResolvedTable(table),
),
func() {
func(key string) {
notifications++
},
)
Expand Down Expand Up @@ -70,7 +70,7 @@ func TestProcessTable(t *testing.T) {
NewResolvedTable(
NewProcessTable(
table,
func() {
func(key string) {
notifications++
},
),
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestProcessIndexableTable(t *testing.T) {

pt := NewProcessIndexableTable(
table,
func() {
func(key string) {
notifications++
},
)
Expand Down