Skip to content

Commit

Permalink
complete processor testing
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaubennassar committed Jul 8, 2024
1 parent a17868b commit b3c0976
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 37 deletions.
69 changes: 53 additions & 16 deletions localbridgesync/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
)

const (
eventsTable = "localbridgesync-events"
eventsTable = "localbridgesync-events"
lastBlockTable = "localbridgesync-lastBlock"
)

var (
ErrBlockNotProcessed = errors.New("given block(s) have not been processed yet")
lastBlokcKey = []byte("lb")
)

type processor struct {
Expand All @@ -24,7 +26,8 @@ type processor struct {

func tableCfgFunc(defaultBuckets kv.TableCfg) kv.TableCfg {
return kv.TableCfg{
eventsTable: {},
eventsTable: {},
lastBlockTable: {},
}
}

Expand All @@ -41,11 +44,11 @@ func newProcessor(dbPath string) (*processor, error) {
}, nil
}

// GetClaimsAndBridges returns the claims and bridges occurred between fromBlock, toBlock both included
// GetClaimsAndBridges returns the claims and bridges occurred between fromBlock, toBlock both included.
// If toBlock has not been porcessed yet, ErrBlockNotProcessed will be returned
func (p *processor) GetClaimsAndBridges(
ctx context.Context, fromBlock, toBlock uint64,
) ([]Claim, []Bridge, error) {
// TODO: if toBlock is not yet synced, return error, however we do not store blocks if they have no events :(?
claims := []Claim{}
bridges := []Bridge{}

Expand All @@ -54,17 +57,21 @@ func (p *processor) GetClaimsAndBridges(
return nil, nil, err
}
defer tx.Rollback()
lpb, err := p.getLastProcessedBlockWithTx(tx)
if lpb < toBlock {
return nil, nil, ErrBlockNotProcessed
}
c, err := tx.Cursor(eventsTable)
if err != nil {
return nil, nil, err
}
defer c.Close()

for k, v, err := c.Seek(blockNum2Key(fromBlock)); k != nil; k, v, err = c.Next() {
for k, v, err := c.Seek(blockNum2Bytes(fromBlock)); k != nil; k, v, err = c.Next() {
if err != nil {
return nil, nil, err
}
if key2BlockNum(k) > toBlock {
if bytes2BlockNum(k) > toBlock {
break
}
block := bridgeEvents{}
Expand All @@ -80,7 +87,22 @@ func (p *processor) GetClaimsAndBridges(
}

func (p *processor) getLastProcessedBlock(ctx context.Context) (uint64, error) {
return 0, errors.New("not implemented")
tx, err := p.db.BeginRo(ctx)
if err != nil {
return 0, err
}
defer tx.Rollback()
return p.getLastProcessedBlockWithTx(tx)
}

func (p *processor) getLastProcessedBlockWithTx(tx kv.Tx) (uint64, error) {
if blockNumBytes, err := tx.GetOne(lastBlockTable, lastBlokcKey); err != nil {
return 0, err
} else if blockNumBytes == nil {
return 0, nil
} else {
return bytes2BlockNum(blockNumBytes), nil
}
}

func (p *processor) reorg(firstReorgedBlock uint64) error {
Expand All @@ -93,7 +115,7 @@ func (p *processor) reorg(firstReorgedBlock uint64) error {
return err
}
defer c.Close()
firstKey := blockNum2Key(firstReorgedBlock)
firstKey := blockNum2Bytes(firstReorgedBlock)
for k, _, err := c.Seek(firstKey); k != nil; k, _, err = c.Next() {
if err != nil {
tx.Rollback()
Expand All @@ -104,32 +126,47 @@ func (p *processor) reorg(firstReorgedBlock uint64) error {
return err
}
}
if err := p.updateLastProcessedBlock(tx, firstReorgedBlock-1); err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}

func (p *processor) storeBridgeEvents(blockNum uint64, block bridgeEvents) error {
// TODO: add logic to store last processed block even if there are no events
value, err := json.Marshal(block)
if err != nil {
return err
}
tx, err := p.db.BeginRw(context.Background())
if err != nil {
return err
}
if err := tx.Put(eventsTable, blockNum2Key(blockNum), value); err != nil {
if len(block.Bridges) > 0 || len(block.Claims) > 0 {
value, err := json.Marshal(block)
if err != nil {
tx.Rollback()
return err
}
if err := tx.Put(eventsTable, blockNum2Bytes(blockNum), value); err != nil {
tx.Rollback()
return err
}
}
if err := p.updateLastProcessedBlock(tx, blockNum); err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}

func blockNum2Key(blockNum uint64) []byte {
func (p *processor) updateLastProcessedBlock(tx kv.RwTx, blockNum uint64) error {
blockNumBytes := blockNum2Bytes(blockNum)
return tx.Put(lastBlockTable, lastBlokcKey, blockNumBytes)
}

func blockNum2Bytes(blockNum uint64) []byte {
key := make([]byte, 8)
binary.LittleEndian.PutUint64(key, blockNum)
return key
}

func key2BlockNum(key []byte) uint64 {
func bytes2BlockNum(key []byte) uint64 {
return binary.LittleEndian.Uint64(key)
}
154 changes: 133 additions & 21 deletions localbridgesync/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/big"
"slices"
"testing"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -19,6 +20,7 @@ func TestProceessor(t *testing.T) {
&getLastProcessedBlockAction{
p: p,
description: "on an empty processor",
ctx: context.Background(),
expectedLastProcessedBlock: 0,
expectedErr: nil,
},
Expand All @@ -40,8 +42,8 @@ func TestProceessor(t *testing.T) {
ctx: context.Background(),
fromBlock: 0,
toBlock: 2,
expectedClaims: []Claim{},
expectedBridges: []Bridge{},
expectedClaims: nil,
expectedBridges: nil,
expectedErr: ErrBlockNotProcessed,
},
&storeBridgeEventsAction{
Expand All @@ -55,6 +57,7 @@ func TestProceessor(t *testing.T) {
&getLastProcessedBlockAction{
p: p,
description: "after block1",
ctx: context.Background(),
expectedLastProcessedBlock: 1,
expectedErr: nil,
},
Expand All @@ -64,30 +67,20 @@ func TestProceessor(t *testing.T) {
ctx: context.Background(),
fromBlock: 0,
toBlock: 2,
expectedClaims: block1.Events.Claims,
expectedBridges: block1.Events.Bridges,
expectedErr: nil,
expectedClaims: nil,
expectedBridges: nil,
expectedErr: ErrBlockNotProcessed,
},
&getClaimsAndBridgesAction{
p: p,
description: "after block1: range 0, 1",
description: "after block1: range 1, 1",
ctx: context.Background(),
fromBlock: 1,
toBlock: 1,
expectedClaims: block1.Events.Claims,
expectedBridges: block1.Events.Bridges,
expectedErr: nil,
},
&getClaimsAndBridgesAction{
p: p,
description: "after block1: range 2, 2",
ctx: context.Background(),
fromBlock: 2,
toBlock: 2,
expectedClaims: []Claim{},
expectedBridges: []Bridge{},
expectedErr: ErrBlockNotProcessed,
},
&reorgAction{
p: p,
description: "after block1",
Expand All @@ -101,8 +94,8 @@ func TestProceessor(t *testing.T) {
ctx: context.Background(),
fromBlock: 0,
toBlock: 2,
expectedClaims: []Claim{},
expectedBridges: []Bridge{},
expectedClaims: nil,
expectedBridges: nil,
expectedErr: ErrBlockNotProcessed,
},
&storeBridgeEventsAction{
Expand All @@ -120,7 +113,14 @@ func TestProceessor(t *testing.T) {
block: block3.Events,
expectedErr: nil,
},
// processed: block1, block2
// processed: block1, block3
&getLastProcessedBlockAction{
p: p,
description: "after block3",
ctx: context.Background(),
expectedLastProcessedBlock: 3,
expectedErr: nil,
},
&getClaimsAndBridgesAction{
p: p,
description: "after block3: range 2, 2",
Expand All @@ -131,8 +131,114 @@ func TestProceessor(t *testing.T) {
expectedBridges: []Bridge{},
expectedErr: nil,
},

// TODO: keep going!
&getClaimsAndBridgesAction{
p: p,
description: "after block3: range 1, 3",
ctx: context.Background(),
fromBlock: 1,
toBlock: 3,
expectedClaims: append(block1.Events.Claims, block3.Events.Claims...),
expectedBridges: append(block1.Events.Bridges, block3.Events.Bridges...),
expectedErr: nil,
},
&reorgAction{
p: p,
description: "after block3, with value 3",
firstReorgedBlock: 3,
expectedErr: nil,
},
// processed: block1
&getLastProcessedBlockAction{
p: p,
description: "after block3 reorged",
ctx: context.Background(),
expectedLastProcessedBlock: 2,
expectedErr: nil,
},
&reorgAction{
p: p,
description: "after block3, with value 2",
firstReorgedBlock: 2,
expectedErr: nil,
},
&getLastProcessedBlockAction{
p: p,
description: "after block2 reorged",
ctx: context.Background(),
expectedLastProcessedBlock: 1,
expectedErr: nil,
},
&storeBridgeEventsAction{
p: p,
description: "block3 after reorg",
blockNum: block3.Num,
block: block3.Events,
expectedErr: nil,
},
// processed: block1, block3
&storeBridgeEventsAction{
p: p,
description: "block4",
blockNum: block4.Num,
block: block4.Events,
expectedErr: nil,
},
// processed: block1, block3, block4
&storeBridgeEventsAction{
p: p,
description: "block5",
blockNum: block5.Num,
block: block5.Events,
expectedErr: nil,
},
// processed: block1, block3, block4, block5
&getLastProcessedBlockAction{
p: p,
description: "after block5",
ctx: context.Background(),
expectedLastProcessedBlock: 5,
expectedErr: nil,
},
&getClaimsAndBridgesAction{
p: p,
description: "after block5: range 1, 3",
ctx: context.Background(),
fromBlock: 1,
toBlock: 3,
expectedClaims: append(block1.Events.Claims, block3.Events.Claims...),
expectedBridges: append(block1.Events.Bridges, block3.Events.Bridges...),
expectedErr: nil,
},
&getClaimsAndBridgesAction{
p: p,
description: "after block5: range 4, 5",
ctx: context.Background(),
fromBlock: 4,
toBlock: 5,
expectedClaims: append(block4.Events.Claims, block5.Events.Claims...),
expectedBridges: append(block4.Events.Bridges, block5.Events.Bridges...),
expectedErr: nil,
},
&getClaimsAndBridgesAction{
p: p,
description: "after block5: range 0, 5",
ctx: context.Background(),
fromBlock: 0,
toBlock: 5,
expectedClaims: slices.Concat(
block1.Events.Claims,
block3.Events.Claims,
block4.Events.Claims,
block5.Events.Claims,
),
expectedBridges: slices.Concat(
block1.Events.Bridges,
block3.Events.Bridges,
block4.Events.Bridges,
block5.Events.Bridges,
),
expectedErr: nil,
},
}

for _, a := range actions {
Expand Down Expand Up @@ -202,20 +308,26 @@ var (
DepositCount: 3,
},
},
Claims: []Claim{},
},
}
block4 = block{
blockHeader: blockHeader{
Num: 4,
Hash: common.HexToHash("03"),
},
Events: bridgeEvents{
Bridges: []Bridge{},
Claims: []Claim{},
},
}
block5 = block{
blockHeader: blockHeader{
Num: 5,
Hash: common.HexToHash("04"),
},
Events: bridgeEvents{
Bridges: []Bridge{},
Claims: []Claim{
{
GlobalIndex: big.NewInt(4),
Expand Down

0 comments on commit b3c0976

Please sign in to comment.