Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some improvements in mainloop.go #34

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type Config struct {
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
7 changes: 2 additions & 5 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ type Ingester interface {
// it will run continuously until the context is cancelled
ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber int64, endBlockNumber int64) error

// ConsumeBlocks fetches blocks sent on the channel and sends them on the other channel.
// FetchBlockLoop fetches blocks sent on the channel and sends them on the other channel.
// It will run continuously until the context is cancelled, or the channel is closed.
// It can safely be run concurrently.
ConsumeBlocks(context.Context, chan int64, chan models.RPCBlock) error
FetchBlockLoop(context.Context, chan int64, chan models.RPCBlock) error

// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
Expand Down Expand Up @@ -84,9 +84,6 @@ func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.Blockchai
DuneErrors: []ErrorInfo{},
},
}
if ing.cfg.MaxBatchSize == 0 {
ing.cfg.MaxBatchSize = defaultMaxBatchSize
}
if ing.cfg.PollInterval == 0 {
ing.cfg.PollInterval = defaultPollInterval
}
Expand Down
91 changes: 53 additions & 38 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (

// Run fetches blocks from a node RPC and sends them in order to the Dune API.
//
// ProduceBlockNumbers (blockNumbers channel) -> ConsumeBlocks (blocks channel) -> SendBlocks -> Dune
// ProduceBlockNumbers (blockNumbers channel) -> FetchBlockLoop (blocks channel) -> SendBlocks -> Dune
//
// We produce block numbers to fetch on an unbuffered channel (ProduceBlockNumbers),
// and each concurrent ConsumeBlock goroutine gets a block number from that channel.
// and each concurrent FetchBlockLoop goroutine gets a block number from that channel.
// The SendBlocks goroutine receives all blocks on an unbuffered channel,
// but buffers them in a map until they can be sent in order.
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
Expand All @@ -31,7 +31,7 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
// Start MaxBatchSize goroutines to consume blocks concurrently
for range i.cfg.MaxBatchSize {
errGroup.Go(func() error {
return i.ConsumeBlocks(ctx, blockNumbers, blocks)
return i.FetchBlockLoop(ctx, blockNumbers, blocks)
})
}
errGroup.Go(func() error {
Expand Down Expand Up @@ -60,9 +60,9 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
return errGroup.Wait()
}

var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")
var ErrFinishedFetchBlockLoop = errors.New("finished FetchBlockLoop")

// ProduceBlockNumbers to be consumed by multiple goroutines running ConsumeBlocks
// ProduceBlockNumbers to be consumed by multiple goroutines running FetchBlockLoop
func (i *ingester) ProduceBlockNumbers(
ctx context.Context, blockNumbers chan int64, startBlockNumber int64, endBlockNumber int64,
) error {
Expand Down Expand Up @@ -109,25 +109,25 @@ func (i *ingester) ProduceBlockNumbers(
}
}
i.log.Info("Finished producing block numbers")
return ErrFinishedConsumeBlocks
return ErrFinishedFetchBlockLoop
}

// ConsumeBlocks from the RPC node. This can be run in multiple goroutines to parallelize block fetching.
func (i *ingester) ConsumeBlocks(
// FetchBlockLoop from the RPC node. This can be run in multiple goroutines to parallelize block fetching.
func (i *ingester) FetchBlockLoop(
ctx context.Context, blockNumbers chan int64, blocks chan models.RPCBlock,
) error {
for {
select {
case <-ctx.Done():
i.log.Info("ConsumeBlocks: context is done")
i.log.Info("FetchBlockLoop: context is done")
return ctx.Err()
case blockNumber := <-blockNumbers:
startTime := time.Now()

block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("ConsumeBlocks: Context canceled, stopping")
i.log.Info("FetchBlockLoop: Context canceled, stopping")
return ctx.Err()
}

Expand All @@ -147,23 +147,25 @@ func (i *ingester) ConsumeBlocks(

atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)
i.log.Info("Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
i.log.Info("FetchBlockLoop: Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
startTime = time.Now()
select {
case <-ctx.Done():
i.log.Info("ConsumeBlocks: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
i.log.Info("FetchBlockLoop: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
return ctx.Err()
case blocks <- block:
i.log.Info("FetchBlockLoop: Sent block", "blockNumber", blockNumber, "elapsed", time.Since(startTime))
}
}
}
}

// SendBlocks to Dune. We receive blocks from the ConsumeBlocks goroutines, potentially out of order.
// SendBlocks to Dune. We receive blocks from the FetchBlockLoop goroutines, potentially out of order.
// We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error {
i.log.Info("SendBlocks: Starting to receive blocks")
blockMap := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
next := startBlockNumber
blocks := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
nextNumberToSend := startBlockNumber
for {
select {
case <-ctx.Done():
Expand All @@ -175,33 +177,46 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
return nil
}

blockMap[block.BlockNumber] = block

// Send this block only if we have sent all previous blocks
for block, ok := blockMap[next]; ok; block, ok = blockMap[next] {
if err := i.dune.SendBlock(ctx, block); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return ctx.Err()
}
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: block.BlockNumber,
Error: err,
})
} else {
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
}
blocks[block.BlockNumber] = block
i.log.Info("SendBlocks: Received block", "blockNumber", block.BlockNumber, "bufferSize", len(blocks))

nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend)
i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
}
}
}

// We've sent block N, so increment the pointer
delete(blockMap, next)
next++
// trySendCompletedBlocks sends all blocks that can be sent in order from the blockMap.
// Once we have sent all blocks, if any, we return with the nextNumberToSend.
// We return the next numberToSend such that the caller can continue from there.
func (i *ingester) trySendCompletedBlocks(
ctx context.Context,
blocks map[int64]models.RPCBlock,
nextNumberToSend int64,
) int64 {
// Send this block only if we have sent all previous blocks
for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] {
if err := i.dune.SendBlock(ctx, block); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextNumberToSend
}
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: block.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
}
// We've sent block N, so increment the pointer
delete(blocks, nextNumberToSend)
nextNumberToSend++
}
return nextNumberToSend
}

func (i *ingester) tryUpdateLatestBlockNumber() int64 {
Expand Down
10 changes: 4 additions & 6 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,17 @@ func TestSendBlocks(t *testing.T) {
require.Equal(t, int64(5), sentBlockNumber)
}

// TestRunLoopUntilBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
// TestRunLoopBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
// even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time.
func TestRunLoopUntilBlocksOutOfOrder(t *testing.T) {
func TestRunLoopBlocksOutOfOrder(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(1000)
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
// DuneAPI must fail if it receives blocks out of order
if block.BlockNumber != sentBlockNumber+1 {
return errors.Errorf("blocks out of order")
}
// Test must fail if DuneAPI receives blocks out of order
require.Equal(t, block.BlockNumber, sentBlockNumber+1)

atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
if block.BlockNumber == maxBlockNumber {
Expand Down
Loading