Skip to content

Commit

Permalink
remove txnmetadata
Browse files Browse the repository at this point in the history
This feature is no longer used, and it never quite worked with
snapshots.
  • Loading branch information
thorfour committed Dec 4, 2023
1 parent bcecc0f commit d3ae5a2
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 145 deletions.
2 changes: 1 addition & 1 deletion bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func BenchmarkSnapshot(b *testing.B) {
bytesWritten := 0
for i := 0; i < b.N; i++ {
wc := &writeCounter{Writer: io.Discard}
require.NoError(b, WriteSnapshot(ctx, db.HighWatermark().TxnID, nil, db, wc, false))
require.NoError(b, WriteSnapshot(ctx, db.HighWatermark(), db, wc, false))
bytesWritten += wc.count
}
b.ReportMetric(float64(bytesWritten)/float64(b.N), "size/op")
Expand Down
73 changes: 29 additions & 44 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,7 @@ type DB struct {
// Database's monotonically increasing transaction ID
tx atomic.Uint64
// highWatermark maintains the highest consecutively completed txn.
highWatermark atomicTxn
txnMetadataProvider func(uint64) []byte
highWatermark atomic.Uint64

// TxPool is a waiting area for finished transactions that haven't been added to the watermark
txPool *TxPool
Expand Down Expand Up @@ -409,15 +408,6 @@ type DataSink interface {

type DBOption func(*DB) error

// WithUserDefinedTxnMetadataProvider enables the user to provide custom
// metadata associated with any txn.
func WithUserDefinedTxnMetadataProvider(f func(tx uint64) []byte) DBOption {
return func(db *DB) error {
db.txnMetadataProvider = f
return nil
}
}

func WithCompactionAfterOpen(compact bool, tableNames []string) DBOption {
return func(db *DB) error {
db.compactAfterRecovery = compact
Expand Down Expand Up @@ -568,7 +558,7 @@ func (s *ColumnStore) DB(ctx context.Context, name string, opts ...DBOption) (*D
Name: "frostdb_tx_high_watermark",
Help: "The highest transaction number that has been released to be read",
}, func() float64 {
return float64(db.highWatermark.Load().TxnID)
return float64(db.highWatermark.Load())
}),
snapshotMetrics: newSnapshotMetrics(reg),
}
Expand Down Expand Up @@ -692,46 +682,46 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
level.Info(db.logger).Log(
"msg", "failed to load latest snapshot", "db", db.name, "err", err,
)
snapshotTx.TxnID = 0
snapshotTx = 0
}
snapshotLogArgs := make([]any, 0)
if snapshotTx.TxnID != 0 {
if snapshotTx != 0 {
snapshotLogArgs = append(
snapshotLogArgs,
"snapshot_tx", snapshotTx.TxnID,
"snapshot_tx", snapshotTx,
"snapshot_load_duration", time.Since(snapshotLoadStart),
)
if err := db.cleanupSnapshotDir(ctx, snapshotTx.TxnID); err != nil {
if err := db.cleanupSnapshotDir(ctx, snapshotTx); err != nil {
// Truncation is best-effort. If it fails, move on.
level.Info(db.logger).Log(
"msg", "failed to truncate snapshots not equal to loaded snapshot",
"err", err,
"snapshot_tx", snapshotTx.TxnID,
"snapshot_tx", snapshotTx,
)
}
if err := wal.Truncate(snapshotTx.TxnID); err != nil {
if err := wal.Truncate(snapshotTx); err != nil {
level.Info(db.logger).Log(
"msg", "failed to truncate WAL after loading snapshot",
"err", err,
"snapshot_tx", snapshotTx.TxnID,
"snapshot_tx", snapshotTx,
)
}
}

// persistedTables is a map from a table name to the last transaction
// persisted.
persistedTables := map[string]uint64{}
var lastTx Txn
var lastTx uint64

start := time.Now()
if err := wal.Replay(snapshotTx.TxnID, func(tx uint64, record *walpb.Record) error {
if err := wal.Replay(snapshotTx, func(tx uint64, record *walpb.Record) error {
if err := ctx.Err(); err != nil {
return err
}
switch e := record.Entry.EntryType.(type) {
case *walpb.Entry_TableBlockPersisted_:
persistedTables[e.TableBlockPersisted.TableName] = tx
if tx > snapshotTx.TxnID {
if tx > snapshotTx {
// The loaded snapshot has data in a table that has been
// persisted. Delete all data in this table, since it has
// already been persisted.
Expand Down Expand Up @@ -765,11 +755,11 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
// Writes are performed concurrently to speed up replay.
var writeWg errgroup.Group
writeWg.SetLimit(runtime.GOMAXPROCS(0))
if err := wal.Replay(snapshotTx.TxnID, func(tx uint64, record *walpb.Record) error {
if err := wal.Replay(snapshotTx, func(tx uint64, record *walpb.Record) error {
if err := ctx.Err(); err != nil {
return err
}
lastTx = Txn{TxnID: tx, TxnMetadata: record.TxnMetadata}
lastTx = tx
switch e := record.Entry.EntryType.(type) {
case *walpb.Entry_NewTableBlock_:
entry := e.NewTableBlock
Expand Down Expand Up @@ -930,7 +920,7 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
}

resetTxn := snapshotTx
if lastTx.TxnID > resetTxn.TxnID {
if lastTx > resetTxn {
resetTxn = lastTx
}

Expand Down Expand Up @@ -960,7 +950,7 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
[]any{
"msg", "db recovered",
"wal_replay_duration", time.Since(start),
"watermark", resetTxn.TxnID,
"watermark", resetTxn,
},
snapshotLogArgs...,
)...,
Expand Down Expand Up @@ -1178,11 +1168,11 @@ func (db *DB) Table(name string, config *tablepb.TableConfig) (*Table, error) {
}
}

tx, _, metadata, commit := db.begin()
tx, _, commit := db.begin()
defer commit()

id := generateULID()
if err := table.newTableBlock(0, tx, metadata, id); err != nil {
if err := table.newTableBlock(0, tx, id); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1248,7 +1238,7 @@ func (p *DBTableProvider) GetTable(name string) (logicalplan.TableReader, error)

// beginRead returns the high watermark. Reads can safely access any write whose tx id is lower than or equal to the returned number.
func (db *DB) beginRead() uint64 {
return db.highWatermark.Load().TxnID
return db.highWatermark.Load()
}

// begin is an internal function that Tables call to start a transaction for writes.
Expand All @@ -1257,16 +1247,11 @@ func (db *DB) beginRead() uint64 {
// the write tx id
// The current high watermark
// A function to complete the transaction
func (db *DB) begin() (uint64, uint64, []byte, func()) {
txnID := db.tx.Add(1)
func (db *DB) begin() (uint64, uint64, func()) {
txn := db.tx.Add(1)
watermark := db.highWatermark.Load()
var txnMetadata []byte
if db.txnMetadataProvider != nil {
txnMetadata = db.txnMetadataProvider(txnID)
}
return txnID, watermark.TxnID, txnMetadata, func() {
txn := Txn{TxnID: txnID, TxnMetadata: txnMetadata}
if mark := db.highWatermark.Load().TxnID; mark+1 == txnID {
return txn, watermark, func() {
if mark := db.highWatermark.Load(); mark+1 == txn {
// This is the next consecutive transaction; increase the watermark.
db.highWatermark.Store(txn)
}
Expand All @@ -1280,15 +1265,15 @@ func (db *DB) begin() (uint64, uint64, []byte, func()) {
// Wait makes no differentiation between completed and aborted transactions.
func (db *DB) Wait(tx uint64) {
for {
if db.highWatermark.Load().TxnID >= tx {
if db.highWatermark.Load() >= tx {
return
}
time.Sleep(10 * time.Millisecond)
}
}

// HighWatermark returns the current high watermark.
func (db *DB) HighWatermark() Txn {
func (db *DB) HighWatermark() uint64 {
return db.highWatermark.Load()
}

Expand All @@ -1297,16 +1282,16 @@ func (db *DB) HighWatermark() Txn {
// expected transaction will log correctly to the WAL. Note that db.wal is not
// used since callers might be calling resetToTxn before db.wal has been
// initialized or might not want the WAL to be reset.
func (db *DB) resetToTxn(txn Txn, wal WAL) {
db.tx.Store(txn.TxnID)
func (db *DB) resetToTxn(txn uint64, wal WAL) {
db.tx.Store(txn)
db.highWatermark.Store(txn)
if wal != nil {
// This call resets the WAL to a zero state so that new records can be
// logged.
if err := wal.Reset(txn.TxnID + 1); err != nil {
if err := wal.Reset(txn + 1); err != nil {
level.Warn(db.logger).Log(
"msg", "failed to reset WAL when resetting DB to txn",
"txnID", txn.TxnID,
"txnID", txn,
"err", err,
)
}
Expand Down
9 changes: 1 addition & 8 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1413,9 +1413,6 @@ func TestDBRecover(t *testing.T) {
dbAndTableName = "test"
numInserts = 3
)
txnMetadataProvider := func(tx uint64) []byte {
return []byte(fmt.Sprintf("%d-metadata", tx))
}
setup := func(t *testing.T, blockRotation bool, options ...Option) string {
dir := t.TempDir()
c, err := New(
Expand All @@ -1435,7 +1432,7 @@ func TestDBRecover(t *testing.T) {
require.NoError(t, err)
defer c.Close()

db, err := c.DB(ctx, dbAndTableName, WithUserDefinedTxnMetadataProvider(txnMetadataProvider))
db, err := c.DB(ctx, dbAndTableName)
require.NoError(t, err)
schema := dynparquet.SampleDefinition()
table, err := db.Table(dbAndTableName, NewTableConfig(schema))
Expand Down Expand Up @@ -1501,10 +1498,6 @@ func TestDBRecover(t *testing.T) {
db, err := c.DB(ctx, dbAndTableName)
require.NoError(t, err)

// Verify metadata is stored.
watermark := db.highWatermark.Load()
require.Equal(t, txnMetadataProvider(watermark.TxnID), watermark.TxnMetadata)

engine := query.NewEngine(memory.DefaultAllocator, db.TableProvider())
nrows := 0
require.NoError(t, engine.ScanTable(dbAndTableName).
Expand Down
48 changes: 23 additions & 25 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (db *DB) snapshot(ctx context.Context, async bool, onSuccess func()) {
return
}

tx, _, metadata, commit := db.begin()
tx, _, commit := db.begin()
level.Debug(db.logger).Log(
"msg", "starting a new snapshot",
"tx", tx,
Expand All @@ -150,7 +150,6 @@ func (db *DB) snapshot(ctx context.Context, async bool, onSuccess func()) {
Entry: &walpb.Entry{
EntryType: &walpb.Entry_Snapshot_{Snapshot: &walpb.Entry_Snapshot{Tx: tx}},
},
TxnMetadata: metadata,
},
); err != nil {
level.Error(db.logger).Log(
Expand All @@ -175,9 +174,9 @@ func (db *DB) snapshot(ctx context.Context, async bool, onSuccess func()) {
}

if async {
go doSnapshot(db.snapshotWriter(tx, metadata))
go doSnapshot(db.snapshotWriter(tx))
} else {
doSnapshot(db.offlineSnapshotWriter(tx, metadata))
doSnapshot(db.offlineSnapshotWriter(tx))
}
}

Expand Down Expand Up @@ -239,14 +238,14 @@ func (db *DB) snapshotAtTX(ctx context.Context, tx uint64, writeSnapshot func(co

// loadLatestSnapshot loads the latest snapshot (i.e. the snapshot with the
// highest txn) from the snapshots dir into the database.
func (db *DB) loadLatestSnapshot(ctx context.Context) (Txn, error) {
func (db *DB) loadLatestSnapshot(ctx context.Context) (uint64, error) {
return db.loadLatestSnapshotFromDir(ctx, db.snapshotsDir())
}

func (db *DB) loadLatestSnapshotFromDir(ctx context.Context, dir string) (Txn, error) {
func (db *DB) loadLatestSnapshotFromDir(ctx context.Context, dir string) (uint64, error) {
var (
lastErr error
loadedTxn Txn
loadedTxn uint64
)
// No error should be returned from snapshotsDo.
_ = db.snapshotsDo(ctx, dir, func(parsedTx uint64, entry os.DirEntry) (bool, error) {
Expand Down Expand Up @@ -278,24 +277,23 @@ func (db *DB) loadLatestSnapshotFromDir(ctx context.Context, dir string) (Txn, e
}
return false, nil
})
if loadedTxn.TxnID != 0 {
if loadedTxn != 0 {
// Successfully loaded a snapshot.
return loadedTxn, nil
}

errString := "no valid snapshots found"
if lastErr != nil {
return Txn{}, fmt.Errorf("%s: lastErr: %w", errString, lastErr)
return 0, fmt.Errorf("%s: lastErr: %w", errString, lastErr)
}
return Txn{}, fmt.Errorf("%s", errString)
return 0, fmt.Errorf("%s", errString)
}

func LoadSnapshot(ctx context.Context, db *DB, tx uint64, r io.ReaderAt, size int64, truncateWAL bool) (Txn, error) {
txnMetadata, err := loadSnapshot(ctx, db, r, size)
if err != nil {
return Txn{}, err
func LoadSnapshot(ctx context.Context, db *DB, tx uint64, r io.ReaderAt, size int64, truncateWAL bool) (uint64, error) {
if err := loadSnapshot(ctx, db, r, size); err != nil {
return 0, err
}
watermark := Txn{TxnID: tx, TxnMetadata: txnMetadata}
watermark := tx
var wal WAL
if truncateWAL {
wal = db.wal
Expand Down Expand Up @@ -369,20 +367,20 @@ func (w *offsetWriter) checksum() uint32 {
return w.runningChecksum.Sum32()
}

func (db *DB) snapshotWriter(tx uint64, txnMetadata []byte) func(context.Context, io.Writer) error {
func (db *DB) snapshotWriter(tx uint64) func(context.Context, io.Writer) error {
return func(ctx context.Context, w io.Writer) error {
return WriteSnapshot(ctx, tx, txnMetadata, db, w, false)
return WriteSnapshot(ctx, tx, db, w, false)
}
}

// offlineSnapshotWriter is used when a database is closing after all the tables have closed.
func (db *DB) offlineSnapshotWriter(tx uint64, txnMetadata []byte) func(context.Context, io.Writer) error {
func (db *DB) offlineSnapshotWriter(tx uint64) func(context.Context, io.Writer) error {
return func(ctx context.Context, w io.Writer) error {
return WriteSnapshot(ctx, tx, txnMetadata, db, w, true)
return WriteSnapshot(ctx, tx, db, w, true)
}
}

func WriteSnapshot(ctx context.Context, _ uint64, txnMetadata []byte, db *DB, w io.Writer, offline bool) error {
func WriteSnapshot(ctx context.Context, _ uint64, db *DB, w io.Writer, offline bool) error {
offW := newOffsetWriter(w)
w = offW
var tables []*Table
Expand All @@ -396,7 +394,7 @@ func WriteSnapshot(ctx context.Context, _ uint64, txnMetadata []byte, db *DB, w
return err
}

metadata := &snapshotpb.FooterData{TxnMetadata: txnMetadata}
metadata := &snapshotpb.FooterData{}
for _, t := range tables {
if err := func() error {
// Obtain a write block to prevent racing with
Expand Down Expand Up @@ -561,10 +559,10 @@ func readFooter(r io.ReaderAt, size int64) (*snapshotpb.FooterData, error) {
// loadSnapshot loads a snapshot from the given io.ReaderAt and returns an
// error if any occurred.
func loadSnapshot(ctx context.Context, db *DB, r io.ReaderAt, size int64) ([]byte, error) {
func loadSnapshot(ctx context.Context, db *DB, r io.ReaderAt, size int64) error {
footer, err := readFooter(r, size)
if err != nil {
return nil, err
return err
}

for i, tableMeta := range footer.TableMetadata {
Expand Down Expand Up @@ -672,11 +670,11 @@ func loadSnapshot(ctx context.Context, db *DB, r io.ReaderAt, size int64) ([]byt
delete(db.tables, cleanupTable.Name)
}
db.mtx.Unlock()
return nil, err
return err
}
}

return footer.TxnMetadata, nil
return nil
}

// cleanupSnapshotDir should be called with a tx at which the caller is certain
Expand Down
Loading

0 comments on commit d3ae5a2

Please sign in to comment.