From 6730e76f140bbc72d7e8832439adef4da55488e0 Mon Sep 17 00:00:00 2001 From: Vegard Stikbakke Date: Tue, 25 Jun 2024 14:31:44 +0200 Subject: [PATCH 1/4] Send batch of blocks from the main loop --- README.md | 5 - client/duneapi/client.go | 15 ++- cmd/main.go | 8 +- config/config.go | 6 +- ingester/ingester.go | 31 +++-- ingester/mainloop.go | 160 +++++++++++++++++++------- ingester/mainloop_test.go | 230 +++++++++++++++++++++++++++++--------- models/block.go | 4 + 8 files changed, 340 insertions(+), 119 deletions(-) diff --git a/README.md b/README.md index 6047ba4..dd69531 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,6 @@ You can use our public docker container image and run it as such: ```bash docker run -e BLOCKCHAIN_NAME='foo' -e RPC_NODE_URL='http://localhost:8545' -e DUNE_API_KEY='your-key-here' duneanalytics/node-indexer - ``` @@ -36,7 +35,6 @@ Build the binary for your OS: $ make build $ BLOCKCHAIN_NAME='foo' RPC_NODE_URL='http://localhost:8545' DUNE_API_KEY='your-key-here' ./indexer - ``` ## Configuration Options @@ -44,7 +42,4 @@ $ BLOCKCHAIN_NAME='foo' RPC_NODE_URL='http://localhost:8545' DUNE_API_KEY='your- You can see all the configuration options by using the `--help` argument: ```bash docker run duneanalytics/node-indexer ./indexer --help - ``` - - diff --git a/client/duneapi/client.go b/client/duneapi/client.go index 035e633..18ca280 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -59,7 +59,11 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line checkRetry := func(ctx context.Context, resp *http.Response, err error) (bool, error) { yes, err2 := retryablehttp.DefaultRetryPolicy(ctx, resp, err) if yes { - log.Warn("Retrying request", "statusCode", resp.Status, "error", err) + if resp == nil { + log.Warn("Retrying request", "error", err) + } else { + log.Warn("Retrying request", "statusCode", resp.Status, "error", err) + } } return yes, err2 } @@ -191,11 +195,6 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques return err } - err = json.NewDecoder(resp.Body).Decode(&response) - if err != nil { - return err - } - return nil } @@ -220,7 +219,7 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch defer func() { if err != nil { c.log.Error("Sending progress report failed", - "lastIngestedBlockNumer", request.LastIngestedBlockNumber, + "lastIngestedBlockNumber", request.LastIngestedBlockNumber, "error", err, "statusCode", responseStatus, "duration", time.Since(start), @@ -228,7 +227,7 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch ) } else { c.log.Info("Sent progress report", - "lastIngestedBlockNumer", request.LastIngestedBlockNumber, + "lastIngestedBlockNumber", request.LastIngestedBlockNumber, "latestBlockNumber", request.LatestBlockNumber, "duration", time.Since(start), ) diff --git a/cmd/main.go b/cmd/main.go index 6a580cd..72070b3 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -65,8 +65,9 @@ func main() { // Get stored progress unless config indicates we should start from 0 var startBlockNumber int64 // Default to -1 to start where the ingester left off + var progress *models.BlockchainIndexProgress if cfg.BlockHeight == -1 { - progress, err := duneClient.GetProgressReport(ctx) + progress, err = duneClient.GetProgressReport(ctx) if err != nil { stdlog.Fatal(err) } else { @@ -82,12 +83,15 @@ func main() { rpcClient, duneClient, ingester.Config{ - MaxBatchSize: cfg.Concurrency, + MaxConcurrentRequests: 
cfg.RPCConcurrency, ReportProgressInterval: cfg.ReportProgressInterval, PollInterval: cfg.PollInterval, Stack: cfg.RPCStack, BlockchainName: cfg.BlockchainName, + BlockSubmitInterval: cfg.BlockSubmitInterval, + SkipFailedBlocks: cfg.SkipFailedBlocks, }, + progress, ) wg.Add(1) diff --git a/config/config.go b/config/config.go index b0f71ce..ada04c5 100644 --- a/config/config.go +++ b/config/config.go @@ -39,8 +39,10 @@ type Config struct { PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll RPCNode RPCClient - RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll - Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll + RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll + RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"10"` // nolint:lll + BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"1s"` // nolint:lll + SkipFailedBlocks bool `long:"skip-failed-blocks" env:"SKIP_FAILED_BLOCKS" description:"Skip failed blocks when submitting to Dune"` // nolint:lll } func (c Config) HasError() error { diff --git a/ingester/ingester.go b/ingester/ingester.go index fbf3061..e9eeea0 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -44,11 +44,13 @@ const ( ) type Config struct { - MaxBatchSize int + MaxConcurrentRequests int PollInterval time.Duration ReportProgressInterval time.Duration Stack models.EVMStack BlockchainName string + BlockSubmitInterval time.Duration + SkipFailedBlocks bool } type Info struct { @@ -60,9 +62,9 @@ type Info struct { } type ErrorInfo struct { - Timestamp time.Time - BlockNumber int64 - Error error + Timestamp time.Time + BlockNumbers string + Error error } type ingester struct { @@ -73,16 +75,27 @@ type ingester struct { info Info } -func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, cfg Config) Ingester { +func New( + log *slog.Logger, + node jsonrpc.BlockchainClient, + dune duneapi.BlockchainIngester, + cfg Config, + progress *models.BlockchainIndexProgress, +) Ingester { + info := Info{ + RPCErrors: []ErrorInfo{}, + DuneErrors: []ErrorInfo{}, + } + if progress != nil { + info.LatestBlockNumber = progress.LatestBlockNumber + info.IngestedBlockNumber = progress.LastIngestedBlockNumber + } ing := &ingester{ log: log.With("module", "ingester"), node: node, dune: dune, cfg: cfg, - info: Info{ - RPCErrors: []ErrorInfo{}, - DuneErrors: []ErrorInfo{}, - }, + info: info, } if ing.cfg.PollInterval == 0 { ing.cfg.PollInterval = defaultPollInterval diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 4f892dd..52ae5c8 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -3,6 +3,7 @@ package ingester import ( "context" "fmt" + "strings" "sync/atomic" "time" @@ -21,15 +22,22 @@ import ( // but buffers them in a map until they can be sent in order. 
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
 	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	errGroup, ctx := errgroup.WithContext(ctx)
 
 	blockNumbers := make(chan int64)
 	defer close(blockNumbers)
