diff --git a/cmd/main.go b/cmd/main.go index 6a580cd..4962ade 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -87,6 +87,7 @@ func main() { PollInterval: cfg.PollInterval, Stack: cfg.RPCStack, BlockchainName: cfg.BlockchainName, + BatchRequestInterval: cfg.BatchRequestInterval, }, ) diff --git a/config/config.go b/config/config.go index b0f71ce..df665fd 100644 --- a/config/config.go +++ b/config/config.go @@ -39,8 +39,9 @@ type Config struct { PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll RPCNode RPCClient - RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll - Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll + RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll + Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll + BatchRequestInterval time.Duration `long:"batch-request-interval" env:"BATCH_REQUEST_INTERVAL" description:"Interval at which to send batch requests to Dune" default:"1s"` // nolint:lll } func (c Config) HasError() error { diff --git a/ingester/ingester.go b/ingester/ingester.go index fbf3061..c0b2f10 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -49,6 +49,7 @@ type Config struct { ReportProgressInterval time.Duration Stack models.EVMStack BlockchainName string + BatchRequestInterval time.Duration } type Info struct { diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 4f892dd..7db58aa 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -166,7 +166,10 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo i.log.Info("SendBlocks: Starting to receive blocks") blocks := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order nextNumberToSend := startBlockNumber + batchTimer := time.NewTicker(i.cfg.BatchRequestInterval) + defer batchTimer.Stop() for { + // Either receive a block, send blocks, or shut down (if the context is done, or the channel is closed). select { case <-ctx.Done(): i.log.Info("SendBlocks: Context canceled, stopping") @@ -176,12 +179,11 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo i.log.Info("SendBlocks: Channel is closed, returning") return nil } - blocks[block.BlockNumber] = block i.log.Info("SendBlocks: Received block", "blockNumber", block.BlockNumber, "bufferSize", len(blocks)) - + case <-batchTimer.C: nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend) - i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend) + i.log.Info("SendBlocks: Sent completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend) } } } @@ -194,28 +196,53 @@ func (i *ingester) trySendCompletedBlocks( blocks map[int64]models.RPCBlock, nextNumberToSend int64, ) int64 { - // Send this block only if we have sent all previous blocks + // Collect a batch of blocks to send, only send those which are in order + batch := make([]models.RPCBlock, 0, len(blocks)) for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] { - if err := i.dune.SendBlocks(ctx, []models.RPCBlock{block}); err != nil { - if errors.Is(err, context.Canceled) { - i.log.Info("SendBlocks: Context canceled, stopping") - return nextNumberToSend - } - // this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap - i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err) - i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{ - Timestamp: time.Now(), - BlockNumber: block.BlockNumber, - Error: err, - }) - } else { - i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber) - atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber) - } - // We've sent block N, so increment the pointer + batch = append(batch, block) delete(blocks, nextNumberToSend) nextNumberToSend++ } + + if len(batch) == 0 { + return nextNumberToSend + } + + blockNumbers := make([]string, len(batch)) + for i, block := range batch { + blockNumbers[i] = fmt.Sprintf("%d", block.BlockNumber) + } + + i.log.Info( + "SendBlocks: Sending batch", + "blockNumberFirst", batch[0].BlockNumber, + "blockNumberLast", batch[len(batch)-1].BlockNumber, + "batchSize", len(batch), + ) + + // Send the batch + if err := i.dune.SendBlocks(ctx, batch); err != nil { + if errors.Is(err, context.Canceled) { + i.log.Info("SendBlocks: Context canceled, stopping") + return nextNumberToSend + } + // this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap + i.log.Error( + "SendBlocks: Failed to send batch, continuing", + "blockNumberFirst", batch[0].BlockNumber, + "blockNumberLast", batch[len(batch)-1].BlockNumber, + "error", err, + ) + i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{ + Timestamp: time.Now(), + BlockNumber: batch[0].BlockNumber, // TODO? + Error: err, + }) + } else { + i.log.Info("Updating latest ingested block number", "blockNumber", batch[len(batch)-1].BlockNumber) + atomic.StoreInt64(&i.info.IngestedBlockNumber, batch[len(batch)-1].BlockNumber) + } + return nextNumberToSend } diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 1bedc47..d4e693c 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -2,6 +2,7 @@ package ingester_test import ( "context" + "fmt" "io" "log/slog" "math/rand" @@ -14,29 +15,38 @@ import ( duneapi_mock "github.com/duneanalytics/blockchain-ingester/mocks/duneapi" jsonrpc_mock "github.com/duneanalytics/blockchain-ingester/mocks/jsonrpc" "github.com/duneanalytics/blockchain-ingester/models" - "github.com/go-errors/errors" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) -func TestRunLoopUntilCancel(t *testing.T) { +func TestRunUntilCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) maxBlockNumber := int64(10) sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { - if len(blocks) != 1 { - panic("expected 1 block") + if len(blocks) == 0 { + return nil } - block := blocks[0] - atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) - if block.BlockNumber == maxBlockNumber { - // cancel execution when we send the last block + next := sentBlockNumber + 1 + for _, block := range blocks { + // We cannot send blocks out of order to DuneAPI + if block.BlockNumber != next { + return fmt.Errorf("expected block %d, got %d", next, block.BlockNumber) + } + next++ + } + + lastBlockNumber := blocks[len(blocks)-1].BlockNumber + atomic.StoreInt64(&sentBlockNumber, lastBlockNumber) + if lastBlockNumber >= maxBlockNumber { + // cancel execution when we have sent the last block cancel() return context.Canceled } + return nil }, PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { @@ -62,20 +72,18 @@ func TestRunLoopUntilCancel(t *testing.T) { // logOutput := os.Stderr logOutput := io.Discard ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ - MaxBatchSize: 1, - PollInterval: 1000 * time.Millisecond, + MaxBatchSize: 1, + PollInterval: 1000 * time.Millisecond, + BatchRequestInterval: time.Nanosecond, }) err := ing.Run(ctx, 1, -1) // run until canceled require.ErrorIs(t, err, context.Canceled) // this is expected - require.Equal(t, sentBlockNumber, maxBlockNumber) + require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber) } func TestProduceBlockNumbers(t *testing.T) { duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlocksFunc: func(_ context.Context, _ []models.RPCBlock) error { - return nil - }, PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { return nil }, @@ -93,8 +101,9 @@ func TestProduceBlockNumbers(t *testing.T) { } logOutput := io.Discard ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ - MaxBatchSize: 1, - PollInterval: 1000 * time.Millisecond, + MaxBatchSize: 1, + PollInterval: 1000 * time.Millisecond, + BatchRequestInterval: time.Nanosecond, }) blockNumbers := make(chan int64) var wg sync.WaitGroup @@ -113,28 +122,33 @@ func TestSendBlocks(t *testing.T) { sentBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { - if len(blocks) != 1 { - panic("expected 1 block") + if len(blocks) == 0 { + return nil } - block := blocks[0] - // DuneAPI must fail if it receives blocks out of order - if block.BlockNumber != sentBlockNumber+1 { - return errors.Errorf("blocks out of order") + next := sentBlockNumber + 1 + for _, block := range blocks { + // We cannot send blocks out of order to DuneAPI + if block.BlockNumber != next { + return fmt.Errorf("expected block %d, got %d", next, block.BlockNumber) + } + next++ } - atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) - return nil - }, - PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { + + lastBlockNumber := blocks[len(blocks)-1].BlockNumber + atomic.StoreInt64(&sentBlockNumber, lastBlockNumber) return nil }, } + // Swap these to see logs // logOutput := os.Stderr logOutput := io.Discard - ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), nil, duneapi, ingester.Config{ - MaxBatchSize: 10, // this won't matter as we only run SendBlocks - PollInterval: 1000 * time.Millisecond, - }) + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + nil, // node client isn't used in this unit test + duneapi, + ingester.Config{BatchRequestInterval: time.Nanosecond}, + ) blocks := make(chan models.RPCBlock) @@ -152,6 +166,7 @@ func TestSendBlocks(t *testing.T) { } // Now send the first block blocks <- models.RPCBlock{BlockNumber: 1} + time.Sleep(time.Millisecond) // Allow enough time for the tick before closing the channel close(blocks) require.NoError(t, group.Wait()) @@ -159,29 +174,36 @@ func TestSendBlocks(t *testing.T) { require.Equal(t, int64(5), sentBlockNumber) } -// TestRunLoopBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order +// TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order // even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time. -func TestRunLoopBlocksOutOfOrder(t *testing.T) { +func TestRunBlocksOutOfOrder(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) maxBlockNumber := int64(1000) sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { - if len(blocks) != 1 { - panic("expected 1 block") + if len(blocks) == 0 { + return nil } - block := blocks[0] - // Test must fail if DuneAPI receives blocks out of order - require.Equal(t, block.BlockNumber, sentBlockNumber+1) + next := sentBlockNumber + 1 + for _, block := range blocks { + // We cannot send blocks out of order to DuneAPI + if block.BlockNumber != next { + return fmt.Errorf("expected block %d, got %d", next, block.BlockNumber) + } + next++ + } - atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) - if block.BlockNumber == maxBlockNumber { - // cancel execution when we send the last block + lastBlockNumber := blocks[len(blocks)-1].BlockNumber + atomic.StoreInt64(&sentBlockNumber, lastBlockNumber) + if lastBlockNumber >= maxBlockNumber { + // cancel execution when we have sent the last block cancel() return context.Canceled } + return nil }, PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { @@ -205,12 +227,18 @@ func TestRunLoopBlocksOutOfOrder(t *testing.T) { // Swap these to see logs // logOutput := os.Stderr logOutput := io.Discard - ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ - MaxBatchSize: 10, // fetch blocks in multiple goroutines - PollInterval: 1000 * time.Millisecond, - }) + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + rpcClient, + duneapi, + ingester.Config{ + MaxBatchSize: 10, // fetch blocks in multiple goroutines + PollInterval: 1000 * time.Millisecond, + BatchRequestInterval: time.Nanosecond, + }, + ) err := ing.Run(ctx, 1, -1) // run until canceled require.ErrorIs(t, err, context.Canceled) // this is expected - require.Equal(t, sentBlockNumber, maxBlockNumber) + require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber) }