Skip to content

Commit

Permalink
refactor filter related (matrixorigin#18972)
Browse files Browse the repository at this point in the history
1. remove dummy code
2. reuse reader and filter related code for tae and disttae

Approved by: @triump2020, @LeftHandCold
  • Loading branch information
XuPeng-SH authored Sep 24, 2024
1 parent 6b04a52 commit a32519f
Show file tree
Hide file tree
Showing 19 changed files with 251 additions and 585 deletions.
5 changes: 2 additions & 3 deletions pkg/vm/engine/disttae/change_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay"
"github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util"
)

func (tbl *txnTable) CollectChanges(ctx context.Context, from, to types.TS, mp *mpool.MPool) (engine.ChangesHandle, error) {
Expand Down Expand Up @@ -126,9 +127,8 @@ func (h *CheckpointChangesHandle) initReader(ctx context.Context) (err error) {
}

var blockList objectio.BlockInfoSlice
if _, err = TryFastFilterBlocks(
if _, err = engine_util.TryFastFilterBlocks(
ctx,
h.table,
h.end.ToTimestamp(),
tblDef,
nil,
Expand All @@ -137,7 +137,6 @@ func (h *CheckpointChangesHandle) initReader(ctx context.Context) (err error) {
nil,
&blockList,
h.fs,
h.table.proc.Load(),
); err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ func (e *Engine) BuildBlockReaders(
shard)
rd, err := NewReader(
ctx,
proc,
proc.Mp(),
e,
def,
ts,
Expand Down
154 changes: 0 additions & 154 deletions pkg/vm/engine/disttae/filter.go

This file was deleted.

5 changes: 2 additions & 3 deletions pkg/vm/engine/disttae/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

// -----------------------------------------------------------------
Expand Down Expand Up @@ -233,7 +232,7 @@ func (r *mergeReader) Read(
// -----------------------------------------------------------------
func NewReader(
ctx context.Context,
proc *process.Process, //it comes from transaction if reader run in local,otherwise it comes from remote compile.
mp *mpool.MPool,
e *Engine,
tableDef *plan.TableDef,
ts timestamp.Timestamp,
Expand All @@ -245,7 +244,7 @@ func NewReader(
baseFilter, err := engine_util.ConstructBasePKFilter(
expr,
tableDef,
proc,
mp,
)
if err != nil {
return nil, err
Expand Down
12 changes: 0 additions & 12 deletions pkg/vm/engine/disttae/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/txn/trace"
)

func newColumnExpr(pos int, typ plan.Type, name string) *plan.Expr {
return &plan.Expr{
Typ: typ,
Expr: &plan.Expr_Col{
Col: &plan.ColRef{
Name: name,
ColPos: int32(pos),
},
},
}
}

func genWriteReqs(
ctx context.Context,
txnCommit *Transaction,
Expand Down
24 changes: 3 additions & 21 deletions pkg/vm/engine/disttae/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,14 @@ import (
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay"
"github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort"
)

func ConstructInExpr(
ctx context.Context,
colName string,
colVec *vector.Vector,
) *plan.Expr {
data, _ := colVec.MarshalBinary()
colExpr := newColumnExpr(0, plan2.MakePlan2Type(colVec.GetType()), colName)
return plan2.MakeInExpr(
ctx,
colExpr,
int32(colVec.Length()),
data,
false,
)
}

func TransferTombstones(
ctx context.Context,
table *txnTable,
Expand Down Expand Up @@ -345,12 +329,11 @@ func doTransferRowids(
v2.BatchTransferTombstonesDurationHistogram.Observe(duration.Seconds())
}()
pkColumName := table.GetTableDef(ctx).Pkey.PkeyColName
expr := ConstructInExpr(ctx, pkColumName, searchPKColumn)
expr := engine_util.ConstructInExpr(ctx, pkColumName, searchPKColumn)

var blockList objectio.BlockInfoSlice
if _, err = TryFastFilterBlocks(
if _, err = engine_util.TryFastFilterBlocks(
ctx,
table,
table.db.op.SnapshotTS(),
table.GetTableDef(ctx),
[]*plan.Expr{expr},
Expand All @@ -359,7 +342,6 @@ func doTransferRowids(
nil,
&blockList,
fs,
table.proc.Load(),
); err != nil {
return
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,8 @@ func (tbl *txnTable) rangesOnePart(
) (err error) {
var done bool

if done, err = TryFastFilterBlocks(
if done, err = engine_util.TryFastFilterBlocks(
ctx,
tbl,
tbl.db.op.SnapshotTS(),
tbl.tableDef,
exprs,
Expand All @@ -713,7 +712,6 @@ func (tbl *txnTable) rangesOnePart(
uncommittedObjects,
outBlocks,
tbl.getTxn().engine.fs,
tbl.proc.Load(),
); err != nil {
return err
} else if done {
Expand Down Expand Up @@ -1769,7 +1767,7 @@ func (tbl *txnTable) BuildReaders(
}
rd, err := NewReader(
ctx,
proc,
proc.Mp(),
tbl.getTxn().engine,
def,
tbl.db.op.SnapshotTS(),
Expand Down Expand Up @@ -1945,15 +1943,15 @@ func (tbl *txnTable) PKPersistedBetween(

keys.InplaceSort()
bytes, _ := keys.MarshalBinary()
colExpr := newColumnExpr(0, plan2.MakePlan2Type(keys.GetType()), tbl.tableDef.Pkey.PkeyColName)
colExpr := engine_util.NewColumnExpr(0, plan2.MakePlan2Type(keys.GetType()), tbl.tableDef.Pkey.PkeyColName)
inExpr := plan2.MakeInExpr(
tbl.proc.Load().Ctx,
colExpr,
int32(keys.Length()),
bytes,
false)

basePKFilter, err := engine_util.ConstructBasePKFilter(inExpr, tbl.tableDef, tbl.proc.Load())
basePKFilter, err := engine_util.ConstructBasePKFilter(inExpr, tbl.tableDef, tbl.proc.Load().Mp())
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/txn_table_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func (tbl *txnTableDelegate) BuildShardingReaders(
}
lrd, err := NewReader(
ctx,
proc,
proc.Mp(),
tbl.origin.getTxn().engine,
tbl.origin.GetTableDef(ctx),
tbl.origin.db.op.SnapshotTS(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/txn_table_sharding_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func HandleShardingReadBuildReader(

rd, err := NewReader(
ctx,
tbl.proc.Load(),
tbl.proc.Load().Mp(),
e.(*Engine),
tbl.tableDef,
tbl.db.op.SnapshotTS(),
Expand Down
Loading

0 comments on commit a32519f

Please sign in to comment.