-	blocks := make(chan models.RPCBlock)
+
+	// We buffer the block channel so that RPC requests can be made concurrently with sending blocks to Dune.
+	// We limit the buffer size to the same number of concurrent requests, so we exert some backpressure.
+	blocks := make(chan models.RPCBlock, i.cfg.MaxConcurrentRequests)
 	defer close(blocks)
 
-	// Start MaxBatchSize goroutines to consume blocks concurrently
-	for range i.cfg.MaxBatchSize {
+	// Start MaxConcurrentRequests goroutines to consume blocks concurrently
+	if i.cfg.MaxConcurrentRequests <= 0 {
+		return errors.Errorf("MaxConcurrentRequests must be > 0")
+	}
+	for range i.cfg.MaxConcurrentRequests {
 		errGroup.Go(func() error {
 			return i.FetchBlockLoop(ctx, blockNumbers, blocks)
 		})
@@ -44,11 +52,10 @@
 	// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
 	endBlockNumber := startBlockNumber - 1 + maxCount
 	i.log.Info("Starting ingester",
-		"max_batch_size", i.cfg.MaxBatchSize,
-		"run_forever", maxCount <= 0,
-		"start_block_number", startBlockNumber,
-		"end_block_number", endBlockNumber,
-		"batch_size", i.cfg.MaxBatchSize,
+		"runForever", maxCount <= 0,
+		"startBlockNumber", startBlockNumber,
+		"endBlockNumber", endBlockNumber,
+		"maxConcurrency", i.cfg.MaxConcurrentRequests,
 	)
 
 	// Produce block numbers in the main goroutine
@@ -136,25 +143,31 @@ func (i *ingester) FetchBlockLoop(
 				"error", err,
 			)
 			i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
-				Timestamp:   time.Now(),
-				BlockNumber: blockNumber,
-				Error:       err,
+				Timestamp:    time.Now(),
+				BlockNumbers: fmt.Sprintf("%d", blockNumber),
+				Error:        err,
 			})
-			// TODO: should we sleep (backoff) here?
+			if !i.cfg.SkipFailedBlocks {
+				return err
+			}
+			// We need to send an empty block downstream to indicate that this failed
+			blocks <- models.RPCBlock{BlockNumber: blockNumber}
 			continue
 		}
 
 		atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
 		getBlockElapsed := time.Since(startTime)
-		i.log.Info("FetchBlockLoop: Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
-		startTime = time.Now()
 
 		select {
 		case <-ctx.Done():
 			i.log.Info("FetchBlockLoop: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
 			return ctx.Err()
 		case blocks <- block:
-			i.log.Info("FetchBlockLoop: Sent block", "blockNumber", blockNumber, "elapsed", time.Since(startTime))
+			i.log.Info(
+				"FetchBlockLoop: Got and sent block",
+				"blockNumber", blockNumber,
+				"getBlockElapsed", getBlockElapsed,
+			)
 		}
 	}
 }
@@ -162,61 +175,130 @@
 
 // SendBlocks to Dune. We receive blocks from the FetchBlockLoop goroutines, potentially out of order.
 // We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
