Skip to content

Commit

Permalink
Update TransferDeletes for S3 deletes (matrixorigin#18706)
Browse files Browse the repository at this point in the history
Flush data when transfer S3 deletes.
Return error when WAL entry is too large.

Approved by: @XuPeng-SH
  • Loading branch information
jiangxinmeng1 authored Sep 12, 2024
1 parent 7d313f1 commit 303d13d
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 7 deletions.
11 changes: 11 additions & 0 deletions pkg/vm/engine/tae/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,14 @@ func NewTombstoneBatchByPKType(pkType types.Type, mp *mpool.MPool) *containers.B
bat.AddVector(AttrCommitTs, commitTSVec)
return bat
}

// rowid, pk
// used in Collect Delete in Range
// NewCNTombstoneBatchByPKType builds a tombstone batch that carries only the
// row-id and primary-key columns. Note: unlike NewTombstoneBatchByPKType, no
// commit-ts vector is added, so the batch has exactly two vectors.
func NewCNTombstoneBatchByPKType(pkType types.Type, mp *mpool.MPool) *containers.Batch {
	bat := containers.NewBatch()
	// row-id column identifies the physical row being deleted
	rowIDVec := containers.MakeVector(types.T_Rowid.ToType(), mp)
	// pk column keeps the deleted row's primary key, typed per the caller
	pkVec := containers.MakeVector(pkType, mp)
	bat.AddVector(AttrRowID, rowIDVec)
	bat.AddVector(AttrPKVal, pkVec)
	return bat
}
28 changes: 28 additions & 0 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9418,6 +9418,34 @@ func TestFillBlockTombstonesPersistedAobj(t *testing.T) {
assert.Equal(t, int64(0), common.DebugAllocator.CurrNB())
}

// TestTransferS3Deletes deletes one row through an S3 tombstone and compacts
// the target blocks before the deleting txn commits, so the delete has to be
// transferred to the rewritten object; the final scan must see one less row.
func TestTransferS3Deletes(t *testing.T) {
	defer testutils.AfterTest(t)()
	ctx := context.Background()

	opts := config.WithLongScanAndCKPOpts(nil)
	tae := testutil.NewTestEngine(ctx, ModuleName, t, opts)
	defer tae.Close()

	const rowCount = 10
	schema := catalog.MockSchemaAll(2, 1)
	schema.BlockMaxRows = 10
	tae.BindSchema(schema)
	bat := catalog.MockBatch(schema, rowCount)
	defer bat.Close()
	tae.CreateRelAndAppend(bat, true)

	// apply deleteloc fails on ablk
	txn, _ := tae.StartTxn(nil)
	pkVal := bat.Vecs[schema.GetSingleSortKeyIdx()].Get(1)
	ok, err := tae.TryDeleteByDeltalocWithTxn([]any{pkVal}, txn)
	// compact while the delete txn is still open to force a transfer on commit
	tae.CompactBlocks(true)
	assert.NoError(t, err)
	assert.True(t, ok)
	assert.NoError(t, txn.Commit(ctx))

	tae.CheckRowsByScan(rowCount-1, true)
	t.Log(tae.Catalog.SimplePPString(3))
}
func TestStartStopTableMerge(t *testing.T) {
db := testutil.InitTestDB(context.Background(), "MergeTest", t, nil)
defer db.Close()
Expand Down
6 changes: 4 additions & 2 deletions pkg/vm/engine/tae/db/testutil/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,14 @@ func (e *TestEngine) TryDeleteByDeltalocWithTxn(vals []any, txn txnif.AsyncTxn)
pks.Append(val, false)
}

stats, err := MockCNDeleteInS3(e.Runtime.Fs, rowIDs, pks, e.schema, txn)
s3stats, err := MockCNDeleteInS3(e.Runtime.Fs, rowIDs, pks, e.schema, txn)
stats := objectio.NewObjectStatsWithObjectID(s3stats.ObjectName().ObjectId(), false, true, true)
objectio.SetObjectStats(stats, &s3stats)
pks.Close()
rowIDs.Close()
assert.NoError(e.T, err)
require.False(e.T, stats.IsZero())
ok, err = rel.TryDeleteByStats(firstID, stats)
ok, err = rel.TryDeleteByStats(firstID, *stats)
assert.NoError(e.T, err)
if !ok {
return ok, err
Expand Down
7 changes: 6 additions & 1 deletion pkg/vm/engine/tae/txn/txnimpl/base_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ func (tbl *baseTable) DedupWorkSpace(key containers.Vector) (err error) {
}
return
}

