Skip to content

Commit

Permalink
fix: dag migration and related foreach db method
Browse files Browse the repository at this point in the history
  • Loading branch information
kstdl committed Aug 3, 2023
1 parent 9bcea0a commit c1ebef6
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 35 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ taraxa-indexer

.vscode

data
data
backup*
11 changes: 0 additions & 11 deletions internal/storage/pebble/migration_types.go

This file was deleted.

24 changes: 16 additions & 8 deletions internal/storage/pebble/migrations/0_dag_removeSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ import (
log "github.com/sirupsen/logrus"
)

type OldDag struct {
Hash models.Hash `json:"hash"`
Level models.Counter `json:"level"`
Sender models.Address `json:"sender"`
Timestamp models.Timestamp `json:"timestamp"`
TransactionCount models.Counter `json:"transactionCount"`
}

// RemoveSenderMigration is a migration that removes the Sender attribute from the Dag struct.
type RemoveSenderMigration struct {
id string
Expand All @@ -18,19 +26,19 @@ func (m *RemoveSenderMigration) GetId() string {

// Apply is the implementation of the Migration interface for the RemoveSenderMigration.
func (m *RemoveSenderMigration) Apply(s *pebble.Storage) error {
// Retrieve all Dags from the database
const DAG_BATCH_THRESHOLD = 1000
batch := s.NewBatch()
var last_key []byte

var done = false

for !done {
var o models.Dag
for {
var o OldDag
count := 0
s.ForEachFromKey(&o, last_key, func(key, res []byte) bool {
s.ForEachFromKey([]byte("d"), last_key, func(key, res []byte) (stop bool) {
err := rlp.DecodeBytes(res, &o)
if err != nil {
if err.Error() == "rlp: too few elements for migration.OldDag" {
return false
}
log.WithFields(log.Fields{"migration": m.id, "error": err}).Fatal("Error decoding Dag")
}
dag := models.Dag{
Expand All @@ -49,9 +57,9 @@ func (m *RemoveSenderMigration) Apply(s *pebble.Storage) error {
count++
return count == DAG_BATCH_THRESHOLD
})

batch.CommitBatch()
batch = s.NewBatch()
if count < DAG_BATCH_THRESHOLD {
batch.CommitBatch()
break
}
}
Expand Down
10 changes: 5 additions & 5 deletions internal/storage/pebble/migrations/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ type Migration interface {
}

type Manager struct {
s *pebble.Storage
storage *pebble.Storage
migrations []Migration
}

func NewManager(s *pebble.Storage) *Manager {
m := Manager{
s: s,
storage: s,
}
m.RegisterMigration(&RemoveSenderMigration{id: "0_dag_removeSender"})
return &m
Expand All @@ -31,7 +31,7 @@ func (m *Manager) RegisterMigration(migration Migration) {

func (m *Manager) IsApplied(migration Migration) bool {
migrationId := ""
err := m.s.GetFromDB(&migrationId, []byte(migration_prefix+migration.GetId()))
err := m.storage.GetFromDB(&migrationId, []byte(migration_prefix+migration.GetId()))
return err == nil
}

Expand All @@ -41,11 +41,11 @@ func (m *Manager) ApplyAll() (err error) {
isApplied := m.IsApplied(migration)
if !isApplied {
log.WithFields(log.Fields{"migration": migration.GetId()}).Info("Running migration")
err = migration.Apply(m.s)
err = migration.Apply(m.storage)
if err != nil {
return
}
b := m.s.NewBatch()
b := m.storage.NewBatch()
err = b.AddToBatchFullKey(migration.GetId(), []byte(migration_prefix+migration.GetId()))
if err != nil {
return
Expand Down
16 changes: 8 additions & 8 deletions internal/storage/pebble/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func getPrefix(o interface{}) (ret string) {
ret = "t"
case *models.Pbft, models.Pbft:
ret = "p"
case *models.Dag, models.Dag, *OldDag, OldDag:
case *models.Dag, models.Dag:
ret = "d"
case *storage.AddressStats, storage.AddressStats:
ret = "s"
Expand Down Expand Up @@ -153,12 +153,13 @@ func (s *Storage) find(prefix []byte) *pebble.Iterator {
return iter
}

func (s *Storage) forEach(o interface{}, prefix, start_key []byte, fn func(key, res []byte) (stop bool), navigate func(iter *pebble.Iterator)) {
func (s *Storage) forEach(prefix, start_key []byte, fn func(key, res []byte) (stop bool), navigate func(iter *pebble.Iterator)) {
iter := s.find(prefix)
defer iter.Close()
if len(start_key) > 0 {
iter.SeekGE(start_key)
if len(start_key) == 0 {
start_key = prefix
}
iter.SeekGE(start_key)

for ; iter.Valid(); navigate(iter) {
if fn(iter.Key(), iter.Value()) {
Expand All @@ -167,9 +168,8 @@ func (s *Storage) forEach(o interface{}, prefix, start_key []byte, fn func(key,
}
}

func (s *Storage) ForEachFromKey(o interface{}, start_key []byte, fn func(key, res []byte) (stop bool)) {
prefix := []byte(getPrefix(o))
s.forEach(o, prefix, start_key, fn, func(iter *pebble.Iterator) { iter.Next() })
func (s *Storage) ForEachFromKey(prefix, start_key []byte, fn func(key, res []byte) (stop bool)) {
s.forEach(prefix, start_key, fn, func(iter *pebble.Iterator) { iter.Next() })
}

func (s *Storage) forEachPrefix(o interface{}, key_prefix string, start *uint64, fn func(key, res []byte) (stop bool), navigate func(iter *pebble.Iterator)) {
Expand All @@ -178,7 +178,7 @@ func (s *Storage) forEachPrefix(o interface{}, key_prefix string, start *uint64,
if start != nil {
start_key = getKey(getPrefix(&o), key_prefix, *start)
}
s.forEach(o, prefix, start_key, fn, navigate)
s.forEach(prefix, start_key, fn, navigate)
}

func (s *Storage) ForEach(o interface{}, key_prefix string, start *uint64, fn func(key, res []byte) (stop bool)) {
Expand Down
1 change: 0 additions & 1 deletion internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ type Storage interface {
Close() error
ForEach(o interface{}, key_prefix string, start *uint64, fn func(key, res []byte) (stop bool))
ForEachBackwards(o interface{}, key_prefix string, start *uint64, fn func(key, res []byte) (stop bool))
ForEachFromKey(o interface{}, start_key []byte, fn func(key, res []byte) (stop bool))
NewBatch() Batch
GetTotalSupply() *TotalSupply
GetAccounts() []Account
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func main() {
c.ValidatorsYieldSavingInterval = uint64(*validators_yield_saving_interval)

fin := st.GetFinalizationData()
log.WithFields(log.Fields{"pbft_count": fin.PbftCount}).Info("Loaded db with")
log.WithFields(log.Fields{"pbft_count": fin.PbftCount, "dag_count": fin.DagCount, "trx_count": fin.TrxCount}).Info("Loaded db with")

apiHandler := api.NewApiHandler(st, c)
api.RegisterHandlers(e, apiHandler)
Expand Down

0 comments on commit c1ebef6

Please sign in to comment.