Skip to content

Commit

Permalink
Send batch of blocks from the main loop
Browse files Browse the repository at this point in the history
  • Loading branch information
vegarsti committed Jun 27, 2024
1 parent a448231 commit db35c5d
Show file tree
Hide file tree
Showing 8 changed files with 337 additions and 119 deletions.
5 changes: 0 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ You can use our public docker container image and run it as such:

```bash
docker run -e BLOCKCHAIN_NAME='foo' -e RPC_NODE_URL='http://localhost:8545' -e DUNE_API_KEY='your-key-here' duneanalytics/node-indexer

```


Expand All @@ -36,15 +35,11 @@ Build the binary for your OS:
$ make build

$ BLOCKCHAIN_NAME='foo' RPC_NODE_URL='http://localhost:8545' DUNE_API_KEY='your-key-here' ./indexer

```

## Configuration Options

You can see all the configuration options by using the `--help` argument:
```bash
docker run duneanalytics/node-indexer ./indexer --help

```


15 changes: 7 additions & 8 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
checkRetry := func(ctx context.Context, resp *http.Response, err error) (bool, error) {
yes, err2 := retryablehttp.DefaultRetryPolicy(ctx, resp, err)
if yes {
log.Warn("Retrying request", "statusCode", resp.Status, "error", err)
if resp == nil {
log.Warn("Retrying request", "error", err)
} else {
log.Warn("Retrying request", "statusCode", resp.Status, "error", err)
}
}
return yes, err2
}
Expand Down Expand Up @@ -191,11 +195,6 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
return err
}

err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return err
}

return nil
}

Expand All @@ -220,15 +219,15 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
defer func() {
if err != nil {
c.log.Error("Sending progress report failed",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"lastIngestedBlockNumber", request.LastIngestedBlockNumber,
"error", err,
"statusCode", responseStatus,
"duration", time.Since(start),
"responseBody", responseBody,
)
} else {
c.log.Info("Sent progress report",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"lastIngestedBlockNumber", request.LastIngestedBlockNumber,
"latestBlockNumber", request.LatestBlockNumber,
"duration", time.Since(start),
)
Expand Down
8 changes: 6 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ func main() {
// Get stored progress unless config indicates we should start from 0
var startBlockNumber int64
// Default to -1 to start where the ingester left off
var progress *models.BlockchainIndexProgress
if cfg.BlockHeight == -1 {
progress, err := duneClient.GetProgressReport(ctx)
progress, err = duneClient.GetProgressReport(ctx)
if err != nil {
stdlog.Fatal(err)
} else {
Expand All @@ -82,12 +83,15 @@ func main() {
rpcClient,
duneClient,
ingester.Config{
MaxBatchSize: cfg.Concurrency,
MaxConcurrentRequests: cfg.RPCConcurrency,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BlockSubmitInterval: cfg.BlockSubmitInterval,
SkipFailedBlocks: cfg.SkipFailedBlocks,
},
progress,
)

wg.Add(1)
Expand Down
6 changes: 4 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ type Config struct {
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"10"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"1s"` // nolint:lll
SkipFailedBlocks bool `long:"skip-failed-blocks" env:"SKIP_FAILED_BLOCKS" description:"Skip failed blocks when submitting to Dune"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
31 changes: 22 additions & 9 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ const (
)

type Config struct {
MaxBatchSize int
MaxConcurrentRequests int
PollInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
}

type Info struct {
Expand All @@ -60,9 +62,9 @@ type Info struct {
}

type ErrorInfo struct {
Timestamp time.Time
BlockNumber int64
Error error
Timestamp time.Time
BlockNumbers string
Error error
}

type ingester struct {
Expand All @@ -73,16 +75,27 @@ type ingester struct {
info Info
}

func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, cfg Config) Ingester {
func New(
log *slog.Logger,
node jsonrpc.BlockchainClient,
dune duneapi.BlockchainIngester,
cfg Config,
progress *models.BlockchainIndexProgress,
) Ingester {
info := Info{
RPCErrors: []ErrorInfo{},
DuneErrors: []ErrorInfo{},
}
if progress != nil {
info.LatestBlockNumber = progress.LatestBlockNumber
info.IngestedBlockNumber = progress.LastIngestedBlockNumber
}
ing := &ingester{
log: log.With("module", "ingester"),
node: node,
dune: dune,
cfg: cfg,
info: Info{
RPCErrors: []ErrorInfo{},
DuneErrors: []ErrorInfo{},
},
info: info,
}
if ing.cfg.PollInterval == 0 {
ing.cfg.PollInterval = defaultPollInterval
Expand Down
Loading

0 comments on commit db35c5d

Please sign in to comment.