Skip to content

Commit

Permalink
Update tableinfo (matrixorigin#19425)
Browse files Browse the repository at this point in the history
When updating the table info of snapshot meta, there is an unexpected drop
table row, but this table has been dropped, so this row can be ignored completely,
so there is no need to panic, just print a warning, and then analyze where this row
comes from when the warning occurs.

Approved by: @XuPeng-SH
  • Loading branch information
LeftHandCold authored Oct 18, 2024
1 parent 5c9094b commit 92bbacb
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions pkg/vm/engine/tae/logtail/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,9 @@ func isMoTable(tid uint64) bool {
}

type tombstone struct {
pk types.Tuple
ts types.TS
rowid types.Rowid
pk types.Tuple
ts types.TS
}

func (sm *SnapshotMeta) updateTableInfo(
Expand Down Expand Up @@ -467,7 +468,7 @@ func (sm *SnapshotMeta) updateTableInfo(
}
}

deletes := make([]tombstone, 0)
deleteRows := make([]tombstone, 0)
for _, info := range tTombstones {
if info.stats.BlkCnt() != 1 {
panic(fmt.Sprintf("mo_table tombstone %v blk cnt %v",
Expand All @@ -484,6 +485,7 @@ func (sm *SnapshotMeta) updateTableInfo(
}

commitTsVec := vector.MustFixedColWithTypeCheck[types.TS](objectBat.Vecs[len(objectBat.Vecs)-1])
rowIDVec := vector.MustFixedColWithTypeCheck[types.Rowid](objectBat.Vecs[0])
for i := 0; i < len(commitTsVec); i++ {
pk, _, _, _ := types.DecodeTuple(objectBat.Vecs[1].GetRawBytesAt(i))
commitTs := commitTsVec[i]
Expand All @@ -494,24 +496,27 @@ func (sm *SnapshotMeta) updateTableInfo(
logutil.Infof("yyyy skip table %v @ %v", pk.ErrString(nil), commitTs.ToString())
continue
}
deletes = append(deletes, tombstone{
pk: pk,
ts: commitTs,
deleteRows = append(deleteRows, tombstone{
rowid: rowIDVec[i],
pk: pk,
ts: commitTs,
})
}
}
sort.Slice(deletes, func(i, j int) bool {
ts2 := deletes[j].ts
return deletes[i].ts.LT(&ts2)
sort.Slice(deleteRows, func(i, j int) bool {
ts2 := deleteRows[j].ts
return deleteRows[i].ts.LT(&ts2)
})

for _, del := range deletes {
for _, del := range deleteRows {
pk := del.pk.ErrString(nil)
if sm.tablePKIndex[pk] == nil {
continue
}
if len(sm.tablePKIndex[pk]) == 0 {
panic(fmt.Sprintf("delete table %v not found @ %v, start is %v, end is %v", del.pk.ErrString(nil), del.ts.ToString(), startts.ToString(), endts.ToString()))
logutil.Warnf("[UpdateTableInfoWarn] delete table %v not found @ rowid %v, commit %v, start is %v, end is %v",
del.pk.ErrString(nil), del.rowid.String(), del.ts.ToString(), startts.ToString(), endts.ToString())
continue
}
table := sm.tablePKIndex[pk][0]
if !table.deleteAt.IsEmpty() && table.deleteAt.GT(&del.ts) {
Expand Down

0 comments on commit 92bbacb

Please sign in to comment.