-func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error { - i.log.Info("SendBlocks: Starting to receive blocks") - blocks := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order +func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock, startBlockNumber int64) error { + // Buffer for temporarily storing blocks that have arrived out of order + collectedBlocks := make(map[int64]models.RPCBlock) nextNumberToSend := startBlockNumber + batchTimer := time.NewTicker(i.cfg.BlockSubmitInterval) + defer batchTimer.Stop() + + i.log.Info("SendBlocks: Starting to receive blocks") for { + // Either receive a block, send blocks, or shut down (if the context is done, or the channel is closed). select { case <-ctx.Done(): i.log.Info("SendBlocks: Context canceled, stopping") return ctx.Err() - case block, ok := <-blocksCh: + case block, ok := <-blocks: if !ok { i.log.Info("SendBlocks: Channel is closed, returning") return nil } - blocks[block.BlockNumber] = block - i.log.Info("SendBlocks: Received block", "blockNumber", block.BlockNumber, "bufferSize", len(blocks)) + if block.Empty() { + // We got an empty block from the RPC client goroutine, either fail or send an empty block downstream + if !i.cfg.SkipFailedBlocks { + i.log.Info("Received empty block, exiting", "number", block.BlockNumber) + return errors.Errorf("empty block received") + } + i.log.Info("Received empty block", "number", block.BlockNumber) + } - nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend) - i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend) + collectedBlocks[block.BlockNumber] = block + i.log.Info( + "SendBlocks: Received block", + "blockNumber", block.BlockNumber, + "bufferSize", len(collectedBlocks), + ) + case <-batchTimer.C: + var err error + nextNumberToSend, err = i.trySendCompletedBlocks(ctx, collectedBlocks, nextNumberToSend) + if err != nil { + return errors.Errorf("send blocks: %w", err) + } + i.log.Info("SendBlocks: Sent completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend) } } } +const maxBatchSize = 100 + // trySendCompletedBlocks sends all blocks that can be sent in order from the blockMap. // Once we have sent all blocks, if any, we return with the nextNumberToSend. // We return the next numberToSend such that the caller can continue from there. 
func (i *ingester) trySendCompletedBlocks( ctx context.Context, - blocks map[int64]models.RPCBlock, + collectedBlocks map[int64]models.RPCBlock, nextNumberToSend int64, -) int64 { - // Send this block only if we have sent all previous blocks - for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] { - if err := i.dune.SendBlocks(ctx, []models.RPCBlock{block}); err != nil { +) (int64, error) { + // Outer loop: We might need to send multiple batch requests if our buffer is too big + for _, ok := collectedBlocks[nextNumberToSend]; ok; _, ok = collectedBlocks[nextNumberToSend] { + // Collect a blocks of blocks to send, only send those which are in order + blocks := make([]models.RPCBlock, 0, len(collectedBlocks)) + for block, ok := collectedBlocks[nextNumberToSend]; ok; block, ok = collectedBlocks[nextNumberToSend] { + // Skip block if it's empty and we're configured to skip empty blocks + if i.cfg.SkipFailedBlocks && block.Empty() { + nextNumberToSend++ + continue + } + + blocks = append(blocks, block) + delete(collectedBlocks, nextNumberToSend) + nextNumberToSend++ + // Don't send more than maxBatchSize blocks + if len(blocks) == maxBatchSize { + break + } + } + + if len(blocks) == 0 { + return nextNumberToSend, nil + } + + // Send the batch + lastBlockNumber := blocks[len(blocks)-1].BlockNumber + if lastBlockNumber != nextNumberToSend-1 { + panic("unexpected last block number") + } + if err := i.dune.SendBlocks(ctx, blocks); err != nil { if errors.Is(err, context.Canceled) { i.log.Info("SendBlocks: Context canceled, stopping") - return nextNumberToSend + return nextNumberToSend, nil + } + if !i.cfg.SkipFailedBlocks { + err := errors.Errorf("failed to send batch: %w", err) + i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err) + return nextNumberToSend, err } // this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap - i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err) + i.log.Error( + "SendBlocks: Failed to send batch, continuing", + "blockNumberFirst", blocks[0].BlockNumber, + "blockNumberLast", blocks[len(blocks)-1].BlockNumber, + "error", err, + ) + blockNumbers := make([]string, len(blocks)) + for i, block := range blocks { + blockNumbers[i] = fmt.Sprintf("%d", block.BlockNumber) + } i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{ - Timestamp: time.Now(), - BlockNumber: block.BlockNumber, - Error: err, + Timestamp: time.Now(), + BlockNumbers: strings.Join(blockNumbers, ","), + Error: err, }) - } else { - i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber) - atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber) + continue } - // We've sent block N, so increment the pointer - delete(blocks, nextNumberToSend) - nextNumberToSend++ + i.log.Info( + "SendBlocks: Sent batch, updating latest ingested block number", + "blockNumberFirst", blocks[0].BlockNumber, + "blockNumberLast", lastBlockNumber, + "batchSize", len(blocks), + ) + + atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber) } - return nextNumberToSend + + return nextNumberToSend, nil } func (i *ingester) tryUpdateLatestBlockNumber() int64 { diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 1bedc47..4fbe583 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -2,6 +2,7 @@ package ingester_test import ( "context" + "fmt" "io" "log/slog" "math/rand" @@ -19,24 +20,34 @@ import ( "golang.org/x/sync/errgroup" ) -func 
TestRunLoopUntilCancel(t *testing.T) { +func TestRunUntilCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) maxBlockNumber := int64(10) sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { - if len(blocks) != 1 { - panic("expected 1 block") + if len(blocks) == 0 { + return nil } - block := blocks[0] - atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) - if block.BlockNumber == maxBlockNumber { - // cancel execution when we send the last block + next := sentBlockNumber + 1 + for _, block := range blocks { + // We cannot send blocks out of order to DuneAPI + if block.BlockNumber != next { + panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber)) + } + next++ + } + + lastBlockNumber := blocks[len(blocks)-1].BlockNumber + atomic.StoreInt64(&sentBlockNumber, lastBlockNumber) + if lastBlockNumber >= maxBlockNumber { + // cancel execution when we have sent the last block cancel() return context.Canceled } + return nil }, PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { @@ -61,21 +72,25 @@ func TestRunLoopUntilCancel(t *testing.T) { // Swap these to see logs // logOutput := os.Stderr logOutput := io.Discard - ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ - MaxBatchSize: 1, - PollInterval: 1000 * time.Millisecond, - }) + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + rpcClient, + duneapi, + ingester.Config{ + BlockSubmitInterval: time.Nanosecond, + MaxConcurrentRequests: 10, + SkipFailedBlocks: false, + }, + nil, // progress + ) err := ing.Run(ctx, 1, -1) // run until canceled require.ErrorIs(t, err, context.Canceled) // this is expected - require.Equal(t, sentBlockNumber, maxBlockNumber) + require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber) } func TestProduceBlockNumbers(t *testing.T) { duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlocksFunc: func(_ context.Context, _ []models.RPCBlock) error { - return nil - }, PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { return nil }, @@ -91,11 +106,18 @@ func TestProduceBlockNumbers(t *testing.T) { return nil }, } + // Swap these to see logs logOutput := io.Discard - ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ - MaxBatchSize: 1, - PollInterval: 1000 * time.Millisecond, - }) + // logOutput := os.Stderr + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + rpcClient, + duneapi, + ingester.Config{ + BlockSubmitInterval: time.Nanosecond, + }, + nil, // progress + ) blockNumbers := make(chan int64) var wg sync.WaitGroup wg.Add(1) @@ -113,45 +135,59 @@ func TestSendBlocks(t *testing.T) { sentBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { - if len(blocks) != 1 { - panic("expected 1 block") + if len(blocks) == 0 { + return nil } - block := blocks[0] - // DuneAPI must fail if it receives blocks out of order - if block.BlockNumber != sentBlockNumber+1 { - return errors.Errorf("blocks out of order") + next := sentBlockNumber + 1 + for _, block := range blocks { + // We cannot send blocks out of order to DuneAPI + if block.BlockNumber != next { + panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber)) + } + next++ } - 
atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) - return nil - }, - PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { + + lastBlockNumber := blocks[len(blocks)-1].BlockNumber + atomic.StoreInt64(&sentBlockNumber, lastBlockNumber) return nil }, } + // Swap these to see logs // logOutput := os.Stderr logOutput := io.Discard - ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), nil, duneapi, ingester.Config{ - MaxBatchSize: 10, // this won't matter as we only run SendBlocks - PollInterval: 1000 * time.Millisecond, - }) + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + nil, // node client isn't used in this unit test + duneapi, + ingester.Config{ + BlockSubmitInterval: time.Nanosecond, + }, + nil, // progress + ) blocks := make(chan models.RPCBlock) startFromBlock := 1 - group, _ := errgroup.WithContext(context.Background()) + group, ctx := errgroup.WithContext(context.Background()) group.Go(func() error { return ing.SendBlocks(context.Background(), blocks, int64(startFromBlock)) }) // Send blocks except the next block, ensure none are sent to the API for _, n := range []int64{2, 3, 4, 5, 10} { - blocks <- models.RPCBlock{BlockNumber: n} + select { + case <-ctx.Done(): // if error group fails, its context is canceled + require.Fail(t, "context was canceled") + case blocks <- models.RPCBlock{BlockNumber: n, Payload: []byte("block")}: + // Sent block + } require.Equal(t, int64(0), sentBlockNumber) } // Now send the first block - blocks <- models.RPCBlock{BlockNumber: 1} + blocks <- models.RPCBlock{BlockNumber: 1, Payload: []byte("block")} + time.Sleep(time.Millisecond) // Allow enough time for the tick before closing the channel close(blocks) require.NoError(t, group.Wait()) @@ -159,29 +195,41 @@ func TestSendBlocks(t *testing.T) { require.Equal(t, int64(5), sentBlockNumber) } -// TestRunLoopBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order +// TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order // even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time. 
-func TestRunLoopBlocksOutOfOrder(t *testing.T) { +func TestRunBlocksOutOfOrder(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) maxBlockNumber := int64(1000) sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { - if len(blocks) != 1 { - panic("expected 1 block") + if len(blocks) == 0 { + return nil } - block := blocks[0] - // Test must fail if DuneAPI receives blocks out of order - require.Equal(t, block.BlockNumber, sentBlockNumber+1) + // Fail if we're not sending a batch of blocks + if len(blocks) == 1 { + panic("expected batch of blocks, got 1") + } + + next := sentBlockNumber + 1 + for _, block := range blocks { + // We cannot send blocks out of order to DuneAPI + if block.BlockNumber != next { + panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber)) + } + next++ + } - atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) - if block.BlockNumber == maxBlockNumber { - // cancel execution when we send the last block + lastBlockNumber := blocks[len(blocks)-1].BlockNumber + atomic.StoreInt64(&sentBlockNumber, lastBlockNumber) + if lastBlockNumber >= maxBlockNumber { + // cancel execution when we have sent the last block cancel() return context.Canceled } + return nil }, PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { @@ -193,10 +241,10 @@ func TestRunLoopBlocksOutOfOrder(t *testing.T) { return maxBlockNumber + 1, nil }, BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { - // Get blocks out of order by sleeping for a random amount of ms - time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) + // Get blocks out of order by sleeping for a random amount of time + time.Sleep(time.Duration(rand.Intn(10)) * time.Nanosecond) atomic.StoreInt64(&producedBlockNumber, blockNumber) - return models.RPCBlock{BlockNumber: blockNumber}, nil + return models.RPCBlock{BlockNumber: blockNumber, Payload: []byte("block")}, nil }, CloseFunc: func() error { return nil @@ -205,12 +253,86 @@ func TestRunLoopBlocksOutOfOrder(t *testing.T) { // Swap these to see logs // logOutput := os.Stderr logOutput := io.Discard - ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ - MaxBatchSize: 10, // fetch blocks in multiple goroutines - PollInterval: 1000 * time.Millisecond, - }) + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + rpcClient, + duneapi, + ingester.Config{ + MaxConcurrentRequests: 10, // fetch blocks in multiple goroutines + // big enough compared to the time spent in block by number to ensure batching. We panic + // in the mocked Dune client if we don't get a batch of blocks (more than one block). 
+			BlockSubmitInterval: 50 * time.Millisecond,
+			SkipFailedBlocks:    false,
+		},
+		nil, // progress
+	)
 
 	err := ing.Run(ctx, 1, -1)                // run until canceled
 	require.ErrorIs(t, err, context.Canceled) // this is expected
-	require.Equal(t, sentBlockNumber, maxBlockNumber)
+	require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber)
+}
+
+// TestRunRPCNodeFails shows that we crash if the RPC client fails to fetch a block
+func TestRunRPCNodeFails(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	maxBlockNumber := int64(1000)
+	producedBlockNumber := int64(0)
+	someRPCError := errors.Errorf("some RPC error")
+	duneapi := &duneapi_mock.BlockchainIngesterMock{
+		SendBlocksFunc: func(_ context.Context, _ []models.RPCBlock) error {
+			return someRPCError
+		},
+		PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
+			return nil
+		},
+	}
+	rpcClient := &jsonrpc_mock.BlockchainClientMock{
+		LatestBlockNumberFunc: func() (int64, error) {
+			return maxBlockNumber + 1, nil
+		},
+		BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
+			// Get blocks out of order by sleeping for a random amount of time
+			time.Sleep(time.Duration(rand.Intn(10)) * time.Nanosecond)
+			atomic.StoreInt64(&producedBlockNumber, blockNumber)
+			return models.RPCBlock{BlockNumber: blockNumber, Payload: []byte("block")}, nil
+		},
+		CloseFunc: func() error {
+			return nil
+		},
+	}
+	// Swap these to see logs
+	// logOutput := os.Stderr
+	logOutput := io.Discard
+	ing := ingester.New(
+		slog.New(slog.NewTextHandler(logOutput, nil)),
+		rpcClient,
+		duneapi,
+		ingester.Config{
+			MaxConcurrentRequests: 10,
+			BlockSubmitInterval:   time.Millisecond,
+			SkipFailedBlocks:      false,
+		},
+		nil, // progress
+	)
+
+	err := ing.Run(ctx, 1, -1) // run until canceled
+	require.ErrorIs(t, err, someRPCError)
+}
+
+// TestRunFailsIfNoConcurrentRequests shows that Run fails fast if MaxConcurrentRequests is not a positive number
+func TestRunFailsIfNoConcurrentRequests(t *testing.T) {
+	logOutput := io.Discard
+	ing := ingester.New(
+		slog.New(slog.NewTextHandler(logOutput, nil)),
+		nil,
+		nil,
+		ingester.Config{
+			MaxConcurrentRequests: 0,
+		},
+		nil, // progress
+	)
+
+	err := ing.Run(context.Background(), 1, -1) // fails immediately on config validation
+	require.ErrorContains(t, err, "MaxConcurrentRequests must be > 0")
 }