// approxSize estimates the in-memory size of the table's workspace data.
// It returns 0 for a nil table or one without a populated table space node.
func (tbl *baseTable) approxSize() int {
	if tbl != nil && tbl.tableSpace != nil && tbl.tableSpace.node != nil {
		return tbl.tableSpace.node.data.ApproxSize()
	}
	return 0
}
func (tbl *baseTable) BatchDedupLocal(bat *containers.Batch) error {
if tbl.tableSpace == nil || !tbl.schema.HasPK() {
return nil
Expand Down
21 changes: 20 additions & 1 deletion pkg/vm/engine/tae/txn/txnimpl/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package txnimpl

import (
"context"
"fmt"
"runtime/trace"
"sync"
"sync/atomic"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/moprobe"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/nulls"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
Expand Down Expand Up @@ -50,6 +52,10 @@ var (
}
)

const (
	// MaxWalSize caps the approximate size of a single WAL entry; a txn whose
	// pending workspace exceeds it is rejected during PrePrepare.
	MaxWalSize = 70 * mpool.MB
)

func getTracer() *txnTracer {
return _tracerPool.Get().(*txnTracer)
}
Expand Down Expand Up @@ -726,9 +732,22 @@ func (store *txnStore) PrePrepare(ctx context.Context) (err error) {
return
}
}
approxSize := store.approxSize()
if approxSize > MaxWalSize {
return moerr.NewInternalError(ctx, fmt.Sprintf("WAL entry approxSize %d is too large, max is %d", approxSize, MaxWalSize))
}
if approxSize > 50*mpool.MB {
logutil.Warnf("[Large-WAL-Entry]txn %x, WAL entry approxSize %d", store.txn.GetID(), approxSize)
}
return
}

