-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Send batch of blocks from the main loop #37
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,8 @@ import ( | |
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
const maxBatchSize = 100 | ||
|
||
// Run fetches blocks from a node RPC and sends them in order to the Dune API. | ||
// | ||
// ProduceBlockNumbers (blockNumbers channel) -> FetchBlockLoop (blocks channel) -> SendBlocks -> Dune | ||
|
@@ -21,15 +23,22 @@ import ( | |
// but buffers them in a map until they can be sent in order. | ||
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error { | ||
ctx, cancel := context.WithCancel(ctx) | ||
defer cancel() | ||
errGroup, ctx := errgroup.WithContext(ctx) | ||
|
||
blockNumbers := make(chan int64) | ||
defer close(blockNumbers) | ||
blocks := make(chan models.RPCBlock) | ||
|
||
// We buffer the block channel so that RPC requests can be made concurrently with sending blocks to Dune. | ||
// We limit the buffer size to the same number of concurrent requests, so we exert some backpressure. | ||
blocks := make(chan models.RPCBlock, maxBatchSize) | ||
defer close(blocks) | ||
|
||
// Start MaxBatchSize goroutines to consume blocks concurrently | ||
for range i.cfg.MaxBatchSize { | ||
if i.cfg.MaxConcurrentRequests <= 0 { | ||
return errors.Errorf("MaxConcurrentRequests must be > 0") | ||
} | ||
for range i.cfg.MaxConcurrentRequests { | ||
errGroup.Go(func() error { | ||
return i.FetchBlockLoop(ctx, blockNumbers, blocks) | ||
}) | ||
|
@@ -44,11 +53,10 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int | |
// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever | ||
endBlockNumber := startBlockNumber - 1 + maxCount | ||
i.log.Info("Starting ingester", | ||
"max_batch_size", i.cfg.MaxBatchSize, | ||
"run_forever", maxCount <= 0, | ||
"start_block_number", startBlockNumber, | ||
"end_block_number", endBlockNumber, | ||
"batch_size", i.cfg.MaxBatchSize, | ||
"runForever", maxCount <= 0, | ||
"startBlockNumber", startBlockNumber, | ||
"endBlockNumber", endBlockNumber, | ||
"maxConcurrency", i.cfg.MaxConcurrentRequests, | ||
) | ||
|
||
// Produce block numbers in the main goroutine | ||
|
@@ -131,57 +139,81 @@ func (i *ingester) FetchBlockLoop( | |
return ctx.Err() | ||
} | ||
|
||
i.log.Error("Failed to get block by number, continuing..", | ||
i.log.Error("Failed to get block by number", | ||
"blockNumber", blockNumber, | ||
"continueing", i.cfg.SkipFailedBlocks, | ||
"elapsed", time.Since(startTime), | ||
"error", err, | ||
) | ||
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{ | ||
Timestamp: time.Now(), | ||
BlockNumber: blockNumber, | ||
Error: err, | ||
}) | ||
|
||
// TODO: should we sleep (backoff) here? | ||
if !i.cfg.SkipFailedBlocks { | ||
return err | ||
} | ||
blocks <- models.RPCBlock{BlockNumber: blockNumber, Error: err} | ||
continue | ||
} | ||
|
||
atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber) | ||
getBlockElapsed := time.Since(startTime) | ||
i.log.Info("FetchBlockLoop: Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed) | ||
startTime = time.Now() | ||
select { | ||
case <-ctx.Done(): | ||
i.log.Info("FetchBlockLoop: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber) | ||
return ctx.Err() | ||
case blocks <- block: | ||
i.log.Info("FetchBlockLoop: Sent block", "blockNumber", blockNumber, "elapsed", time.Since(startTime)) | ||
i.log.Info( | ||
"FetchBlockLoop: Got and sent block", | ||
"blockNumber", blockNumber, | ||
"getBlockElapsed", getBlockElapsed, | ||
) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// SendBlocks to Dune. We receive blocks from the FetchBlockLoop goroutines, potentially out of order. | ||
// We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune. | ||
func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error { | ||
i.log.Info("SendBlocks: Starting to receive blocks") | ||
blocks := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order | ||
func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock, startBlockNumber int64) error { | ||
// Buffer for temporarily storing blocks that have arrived out of order | ||
collectedBlocks := make(map[int64]models.RPCBlock) | ||
nextNumberToSend := startBlockNumber | ||
batchTimer := time.NewTicker(i.cfg.BlockSubmitInterval) | ||
defer batchTimer.Stop() | ||
|
||
i.log.Info("SendBlocks: Starting to receive blocks") | ||
for { | ||
// Either receive a block, send blocks, or shut down (if the context is done, or the channel is closed). | ||
select { | ||
case <-ctx.Done(): | ||
i.log.Info("SendBlocks: Context canceled, stopping") | ||
return ctx.Err() | ||
case block, ok := <-blocksCh: | ||
case block, ok := <-blocks: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. liked the old name more :) |
||
if !ok { | ||
i.log.Info("SendBlocks: Channel is closed, returning") | ||
return nil | ||
} | ||
|
||
blocks[block.BlockNumber] = block | ||
i.log.Info("SendBlocks: Received block", "blockNumber", block.BlockNumber, "bufferSize", len(blocks)) | ||
if block.Errored() { | ||
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{ | ||
Timestamp: time.Now(), | ||
BlockNumbers: fmt.Sprintf("%d", block.BlockNumber), | ||
Error: block.Error, | ||
}) | ||
|
||
i.log.Info("Received FAILED block", "number", block.BlockNumber) | ||
} | ||
|
||
nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend) | ||
i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend) | ||
collectedBlocks[block.BlockNumber] = block | ||
i.log.Info( | ||
"SendBlocks: Received block", | ||
"blockNumber", block.BlockNumber, | ||
"bufferSize", len(collectedBlocks), | ||
) | ||
case <-batchTimer.C: | ||
var err error | ||
nextNumberToSend, err = i.trySendCompletedBlocks(ctx, collectedBlocks, nextNumberToSend) | ||
if err != nil { | ||
return errors.Errorf("send blocks: %w", err) | ||
} | ||
i.log.Info("SendBlocks: Sent completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend) | ||
} | ||
} | ||
} | ||
|
@@ -191,32 +223,59 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo | |
// We return the next numberToSend such that the caller can continue from there. | ||
func (i *ingester) trySendCompletedBlocks( | ||
ctx context.Context, | ||
blocks map[int64]models.RPCBlock, | ||
collectedBlocks map[int64]models.RPCBlock, | ||
nextNumberToSend int64, | ||
) int64 { | ||
// Send this block only if we have sent all previous blocks | ||
for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] { | ||
if err := i.dune.SendBlocks(ctx, []models.RPCBlock{block}); err != nil { | ||
) (int64, error) { | ||
// Outer loop: We might need to send multiple batch requests if our buffer is too big | ||
for _, ok := collectedBlocks[nextNumberToSend]; ok; _, ok = collectedBlocks[nextNumberToSend] { | ||
// Collect a blocks of blocks to send, only send those which are in order | ||
// Collect a batch to send, only send those which are in order | ||
blockBatch := make([]models.RPCBlock, 0, maxBatchSize) | ||
for block, ok := collectedBlocks[nextNumberToSend]; ok; block, ok = collectedBlocks[nextNumberToSend] { | ||
// Skip Failed block if we're configured to skip Failed blocks | ||
if i.cfg.SkipFailedBlocks && block.Errored() { | ||
nextNumberToSend++ | ||
continue | ||
} | ||
|
||
blockBatch = append(blockBatch, block) | ||
delete(collectedBlocks, nextNumberToSend) | ||
nextNumberToSend++ | ||
|
||
if len(blockBatch) == maxBatchSize { | ||
break | ||
} | ||
} | ||
|
||
if len(blockBatch) == 0 { | ||
return nextNumberToSend, nil | ||
} | ||
|
||
// Send the batch | ||
lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber | ||
if lastBlockNumber != nextNumberToSend-1 { | ||
panic("unexpected last block number") | ||
} | ||
if err := i.dune.SendBlocks(ctx, blockBatch); err != nil { | ||
if errors.Is(err, context.Canceled) { | ||
i.log.Info("SendBlocks: Context canceled, stopping") | ||
return nextNumberToSend | ||
return nextNumberToSend, nil | ||
} | ||
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap | ||
i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err) | ||
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{ | ||
Timestamp: time.Now(), | ||
BlockNumber: block.BlockNumber, | ||
Error: err, | ||
}) | ||
} else { | ||
i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber) | ||
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber) | ||
// TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries | ||
err := errors.Errorf("failed to send batch: %w", err) | ||
i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err) | ||
return nextNumberToSend, err | ||
} | ||
// We've sent block N, so increment the pointer | ||
delete(blocks, nextNumberToSend) | ||
nextNumberToSend++ | ||
i.log.Info( | ||
"SendBlocks: Sent batch, updating latest ingested block number", | ||
"blockNumberFirst", blockBatch[0].BlockNumber, | ||
"blockNumberLast", lastBlockNumber, | ||
"batchSize", len(blockBatch), | ||
) | ||
atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber) | ||
} | ||
return nextNumberToSend | ||
|
||
return nextNumberToSend, nil | ||
} | ||
|
||
func (i *ingester) tryUpdateLatestBlockNumber() int64 { | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw a panic once on this, so safe guarding against that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#38