diff --git a/models/block.go b/models/block.go
index ffb410c..1435a8d 100644
--- a/models/block.go
+++ b/models/block.go
@@ -5,3 +5,7 @@ type RPCBlock struct {
 	// agnostic blob of data that is the block
 	Payload []byte
 }
+
+func (b RPCBlock) Empty() bool {
+	return len(b.Payload) == 0
+}

From 7ff628aff54ea265a0646f962c6908ff42d93c0f Mon Sep 17 00:00:00 2001
From: Miguel Filipe
Date: Thu, 27 Jun 2024 13:22:42 +0200
Subject: [PATCH 2/4] improve some tests

---
 ingester/mainloop_test.go | 98 ++++++++++++++++++++++++++++++---------
 1 file changed, 77 insertions(+), 21 deletions(-)

diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go
index 4fbe583..4f12b4a 100644
--- a/ingester/mainloop_test.go
+++ b/ingester/mainloop_test.go
@@ -2,7 +2,6 @@ package ingester_test
 
 import (
 	"context"
-	"fmt"
 	"io"
 	"log/slog"
 	"math/rand"
@@ -34,9 +33,7 @@ func TestRunUntilCancel(t *testing.T) {
 			next := sentBlockNumber + 1
 			for _, block := range blocks {
 				// We cannot send blocks out of order to DuneAPI
-				if block.BlockNumber != next {
-					panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber))
-				}
+				require.Equalf(t, next, block.BlockNumber, "expected block %d, got %d", next, block.BlockNumber)
 				next++
 			}
 