// approxSize sums the estimated workspace sizes of every database touched by
// this txn; used to bound the size of the WAL entry before prepare.
func (store *txnStore) approxSize() int {
	var total int
	for _, database := range store.dbs {
		total += database.approxSize()
	}
	return total
}
func (store *txnStore) PrepareCommit() (err error) {
if store.warChecker != nil {
if err = store.warChecker.checkAll(
Expand Down
84 changes: 81 additions & 3 deletions pkg/vm/engine/tae/txn/txnimpl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ func (tbl *txnTable) TransferDeleteIntent(
return
}

// approxSize reports the combined estimated workspace size of the table's
// data side and its tombstone side.
func (tbl *txnTable) approxSize() int {
	dataSize := tbl.dataTable.approxSize()
	tombstoneSize := tbl.tombstoneTable.approxSize()
	return dataSize + tombstoneSize
}
func (tbl *txnTable) TransferDeletes(ts types.TS, phase string) (err error) {
if tbl.store.rt.TransferTable == nil {
return
Expand All @@ -186,6 +189,12 @@ func (tbl *txnTable) TransferDeletes(ts types.TS, phase string) (err error) {
v2.TxnS3TombstoneTransferGetSoftdeleteObjectsHistogram.Observe(time.Since(tGetSoftdeleteObjects).Seconds())
v2.TxnS3TombstoneSoftdeleteObjectCounter.Add(float64(len(softDeleteObjects)))
var findTombstoneDuration, readTombstoneDuration, deleteRowsDuration time.Duration
var transferBatch *containers.Batch
defer func() {
if transferBatch != nil {
transferBatch.Close()
}
}()
// transfer deltaloc
for _, obj := range softDeleteObjects {
tFindTombstone := time.Now()
Expand All @@ -197,6 +206,8 @@ func (tbl *txnTable) TransferDeletes(ts types.TS, phase string) (err error) {
if sel.IsEmpty() {
continue
}
id := obj.AsCommonID()

v2.TxnS3TombstoneTransferDataObjectCounter.Add(1)
v2.TxnS3TombstoneTransferStatsCounter.Add(float64(sel.Count()))
iter := sel.Iterator()
Expand Down Expand Up @@ -224,11 +235,31 @@ func (tbl *txnTable) TransferDeletes(ts types.TS, phase string) (err error) {
var pkType *types.Type
for i := 0; i < vectors[0].Length(); i++ {
rowID := vectors[0].Get(i).(types.Rowid)
blkID2, offset := rowID.Decode()
blkID2, row := rowID.Decode()
if *blkID2.Object() != *obj.ID() {
continue
}
id.BlockID = blkID2
pinned, err := tbl.store.rt.TransferTable.Pin(*id)
// cannot find a transferred record. maybe the transferred record was TTL'ed
// here we can convert the error back to r-w conflict
if err != nil {
err = moerr.NewTxnRWConflictNoCtx()
return err
}
page := pinned.Item()
newRowID, ok := page.Transfer(row)
if !ok {
err := moerr.NewTxnWWConflictNoCtx(0, "")
msg := fmt.Sprintf("table-%d blk-%d delete row-%d",
id.TableID,
id.BlockID,
row)
logutil.Warnf("[ts=%s]TransferDeleteNode: %v",
tbl.store.txn.GetStartTS().ToString(),
msg)
return err
}
if pkType == nil {
pkType = vectors[1].GetType()
}
Expand All @@ -239,15 +270,62 @@ func (tbl *txnTable) TransferDeletes(ts types.TS, phase string) (err error) {
// ErrTxnRWConflict: the target block was also be compacted
// ErrTxnWWConflict: w-w error
tDeleteRows := time.Now()
if _, err = tbl.TransferDeleteRows(id, offset, pk, pkType, phase, ts); err != nil {
return err
if transferBatch == nil {
transferBatch = catalog.NewCNTombstoneBatchByPKType(*pkType, common.WorkspaceAllocator)
}
transferBatch.GetVectorByName(catalog.AttrPKVal).Append(pk, false)
transferBatch.GetVectorByName(catalog.AttrRowID).Append(newRowID, false)
deleteRowsDuration += time.Since(tDeleteRows)
}
}
}
tbl.store.warChecker.Delete(id)
}
if transferBatch != nil {
schema := tbl.getSchema(true)
seqnums := make([]uint16, 0, len(schema.ColDefs)-1)
name := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid())
writer, err := blockio.NewBlockWriterNew(tbl.store.rt.Fs.Service, name, schema.Version, seqnums)
if err != nil {
return err
}

writer.SetDataType(objectio.SchemaTombstone)
writer.SetPrimaryKeyWithType(
uint16(catalog.TombstonePrimaryKeyIdx),
index.HBF,
index.ObjectPrefixFn,
index.BlockPrefixFn,
)

split := containers.NewBatchSplitter(transferBatch, int(schema.BlockMaxRows))

for {
bat, err := split.Next()
if err != nil {
break
}
cnBatch := containers.ToCNBatch(bat)
for _, vec := range cnBatch.Vecs {
if vec == nil {
// this task has been canceled
return nil
}
}
_, err = writer.WriteBatch(cnBatch)
if err != nil {
return err
}
}
_, _, err = writer.Sync(context.Background())
if err != nil {
return err
}
writerStats := writer.GetObjectStats()
stats := objectio.NewObjectStatsWithObjectID(name.ObjectId(), false, true, true)
objectio.SetObjectStats(stats, &writerStats)
tbl.tombstoneTable.tableSpace.stats = append(tbl.tombstoneTable.tableSpace.stats, *stats)
}
v2.TxnS3TombstoneTransferFindTombstonesHistogram.Observe(findTombstoneDuration.Seconds())
v2.TxnS3TombstoneTransferReadTombstoneHistogram.Observe(readTombstoneDuration.Seconds())
v2.TxnS3TombstoneTransferDeleteRowsHistogram.Observe(deleteRowsDuration.Seconds())
Expand Down
8 changes: 8 additions & 0 deletions pkg/vm/engine/tae/txn/txnimpl/txndb.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,14 @@ func (db *txnDB) Freeze() (err error) {
return
}

// approxSize sums the estimated workspace sizes of all tables written by this
// txn within the database.
func (db *txnDB) approxSize() int {
	var total int
	for _, table := range db.tables {
		total += table.approxSize()
	}
	return total
}

func (db *txnDB) PrePrepare(ctx context.Context) (err error) {
for _, table := range db.tables {
if err = table.PrePreareTransfer(txnif.PrePreparePhase, table.store.rt.Now()); err != nil {
Expand Down

0 comments on commit 303d13d

Please sign in to comment.