Skip to content

Commit

Permalink
Send batch of blocks from the main loop
Browse files Browse the repository at this point in the history
  • Loading branch information
vegarsti committed Jun 25, 2024
1 parent 3c2104d commit d17408e
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 68 deletions.
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func main() {
PollInterval: cfg.PollInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BatchRequestInterval: cfg.BatchRequestInterval,
},
)

Expand Down
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type Config struct {
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll
BatchRequestInterval time.Duration `long:"batch-request-interval" env:"BATCH_REQUEST_INTERVAL" description:"Interval at which to send batch requests to Dune" default:"1s"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
1 change: 1 addition & 0 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Config struct {
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BatchRequestInterval time.Duration
}

type Info struct {
Expand Down
69 changes: 48 additions & 21 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
i.log.Info("SendBlocks: Starting to receive blocks")
blocks := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
nextNumberToSend := startBlockNumber
batchTimer := time.NewTicker(i.cfg.BatchRequestInterval)
defer batchTimer.Stop()
for {
// Either receive a block, send blocks, or shut down (if the context is done, or the channel is closed).
select {
case <-ctx.Done():
i.log.Info("SendBlocks: Context canceled, stopping")
Expand All @@ -176,12 +179,11 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
i.log.Info("SendBlocks: Channel is closed, returning")
return nil
}

blocks[block.BlockNumber] = block
i.log.Info("SendBlocks: Received block", "blockNumber", block.BlockNumber, "bufferSize", len(blocks))

case <-batchTimer.C:
nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend)
i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
i.log.Info("SendBlocks: Sent completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
}
}
}
Expand All @@ -194,28 +196,53 @@ func (i *ingester) trySendCompletedBlocks(
blocks map[int64]models.RPCBlock,
nextNumberToSend int64,
) int64 {
// Send this block only if we have sent all previous blocks
// Collect a batch of blocks to send: only the consecutive run of buffered blocks starting at nextNumberToSend
batch := make([]models.RPCBlock, 0, len(blocks))
for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] {
if err := i.dune.SendBlocks(ctx, []models.RPCBlock{block}); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextNumberToSend
}
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: block.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
}
// We've sent block N, so increment the pointer
batch = append(batch, block)
delete(blocks, nextNumberToSend)
nextNumberToSend++
}

if len(batch) == 0 {
return nextNumberToSend
}

blockNumbers := make([]string, len(batch))
for i, block := range batch {
blockNumbers[i] = fmt.Sprintf("%d", block.BlockNumber)
}

i.log.Info(
"SendBlocks: Sending batch",
"blockNumberFirst", batch[0].BlockNumber,
"blockNumberLast", batch[len(batch)-1].BlockNumber,
"batchSize", len(batch),
)

// Send the batch
if err := i.dune.SendBlocks(ctx, batch); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextNumberToSend
}
// The batch has already been removed from the buffer and nextNumberToSend advanced past it,
// so this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error(
"SendBlocks: Failed to send batch, continuing",
"blockNumberFirst", batch[0].BlockNumber,
"blockNumberLast", batch[len(batch)-1].BlockNumber,
"error", err,
)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: batch[0].BlockNumber, // TODO: only the first block of the failed batch is recorded; consider recording the whole range
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", batch[len(batch)-1].BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, batch[len(batch)-1].BlockNumber)
}

return nextNumberToSend
}

Expand Down
118 changes: 73 additions & 45 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingester_test

import (
"context"
"fmt"
"io"
"log/slog"
"math/rand"
Expand All @@ -14,29 +15,38 @@ import (
duneapi_mock "github.com/duneanalytics/blockchain-ingester/mocks/duneapi"
jsonrpc_mock "github.com/duneanalytics/blockchain-ingester/mocks/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/models"
"github.com/go-errors/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestRunLoopUntilCancel(t *testing.T) {
func TestRunUntilCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(10)
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
if len(blocks) != 1 {
panic("expected 1 block")
if len(blocks) == 0 {
return nil
}
block := blocks[0]

atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
if block.BlockNumber == maxBlockNumber {
// cancel execution when we send the last block
next := sentBlockNumber + 1
for _, block := range blocks {
// We cannot send blocks out of order to DuneAPI
if block.BlockNumber != next {
return fmt.Errorf("expected block %d, got %d", next, block.BlockNumber)
}
next++
}

lastBlockNumber := blocks[len(blocks)-1].BlockNumber
atomic.StoreInt64(&sentBlockNumber, lastBlockNumber)
if lastBlockNumber >= maxBlockNumber {
// cancel execution when we have sent the last block
cancel()
return context.Canceled
}

return nil
},
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
Expand All @@ -62,20 +72,18 @@ func TestRunLoopUntilCancel(t *testing.T) {
// logOutput := os.Stderr
logOutput := io.Discard
ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
BatchRequestInterval: time.Nanosecond,
})

err := ing.Run(ctx, 1, -1) // run until canceled
require.ErrorIs(t, err, context.Canceled) // this is expected
require.Equal(t, sentBlockNumber, maxBlockNumber)
require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber)
}

func TestProduceBlockNumbers(t *testing.T) {
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlocksFunc: func(_ context.Context, _ []models.RPCBlock) error {
return nil
},
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
return nil
},
Expand All @@ -93,8 +101,9 @@ func TestProduceBlockNumbers(t *testing.T) {
}
logOutput := io.Discard
ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
BatchRequestInterval: time.Nanosecond,
})
blockNumbers := make(chan int64)
var wg sync.WaitGroup
Expand All @@ -113,28 +122,33 @@ func TestSendBlocks(t *testing.T) {
sentBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
if len(blocks) != 1 {
panic("expected 1 block")
if len(blocks) == 0 {
return nil
}
block := blocks[0]

// DuneAPI must fail if it receives blocks out of order
if block.BlockNumber != sentBlockNumber+1 {
return errors.Errorf("blocks out of order")
next := sentBlockNumber + 1
for _, block := range blocks {
// We cannot send blocks out of order to DuneAPI
if block.BlockNumber != next {
return fmt.Errorf("expected block %d, got %d", next, block.BlockNumber)
}
next++
}
atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
return nil
},
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {

lastBlockNumber := blocks[len(blocks)-1].BlockNumber
atomic.StoreInt64(&sentBlockNumber, lastBlockNumber)
return nil
},
}
// Swap these to see logs
// logOutput := os.Stderr
logOutput := io.Discard
ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), nil, duneapi, ingester.Config{
MaxBatchSize: 10, // this won't matter as we only run SendBlocks
PollInterval: 1000 * time.Millisecond,
})
ing := ingester.New(
slog.New(slog.NewTextHandler(logOutput, nil)),
nil, // node client isn't used in this unit test
duneapi,
ingester.Config{BatchRequestInterval: time.Nanosecond},
)

blocks := make(chan models.RPCBlock)

Expand All @@ -152,36 +166,44 @@ func TestSendBlocks(t *testing.T) {
}
// Now send the first block
blocks <- models.RPCBlock{BlockNumber: 1}
time.Sleep(time.Millisecond) // Allow enough time for the tick before closing the channel
close(blocks)
require.NoError(t, group.Wait())

// Ensure the last correct block was sent
require.Equal(t, int64(5), sentBlockNumber)
}

// TestRunLoopBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
// TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
// even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time.
func TestRunLoopBlocksOutOfOrder(t *testing.T) {
func TestRunBlocksOutOfOrder(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(1000)
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
if len(blocks) != 1 {
panic("expected 1 block")
if len(blocks) == 0 {
return nil
}
block := blocks[0]

// Test must fail if DuneAPI receives blocks out of order
require.Equal(t, block.BlockNumber, sentBlockNumber+1)
next := sentBlockNumber + 1
for _, block := range blocks {
// We cannot send blocks out of order to DuneAPI
if block.BlockNumber != next {
return fmt.Errorf("expected block %d, got %d", next, block.BlockNumber)
}
next++
}

atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
if block.BlockNumber == maxBlockNumber {
// cancel execution when we send the last block
lastBlockNumber := blocks[len(blocks)-1].BlockNumber
atomic.StoreInt64(&sentBlockNumber, lastBlockNumber)
if lastBlockNumber >= maxBlockNumber {
// cancel execution when we have sent the last block
cancel()
return context.Canceled
}

return nil
},
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
Expand All @@ -205,12 +227,18 @@ func TestRunLoopBlocksOutOfOrder(t *testing.T) {
// Swap these to see logs
// logOutput := os.Stderr
logOutput := io.Discard
ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{
MaxBatchSize: 10, // fetch blocks in multiple goroutines
PollInterval: 1000 * time.Millisecond,
})
ing := ingester.New(
slog.New(slog.NewTextHandler(logOutput, nil)),
rpcClient,
duneapi,
ingester.Config{
MaxBatchSize: 10, // fetch blocks in multiple goroutines
PollInterval: 1000 * time.Millisecond,
BatchRequestInterval: time.Nanosecond,
},
)

err := ing.Run(ctx, 1, -1) // run until canceled
require.ErrorIs(t, err, context.Canceled) // this is expected
require.Equal(t, sentBlockNumber, maxBlockNumber)
require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber)
}

0 comments on commit d17408e

Please sign in to comment.