@@ -142,9 +139,7 @@ func TestSendBlocks(t *testing.T) {
 			next := sentBlockNumber + 1
 			for _, block := range blocks {
 				// We cannot send blocks out of order to DuneAPI
-				if block.BlockNumber != next {
-					panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber))
-				}
+				require.Equalf(t, next, block.BlockNumber, "expected block %d, got %d", next, block.BlockNumber)
 				next++
 			}
 
@@ -195,9 +190,7 @@ func TestSendBlocks(t *testing.T) {
 	require.Equal(t, int64(5), sentBlockNumber)
 }
 
-// TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
-// even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time.
-func TestRunBlocksOutOfOrder(t *testing.T) {
+func TestRunBlocksUseBatching(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	maxBlockNumber := int64(1000)
 	sentBlockNumber := int64(0)
@@ -209,16 +202,81 @@ func TestRunBlocksOutOfOrder(t *testing.T) {
 		}
 
 		// Fail if we're not sending a batch of blocks
-		if len(blocks) == 1 {
-			panic("expected batch of blocks, got 1")
+		require.Len(t, blocks, 1)
+
+		next := sentBlockNumber + 1
+		for _, block := range blocks {
+			// We cannot send blocks out of order to DuneAPI
+			require.Equalf(t, next, block.BlockNumber, "expected block %d, got %d", next, block.BlockNumber)
+			next++
+		}
+
+		lastBlockNumber := blocks[len(blocks)-1].BlockNumber
+		atomic.StoreInt64(&sentBlockNumber, lastBlockNumber)
+		if lastBlockNumber >= maxBlockNumber {
+			// cancel execution when we have sent the last block
+			cancel()
+			return context.Canceled
+		}
+
+		return nil
+	},
+	PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
+		return nil
+	},
+}
+rpcClient := &jsonrpc_mock.BlockchainClientMock{
+	LatestBlockNumberFunc: func() (int64, error) {
+		return maxBlockNumber + 1, nil
+	},
+	BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
+		// Return blocks immediately (no sleep); batching is driven by BlockSubmitInterval
+		atomic.StoreInt64(&producedBlockNumber, blockNumber)
+		return models.RPCBlock{BlockNumber: blockNumber, Payload: []byte("block")}, nil
+	},
+	CloseFunc: func() error {
+		return nil
+	},
+}
+// Swap these to see logs
+// logOutput := os.Stderr
+logOutput := io.Discard
+ing := ingester.New(
+	slog.New(slog.NewTextHandler(logOutput, nil)),
+	rpcClient,
+	duneapi,
+	ingester.Config{
+		MaxConcurrentRequests: 20, // fetch blocks in multiple goroutines
+		// big enough compared to the time spent in block by number to ensure batching. We panic
+		// in the mocked Dune client if we don't get a batch of blocks (more than one block).
+		BlockSubmitInterval: 50 * time.Millisecond,
+		SkipFailedBlocks:    false,
+	},
+	nil, // progress
+)
+
+err := ing.Run(ctx, 1, -1)                // run until canceled
+require.ErrorIs(t, err, context.Canceled) // this is expected
+require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber)
+}
+
+// TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
+// even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time.
+func TestRunBlocksOutOfOrder(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	maxBlockNumber := int64(1000)
+	sentBlockNumber := int64(0)
+	producedBlockNumber := int64(0)
+	duneapi := &duneapi_mock.BlockchainIngesterMock{
+		SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
+			if len(blocks) == 0 {
+				return nil
 			}
 
 			next := sentBlockNumber + 1
 			for _, block := range blocks {
 				// We cannot send blocks out of order to DuneAPI
-				if block.BlockNumber != next {
-					panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber))
-				}
+				require.Equalf(t, next, block.BlockNumber, "expected block %d, got %d", next, block.BlockNumber)
 				next++
 			}
 
