diff --git a/pkg/vm/engine/tae/catalog/types.go b/pkg/vm/engine/tae/catalog/types.go index 8afc774d388b..1338b862b980 100644 --- a/pkg/vm/engine/tae/catalog/types.go +++ b/pkg/vm/engine/tae/catalog/types.go @@ -111,3 +111,14 @@ func NewTombstoneBatchByPKType(pkType types.Type, mp *mpool.MPool) *containers.B bat.AddVector(AttrCommitTs, commitTSVec) return bat } + +// rowid, pk, commitTS +// used in Collect Delete in Range +func NewCNTombstoneBatchByPKType(pkType types.Type, mp *mpool.MPool) *containers.Batch { + bat := containers.NewBatch() + rowIDVec := containers.MakeVector(types.T_Rowid.ToType(), mp) + pkVec := containers.MakeVector(pkType, mp) + bat.AddVector(AttrRowID, rowIDVec) + bat.AddVector(AttrPKVal, pkVec) + return bat +} diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 81373785adad..278e7da547bd 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -9418,6 +9418,34 @@ func TestFillBlockTombstonesPersistedAobj(t *testing.T) { assert.Equal(t, int64(0), common.DebugAllocator.CurrNB()) } +func TestTransferS3Deletes(t *testing.T) { + defer testutils.AfterTest(t)() + ctx := context.Background() + + opts := config.WithLongScanAndCKPOpts(nil) + tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) + defer tae.Close() + rows := 10 + schema := catalog.MockSchemaAll(2, 1) + schema.BlockMaxRows = 10 + tae.BindSchema(schema) + bat := catalog.MockBatch(schema, rows) + defer bat.Close() + tae.CreateRelAndAppend(bat, true) + + // apply deleteloc fails on ablk + txn, _ := tae.StartTxn(nil) + v1 := bat.Vecs[schema.GetSingleSortKeyIdx()].Get(1) + ok, err := tae.TryDeleteByDeltalocWithTxn([]any{v1}, txn) + { + tae.CompactBlocks(true) + } + assert.NoError(t, err) + assert.True(t, ok) + assert.NoError(t, txn.Commit(ctx)) + tae.CheckRowsByScan(9, true) + t.Log(tae.Catalog.SimplePPString(3)) +} func TestStartStopTableMerge(t *testing.T) { db := testutil.InitTestDB(context.Background(), "MergeTest", t, nil) defer db.Close() diff --git a/pkg/vm/engine/tae/db/testutil/engine.go b/pkg/vm/engine/tae/db/testutil/engine.go index 7219e3ef4fe0..a98926a2f59e 100644 --- a/pkg/vm/engine/tae/db/testutil/engine.go +++ b/pkg/vm/engine/tae/db/testutil/engine.go @@ -345,12 +345,14 @@ func (e *TestEngine) TryDeleteByDeltalocWithTxn(vals []any, txn txnif.AsyncTxn) pks.Append(val, false) } - stats, err := MockCNDeleteInS3(e.Runtime.Fs, rowIDs, pks, e.schema, txn) + s3stats, err := MockCNDeleteInS3(e.Runtime.Fs, rowIDs, pks, e.schema, txn) + stats := objectio.NewObjectStatsWithObjectID(s3stats.ObjectName().ObjectId(), false, true, true) + objectio.SetObjectStats(stats, &s3stats) pks.Close() rowIDs.Close() assert.NoError(e.T, err) require.False(e.T, stats.IsZero()) - ok, err = rel.TryDeleteByStats(firstID, stats) + ok, err = rel.TryDeleteByStats(firstID, *stats) assert.NoError(e.T, err) if !ok { return ok, err diff --git a/pkg/vm/engine/tae/txn/txnimpl/base_table.go b/pkg/vm/engine/tae/txn/txnimpl/base_table.go index 3ce3e8cad83e..4d2cdc0d9294 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/base_table.go +++ b/pkg/vm/engine/tae/txn/txnimpl/base_table.go @@ -75,7 +75,12 @@ func (tbl *baseTable) DedupWorkSpace(key containers.Vector) (err error) { } return } - +func (tbl *baseTable) approxSize() int { + if tbl == nil || tbl.tableSpace == nil || tbl.tableSpace.node == nil { + return 0 + } + return tbl.tableSpace.node.data.ApproxSize() +} func (tbl *baseTable) BatchDedupLocal(bat *containers.Batch) error { if tbl.tableSpace == nil || !tbl.schema.HasPK() { return nil diff --git a/pkg/vm/engine/tae/txn/txnimpl/store.go b/pkg/vm/engine/tae/txn/txnimpl/store.go index d1c05f6c05f1..ecedc909ff48 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/store.go +++ b/pkg/vm/engine/tae/txn/txnimpl/store.go @@ -16,6 +16,7 @@ package txnimpl import ( "context" + "fmt" "runtime/trace" "sync" "sync/atomic" @@ -23,6 +24,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/moprobe" + "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" @@ -50,6 +52,10 @@ var ( } ) +const ( + MaxWalSize = 70 * mpool.MB +) + func getTracer() *txnTracer { return _tracerPool.Get().(*txnTracer) } @@ -726,9 +732,22 @@ func (store *txnStore) PrePrepare(ctx context.Context) (err error) { return } } + approxSize := store.approxSize() + if approxSize > MaxWalSize { + return moerr.NewInternalError(ctx, fmt.Sprintf("WAL entry approxSize %d is too large, max is %d", approxSize, MaxWalSize)) + } + if approxSize > 50*mpool.MB { + logutil.Warnf("[Large-WAL-Entry]txn %x, WAL entry approxSize %d", store.txn.GetID(), approxSize) + } return } - +func (store *txnStore) approxSize() int { + size := 0 + for _, db := range store.dbs { + size += db.approxSize() + } + return size +} func (store *txnStore) PrepareCommit() (err error) { if store.warChecker != nil { if err = store.warChecker.checkAll( diff --git a/pkg/vm/engine/tae/txn/txnimpl/table.go b/pkg/vm/engine/tae/txn/txnimpl/table.go index 409230909ed9..81ad06a0f807 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/table.go +++ b/pkg/vm/engine/tae/txn/txnimpl/table.go @@ -171,6 +171,9 @@ func (tbl *txnTable) TransferDeleteIntent( return } +func (tbl *txnTable) approxSize() int { + return tbl.dataTable.approxSize() + tbl.tombstoneTable.approxSize() +} func (tbl *txnTable) TransferDeletes(ts types.TS, phase string) (err error) { if tbl.store.rt.TransferTable == nil { return @@ -186,6 +189,12 @@ func (tbl *txnTable) TransferDeletes(ts types.TS, phase string) (err error) { v2.TxnS3TombstoneTransferGetSoftdeleteObjectsHistogram.Observe(time.Since(tGetSoftdeleteObjects).Seconds()) v2.TxnS3TombstoneSoftdeleteObjectCounter.Add(float64(len(softDeleteObjects))) var findTombstoneDuration, readTombstoneDuration, deleteRowsDuration time.Duration + var transferBatch *containers.Batch + defer func() { + if transferBatch != nil { + transferBatch.Close() + } + }() // transfer deltaloc for _, obj := range softDeleteObjects { tFindTombstone := time.Now() @@ -197,6 +206,8 @@ func (tbl *txnTable) TransferDeletes(ts types.TS, phase string) (err error) { if sel.IsEmpty() { continue } + id := obj.AsCommonID() + v2.TxnS3TombstoneTransferDataObjectCounter.Add(1) v2.TxnS3TombstoneTransferStatsCounter.Add(float64(sel.Count())) iter := sel.Iterator() @@ -224,11 +235,31 @@ func (tbl *txnTable) TransferDeletes(ts types.TS, phase string) (err error) { var pkType *types.Type for i := 0; i < vectors[0].Length(); i++ { rowID := vectors[0].Get(i).(types.Rowid) - blkID2, offset := rowID.Decode() + blkID2, row := rowID.Decode() if *blkID2.Object() != *obj.ID() { continue } id.BlockID = blkID2 + pinned, err := tbl.store.rt.TransferTable.Pin(*id) + // cannot find a transferred record. maybe the transferred record was TTL'ed + // here we can convert the error back to r-w conflict + if err != nil { + err = moerr.NewTxnRWConflictNoCtx() + return err + } + page := pinned.Item() + newRowID, ok := page.Transfer(row) + if !ok { + err := moerr.NewTxnWWConflictNoCtx(0, "") + msg := fmt.Sprintf("table-%d blk-%d delete row-%d", + id.TableID, + id.BlockID, + row) + logutil.Warnf("[ts=%s]TransferDeleteNode: %v", + tbl.store.txn.GetStartTS().ToString(), + msg) + return err + } if pkType == nil { pkType = vectors[1].GetType() } @@ -239,15 +270,62 @@ func (tbl *txnTable) TransferDeletes(ts types.TS, phase string) (err error) { // ErrTxnRWConflict: the target block was also be compacted // ErrTxnWWConflict: w-w error tDeleteRows := time.Now() - if _, err = tbl.TransferDeleteRows(id, offset, pk, pkType, phase, ts); err != nil { - return err + if transferBatch == nil { + transferBatch = catalog.NewCNTombstoneBatchByPKType(*pkType, common.WorkspaceAllocator) } + transferBatch.GetVectorByName(catalog.AttrPKVal).Append(pk, false) + transferBatch.GetVectorByName(catalog.AttrRowID).Append(newRowID, false) deleteRowsDuration += time.Since(tDeleteRows) } } } tbl.store.warChecker.Delete(id) } + if transferBatch != nil { + schema := tbl.getSchema(true) + seqnums := make([]uint16, 0, len(schema.ColDefs)-1) + name := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) + writer, err := blockio.NewBlockWriterNew(tbl.store.rt.Fs.Service, name, schema.Version, seqnums) + if err != nil { + return err + } + + writer.SetDataType(objectio.SchemaTombstone) + writer.SetPrimaryKeyWithType( + uint16(catalog.TombstonePrimaryKeyIdx), + index.HBF, + index.ObjectPrefixFn, + index.BlockPrefixFn, + ) + + split := containers.NewBatchSplitter(transferBatch, int(schema.BlockMaxRows)) + + for { + bat, err := split.Next() + if err != nil { + break + } + cnBatch := containers.ToCNBatch(bat) + for _, vec := range cnBatch.Vecs { + if vec == nil { + // this task has been canceled + return nil + } + } + _, err = writer.WriteBatch(cnBatch) + if err != nil { + return err + } + } + _, _, err = writer.Sync(context.Background()) + if err != nil { + return err + } + writerStats := writer.GetObjectStats() + stats := objectio.NewObjectStatsWithObjectID(name.ObjectId(), false, true, true) + objectio.SetObjectStats(stats, &writerStats) + tbl.tombstoneTable.tableSpace.stats = append(tbl.tombstoneTable.tableSpace.stats, *stats) + } v2.TxnS3TombstoneTransferFindTombstonesHistogram.Observe(findTombstoneDuration.Seconds()) v2.TxnS3TombstoneTransferReadTombstoneHistogram.Observe(readTombstoneDuration.Seconds()) v2.TxnS3TombstoneTransferDeleteRowsHistogram.Observe(deleteRowsDuration.Seconds()) diff --git a/pkg/vm/engine/tae/txn/txnimpl/txndb.go b/pkg/vm/engine/tae/txn/txnimpl/txndb.go index 89c8c1ed1946..7c937457fb01 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/txndb.go +++ b/pkg/vm/engine/tae/txn/txnimpl/txndb.go @@ -438,6 +438,14 @@ func (db *txnDB) Freeze() (err error) { return } +func (db *txnDB) approxSize() int { + size := 0 + for _, tbl := range db.tables { + size += tbl.approxSize() + } + return size +} + func (db *txnDB) PrePrepare(ctx context.Context) (err error) { for _, table := range db.tables { if err = table.PrePreareTransfer(txnif.PrePreparePhase, table.store.rt.Now()); err != nil {