From 16d3cfac1606bc2362faac2e9e01746ebfe9b7e2 Mon Sep 17 00:00:00 2001 From: ucwong Date: Sat, 26 Oct 2024 01:29:10 +0800 Subject: [PATCH] add DeleteRange feature --- core/rawdb/table.go | 6 ++ ctxcdb/database.go | 9 +++ ctxcdb/dbtest/testsuite.go | 82 ++++++++++++++++++++++++++ ctxcdb/leveldb/leveldb.go | 31 ++++++++++ ctxcdb/memorydb/memorydb.go | 15 +++++ ctxcdb/pebble/pebble.go | 13 +++- ctxcdb/remotedb/remotedb.go | 4 ++ tests/fuzzers/stacktrie/trie_fuzzer.go | 1 + trie/iterator_test.go | 2 +- trie/trie_test.go | 1 + 10 files changed, 162 insertions(+), 2 deletions(-) diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 2732f83d88..cc82cf9715 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -129,6 +129,12 @@ func (t *table) Delete(key []byte) error { return t.db.Delete(append([]byte(t.prefix), key...)) } +// DeleteRange deletes all of the keys (and values) in the range [start,end) +// (inclusive on start, exclusive on end). +func (t *table) DeleteRange(start, end []byte) error { + return t.db.DeleteRange(append([]byte(t.prefix), start...), append([]byte(t.prefix), end...)) +} + // NewIterator creates a binary-alphabetical iterator over a subset // of database content with a particular key prefix, starting at a particular // initial key (or after, if it does not exist). diff --git a/ctxcdb/database.go b/ctxcdb/database.go index 671bde604b..decbc3cdd6 100644 --- a/ctxcdb/database.go +++ b/ctxcdb/database.go @@ -37,6 +37,13 @@ type KeyValueWriter interface { Delete(key []byte) error } +// KeyValueRangeDeleter wraps the DeleteRange method of a backing data store. +type KeyValueRangeDeleter interface { + // DeleteRange deletes all of the keys (and values) in the range [start,end) + // (inclusive on start, exclusive on end). + DeleteRange(start, end []byte) error +} + // KeyValueStater wraps the Stat method of a backing data store. type KeyValueStater interface { // Stat returns a particular internal stat of the database. @@ -61,6 +68,7 @@ type KeyValueStore interface { KeyValueReader KeyValueWriter KeyValueStater + KeyValueRangeDeleter Batcher Iteratee Compacter @@ -154,6 +162,7 @@ type Reader interface { // immutable ancient data. type Writer interface { KeyValueWriter + KeyValueRangeDeleter AncientWriter } diff --git a/ctxcdb/dbtest/testsuite.go b/ctxcdb/dbtest/testsuite.go index 0e26f782b8..f01e3f05d6 100644 --- a/ctxcdb/dbtest/testsuite.go +++ b/ctxcdb/dbtest/testsuite.go @@ -21,6 +21,7 @@ import ( "crypto/rand" "reflect" "sort" + "strconv" "testing" "github.com/CortexFoundation/CortexTheseus/ctxcdb" @@ -403,6 +404,64 @@ func TestDatabaseSuite(t *testing.T, New func() ctxcdb.KeyValueStore) { t.Fatalf("expected error on batch.Write after Close") } }) + + t.Run("DeleteRange", func(t *testing.T) { + db := New() + defer db.Close() + + addRange := func(start, stop int) { + for i := start; i <= stop; i++ { + db.Put([]byte(strconv.Itoa(i)), nil) + } + } + + checkRange := func(start, stop int, exp bool) { + for i := start; i <= stop; i++ { + has, _ := db.Has([]byte(strconv.Itoa(i))) + if has && !exp { + t.Fatalf("unexpected key %d", i) + } + if !has && exp { + t.Fatalf("missing expected key %d", i) + } + } + } + + addRange(1, 9) + db.DeleteRange([]byte("9"), []byte("1")) + checkRange(1, 9, true) + db.DeleteRange([]byte("5"), []byte("5")) + checkRange(1, 9, true) + db.DeleteRange([]byte("5"), []byte("50")) + checkRange(1, 4, true) + checkRange(5, 5, false) + checkRange(6, 9, true) + db.DeleteRange([]byte(""), []byte("a")) + checkRange(1, 9, false) + + addRange(1, 999) + db.DeleteRange([]byte("12345"), []byte("54321")) + checkRange(1, 1, true) + checkRange(2, 5, false) + checkRange(6, 12, true) + checkRange(13, 54, false) + checkRange(55, 123, true) + checkRange(124, 543, false) + checkRange(544, 999, true) + + addRange(1, 999) + db.DeleteRange([]byte("3"), []byte("7")) + checkRange(1, 2, true) + checkRange(3, 6, false) + checkRange(7, 29, true) + checkRange(30, 69, false) + checkRange(70, 299, true) + checkRange(300, 699, false) + checkRange(700, 999, true) + + db.DeleteRange([]byte(""), []byte("a")) + checkRange(1, 999, false) + }) } // BenchDatabaseSuite runs a suite of benchmarks against a KeyValueStore database @@ -498,6 +557,29 @@ func BenchDatabaseSuite(b *testing.B, New func() ctxcdb.KeyValueStore) { benchBatchWrite(b, keys, vals) }) }) + b.Run("DeleteRange", func(b *testing.B) { + benchDeleteRange := func(b *testing.B, count int) { + db := New() + defer db.Close() + + for i := 0; i < count; i++ { + db.Put([]byte(strconv.Itoa(i)), nil) + } + b.ResetTimer() + b.ReportAllocs() + + db.DeleteRange([]byte("0"), []byte("999999999")) + } + b.Run("DeleteRange100", func(b *testing.B) { + benchDeleteRange(b, 100) + }) + b.Run("DeleteRange1k", func(b *testing.B) { + benchDeleteRange(b, 1000) + }) + b.Run("DeleteRange10k", func(b *testing.B) { + benchDeleteRange(b, 10000) + }) + }) } func iterateKeys(it ctxcdb.Iterator) []string { diff --git a/ctxcdb/leveldb/leveldb.go b/ctxcdb/leveldb/leveldb.go index 03b85e9ffa..a68c9c2c68 100644 --- a/ctxcdb/leveldb/leveldb.go +++ b/ctxcdb/leveldb/leveldb.go @@ -21,6 +21,7 @@ package leveldb import ( + "bytes" "fmt" "strings" "sync" @@ -207,6 +208,36 @@ func (db *Database) Delete(key []byte) error { return db.db.Delete(key, nil) } +var ErrTooManyKeys = errors.New("too many keys in deleted range") + +// DeleteRange deletes all of the keys (and values) in the range [start,end) +// (inclusive on start, exclusive on end). +// Note that this is a fallback implementation as leveldb does not natively +// support range deletion. It can be slow and therefore the number of deleted +// keys is limited in order to avoid blocking for a very long time. +// ErrTooManyKeys is returned if the range has only been partially deleted. +// In this case the caller can repeat the call until it finally succeeds. +func (db *Database) DeleteRange(start, end []byte) error { + batch := db.NewBatch() + it := db.NewIterator(nil, start) + defer it.Release() + + var count int + for it.Next() && bytes.Compare(end, it.Key()) > 0 { + count++ + if count > 10000 { // should not block for more than a second + if err := batch.Write(); err != nil { + return err + } + return ErrTooManyKeys + } + if err := batch.Delete(it.Key()); err != nil { + return err + } + } + return batch.Write() +} + // NewBatch creates a write-only key-value store that buffers changes to its host // database until a final write is called. func (db *Database) NewBatch() ctxcdb.Batch { diff --git a/ctxcdb/memorydb/memorydb.go b/ctxcdb/memorydb/memorydb.go index c3ff4f8a1e..72fd576cdb 100644 --- a/ctxcdb/memorydb/memorydb.go +++ b/ctxcdb/memorydb/memorydb.go @@ -18,6 +18,7 @@ package memorydb import ( + "bytes" "errors" "sort" "strings" @@ -125,6 +126,20 @@ func (db *Database) Delete(key []byte) error { return nil } +// DeleteRange deletes all of the keys (and values) in the range [start,end) +// (inclusive on start, exclusive on end). +func (db *Database) DeleteRange(start, end []byte) error { + it := db.NewIterator(nil, start) + defer it.Release() + + for it.Next() && bytes.Compare(end, it.Key()) > 0 { + if err := db.Delete(it.Key()); err != nil { + return err + } + } + return nil +} + // NewBatch creates a write-only key-value store that buffers changes to its host // database until a final write is called. func (db *Database) NewBatch() ctxcdb.Batch { diff --git a/ctxcdb/pebble/pebble.go b/ctxcdb/pebble/pebble.go index 59268b936b..7e508c01dd 100644 --- a/ctxcdb/pebble/pebble.go +++ b/ctxcdb/pebble/pebble.go @@ -324,7 +324,18 @@ func (d *Database) Delete(key []byte) error { if d.closed { return pebble.ErrClosed } - return d.db.Delete(key, nil) + return d.db.Delete(key, d.writeOptions) +} + +// DeleteRange deletes all of the keys (and values) in the range [start,end) +// (inclusive on start, exclusive on end). +func (d *Database) DeleteRange(start, end []byte) error { + d.quitLock.RLock() + defer d.quitLock.RUnlock() + if d.closed { + return pebble.ErrClosed + } + return d.db.DeleteRange(start, end, d.writeOptions) } // NewBatch creates a write-only key-value store that buffers changes to its host diff --git a/ctxcdb/remotedb/remotedb.go b/ctxcdb/remotedb/remotedb.go index e5f7d06f68..66cd0f3fba 100644 --- a/ctxcdb/remotedb/remotedb.go +++ b/ctxcdb/remotedb/remotedb.go @@ -94,6 +94,10 @@ func (db *Database) Delete(key []byte) error { panic("not supported") } +func (db *Database) DeleteRange(start, end []byte) error { + panic("not supported") +} + func (db *Database) ModifyAncients(f func(ctxcdb.AncientWriteOp) error) (int64, error) { panic("not supported") } diff --git a/tests/fuzzers/stacktrie/trie_fuzzer.go b/tests/fuzzers/stacktrie/trie_fuzzer.go index 5941876e87..ba538c08bd 100644 --- a/tests/fuzzers/stacktrie/trie_fuzzer.go +++ b/tests/fuzzers/stacktrie/trie_fuzzer.go @@ -65,6 +65,7 @@ type spongeDb struct { func (s *spongeDb) Has(key []byte) (bool, error) { panic("implement me") } func (s *spongeDb) Get(key []byte) ([]byte, error) { return nil, errors.New("no such elem") } func (s *spongeDb) Delete(key []byte) error { panic("implement me") } +func (s *spongeDb) DeleteRange(start, end []byte) error { panic("implement me") } func (s *spongeDb) NewBatch() ctxcdb.Batch { return &spongeBatch{s} } func (s *spongeDb) NewBatchWithSize(size int) ctxcdb.Batch { return &spongeBatch{s} } func (s *spongeDb) NewSnapshot() (ctxcdb.Snapshot, error) { panic("implement me") } diff --git a/trie/iterator_test.go b/trie/iterator_test.go index 05717e9dd7..d7b77e14d1 100644 --- a/trie/iterator_test.go +++ b/trie/iterator_test.go @@ -489,7 +489,7 @@ func (l *loggingDb) Put(key []byte, value []byte) error { func (l *loggingDb) Delete(key []byte) error { return l.backend.Delete(key) } - +func (s *loggingDb) DeleteRange(start, end []byte) error { panic("implement me") } func (l *loggingDb) NewBatch() ctxcdb.Batch { return l.backend.NewBatch() } diff --git a/trie/trie_test.go b/trie/trie_test.go index 0f9adaf458..ebfe2a84f2 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -740,6 +740,7 @@ func (s *spongeDb) NewBatch() ctxcdb.Batch { return &spongeBat func (s *spongeDb) NewBatchWithSize(size int) ctxcdb.Batch { return &spongeBatch{s} } func (s *spongeDb) NewSnapshot() (ctxcdb.Snapshot, error) { panic("implement me") } func (s *spongeDb) Stat(property string) (string, error) { panic("implement me") } +func (s *spongeDb) DeleteRange(start, end []byte) error { panic("implement me") } func (s *spongeDb) Compact(start []byte, limit []byte) error { panic("implement me") } func (s *spongeDb) Close() error { return nil } func (s *spongeDb) Put(key []byte, value []byte) error {