@@ -258,7 +316,7 @@ func TestRunBlocksOutOfOrder(t *testing.T) {
 		rpcClient,
 		duneapi,
 		ingester.Config{
-			MaxConcurrentRequests: 10, // fetch blocks in multiple goroutines
+			MaxConcurrentRequests: 20, // fetch blocks in multiple goroutines
 			// big enough compared to the time spent in block by number to ensure batching. We panic
 			// in the mocked Dune client if we don't get a batch of blocks (more than one block).
 			BlockSubmitInterval: 50 * time.Millisecond,
 			SkipFailedBlocks:    false,
@@ -277,11 +335,10 @@ func TestRunRPCNodeFails(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	maxBlockNumber := int64(1000)
-	producedBlockNumber := int64(0)
 	someRPCError := errors.Errorf("some RPC error")
 	duneapi := &duneapi_mock.BlockchainIngesterMock{
 		SendBlocksFunc: func(_ context.Context, _ []models.RPCBlock) error {
-			return someRPCError
+			return nil
 		},
 		PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
 			return nil
@@ -291,11 +348,10 @@ func TestRunRPCNodeFails(t *testing.T) {
 		LatestBlockNumberFunc: func() (int64, error) {
 			return maxBlockNumber + 1, nil
 		},
-		BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
+		BlockByNumberFunc: func(_ context.Context, _ int64) (models.RPCBlock, error) {
 			// Get blocks out of order by sleeping for a random amount of time
 			time.Sleep(time.Duration(rand.Intn(10)) * time.Nanosecond)
-			atomic.StoreInt64(&producedBlockNumber, blockNumber)
-			return models.RPCBlock{BlockNumber: blockNumber, Payload: []byte("block")}, nil
+			return models.RPCBlock{}, someRPCError
 		},
 		CloseFunc: func() error {
 			return nil

From 98ac08274c462fa846d5048fc1c44f22c3ed5150 Mon Sep 17 00:00:00 2001
From: Miguel Filipe
Date: Thu, 27 Jun 2024 13:42:57 +0200
Subject: [PATCH 3/4] mainloop: don't handle duneapi errors yet, simplify around SkipFailed

---
 ingester/mainloop.go      | 79 ++++++++++++++-------------------------
 ingester/mainloop_test.go |  2 +-
 models/block.go           |  6 ++-
 3 files changed, 33 insertions(+), 54 deletions(-)

diff --git a/ingester/mainloop.go b/ingester/mainloop.go
index 52ae5c8..df3f5cc 100644
--- a/ingester/mainloop.go
+++ b/ingester/mainloop.go
@@ -3,7 +3,6 @@ package ingester
 
 import (
 	"context"
 	"fmt"
-	"strings"
 	"sync/atomic"
 	"time"
 
@@ -138,21 +137,16 @@ func (i *ingester) FetchBlockLoop(
 			return ctx.Err()
 		}
 
-		i.log.Error("Failed to get block by number, continuing..",
+		i.log.Error("Failed to get block by number",
 			"blockNumber", blockNumber,
+			"continuing", i.cfg.SkipFailedBlocks,
+			"elapsed", time.Since(startTime),
 			"error", err,
 		)
-		i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
-			Timestamp:    time.Now(),
-			BlockNumbers: fmt.Sprintf("%d", blockNumber),
-			Error:        err,
-		})
 		if !i.cfg.SkipFailedBlocks {
 			return err
 		}
-		// We need to send an empty block downstream to indicate that this failed
-		blocks <- models.RPCBlock{BlockNumber: blockNumber}
+		blocks <- models.RPCBlock{BlockNumber: blockNumber, Error: err}
 		continue
 	}
 
@@ -195,13 +189,14 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock
 				return nil
 			}
 
-			if block.Empty() {
-				// We got an empty block from the RPC client goroutine, either fail or send an empty block downstream
-				if !i.cfg.SkipFailedBlocks {
-					i.log.Info("Received empty block, exiting", "number", block.BlockNumber)
-					return errors.Errorf("empty block received")
-				}
-				i.log.Info("Received empty block", "number", block.BlockNumber)
+			if block.Errored() {
+				i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
+					Timestamp:    time.Now(),
+					BlockNumbers: fmt.Sprintf("%d", block.BlockNumber),
+					Error:        block.Error,
+				})
+
+				i.log.Info("Received FAILED block", "number", block.BlockNumber)
 			}
 
 			collectedBlocks[block.BlockNumber] = block
@@ -234,67 +229,49 @@ func (i *ingester) trySendCompletedBlocks(
 	// Outer loop: We might need to send multiple batch requests if our buffer is too big
 	for _, ok := collectedBlocks[nextNumberToSend]; ok; _, ok = collectedBlocks[nextNumberToSend] {
-		// Collect a blocks of blocks to send, only send those which are in order
-		blocks := make([]models.RPCBlock, 0, len(collectedBlocks))
+		// Collect a batch to send, only send those which are in order
+		blockBatch := make([]models.RPCBlock, 0, maxBatchSize)
 		for block, ok := collectedBlocks[nextNumberToSend]; ok; block, ok = collectedBlocks[nextNumberToSend] {
-			// Skip block if it's empty and we're configured to skip empty blocks
-			if i.cfg.SkipFailedBlocks && block.Empty() {
+			// Skip failed blocks if we're configured to skip them
+			if i.cfg.SkipFailedBlocks && block.Errored() {
 				nextNumberToSend++
 				continue
 			}
 
-			blocks = append(blocks, block)
+			blockBatch = append(blockBatch, block)
 			delete(collectedBlocks, nextNumberToSend)
 			nextNumberToSend++
-			// Don't send more than maxBatchSize blocks
-			if len(blocks) == maxBatchSize {
+
+			if len(blockBatch) == maxBatchSize {
 				break
 			}
 		}
 
-		if len(blocks) == 0 {
+		if len(blockBatch) == 0 {
 			return nextNumberToSend, nil
 		}
 
 		// Send the batch
-		lastBlockNumber := blocks[len(blocks)-1].BlockNumber
+		lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber
 		if lastBlockNumber != nextNumberToSend-1 {
 			panic("unexpected last block number")
 		}
-		if err := i.dune.SendBlocks(ctx, blocks); err != nil {
+		if err := i.dune.SendBlocks(ctx, blockBatch); err != nil {
 			if errors.Is(err, context.Canceled) {
 				i.log.Info("SendBlocks: Context canceled, stopping")
 				return nextNumberToSend, nil
 			}
-			if !i.cfg.SkipFailedBlocks {
-				err := errors.Errorf("failed to send batch: %w", err)
-				i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
-				return nextNumberToSend, err
-			}
-			// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
-			i.log.Error(
-				"SendBlocks: Failed to send batch, continuing",
-				"blockNumberFirst", blocks[0].BlockNumber,
-				"blockNumberLast", blocks[len(blocks)-1].BlockNumber,
-				"error", err,
-			)
-			blockNumbers := make([]string, len(blocks))
-			for i, block := range blocks {
-				blockNumbers[i] = fmt.Sprintf("%d", block.BlockNumber)
-			}
-			i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
-				Timestamp:    time.Now(),
-				BlockNumbers: strings.Join(blockNumbers, ","),
-				Error:        err,
-			})
-			continue
+			// TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries
+			err := errors.Errorf("failed to send batch: %w", err)
+			i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
"error", err) + return nextNumberToSend, err } i.log.Info( "SendBlocks: Sent batch, updating latest ingested block number", - "blockNumberFirst", blocks[0].BlockNumber, + "blockNumberFirst", blockBatch[0].BlockNumber, "blockNumberLast", lastBlockNumber, - "batchSize", len(blocks), + "batchSize", len(blockBatch), ) - atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber) } diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 4f12b4a..56be95c 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -202,7 +202,7 @@ func TestRunBlocksUseBatching(t *testing.T) { } // Fail if we're not sending a batch of blocks - require.Len(t, blocks, 1) + require.Greater(t, len(blocks), 1) next := sentBlockNumber + 1 for _, block := range blocks { diff --git a/models/block.go b/models/block.go index 1435a8d..fc8d3e6 100644 --- a/models/block.go +++ b/models/block.go @@ -4,8 +4,10 @@ type RPCBlock struct { BlockNumber int64 // agnostic blob of data that is the block Payload []byte + // optional field, if we fail to collect the block data + Error error } -func (b RPCBlock) Empty() bool { - return len(b.Payload) == 0 +func (b RPCBlock) Errored() bool { + return b.Error != nil } From f2249f855035317ca60622a2c630e57bb90edea9 Mon Sep 17 00:00:00 2001 From: Miguel Filipe Date: Thu, 27 Jun 2024 13:46:25 +0200 Subject: [PATCH 4/4] allow for maxBatchSize blocks in flight --- ingester/mainloop.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ingester/mainloop.go b/ingester/mainloop.go index df3f5cc..0de32cf 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -11,6 +11,8 @@ import ( "golang.org/x/sync/errgroup" ) +const maxBatchSize = 100 + // Run fetches blocks from a node RPC and sends them in order to the Dune API. // // ProduceBlockNumbers (blockNumbers channel) -> FetchBlockLoop (blocks channel) -> SendBlocks -> Dune @@ -29,7 +31,7 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int // We buffer the block channel so that RPC requests can be made concurrently with sending blocks to Dune. // We limit the buffer size to the same number of concurrent requests, so we exert some backpressure. - blocks := make(chan models.RPCBlock, i.cfg.MaxConcurrentRequests) + blocks := make(chan models.RPCBlock, maxBatchSize) defer close(blocks) // Start MaxBatchSize goroutines to consume blocks concurrently @@ -216,8 +218,6 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock } } -const maxBatchSize = 100 - // trySendCompletedBlocks sends all blocks that can be sent in order from the blockMap. // Once we have sent all blocks, if any, we return with the nextNumberToSend. // We return the next numberToSend such that the caller can continue from there.