Skip to content

Commit

Permalink
better configs for retries and log when we have errors on requests (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
msf authored Jun 21, 2024
1 parent 34cc3e7 commit 6166f19
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
15 changes: 13 additions & 2 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

const (
MaxRetries = 20 // try really hard to send the block
MinWaitDur = 100 * time.Millisecond
MaxWaitDur = 5 * time.Second
)

type BlockchainIngester interface {
Expand Down Expand Up @@ -53,8 +55,18 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
httpClient := retryablehttp.NewClient()
httpClient.RetryMax = MaxRetries
httpClient.Logger = log
httpClient.CheckRetry = retryablehttp.DefaultRetryPolicy
checkRetry := func(ctx context.Context, resp *http.Response, err error) (bool, error) {
yes, err2 := retryablehttp.DefaultRetryPolicy(ctx, resp, err)
if yes {
log.Warn("Retrying request", "statusCode", resp.Status, "error", err)
}
return yes, err2
}

httpClient.CheckRetry = checkRetry
httpClient.Backoff = retryablehttp.LinearJitterBackoff
httpClient.RetryWaitMin = MinWaitDur
httpClient.RetryWaitMax = MaxWaitDur
return &client{
log: log.With("module", "duneapi"),
httpClient: httpClient,
Expand Down Expand Up @@ -154,7 +166,6 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
if err != nil {
return err
}

return nil
}

Expand Down
10 changes: 3 additions & 7 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func (i *ingester) ConsumeBlocks(
case blockNumber := <-blockNumbers:
startTime := time.Now()

i.log.Info("Getting block by number", "blockNumber", blockNumber)
block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -154,7 +153,6 @@ func (i *ingester) ConsumeBlocks(
i.log.Info("ConsumeBlocks: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
return ctx.Err()
case blocks <- block:
i.log.Info("Sent block")
}
}
}
Expand All @@ -178,11 +176,9 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
}

blockMap[block.BlockNumber] = block
i.log.Info("Received block", "blockNumber", block.BlockNumber)

// Send this block only if we have sent all previous blocks
for block, ok := blockMap[next]; ok; block, ok = blockMap[next] {
i.log.Info("SendBlocks: Sending block to DuneAPI", "blockNumber", block.BlockNumber)
if err := i.dune.SendBlock(ctx, block); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
Expand All @@ -197,7 +193,6 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
}

Expand Down Expand Up @@ -250,7 +245,8 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
fields = append(fields, "fallingBehind", fallingBehind)
}
if newDistance > 1 {
fields = append(fields, "distanceFromLatest", newDistance)
etaHours := time.Duration(float64(newDistance) / blocksPerSec * float64(time.Second)).Hours()
fields = append(fields, "hoursToCatchUp", fmt.Sprintf("%.1f", etaHours))
}
if rpcErrors > 0 {
fields = append(fields, "rpcErrors", rpcErrors)
Expand All @@ -259,7 +255,7 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
fields = append(fields, "duneErrors", duneErrors)
}

i.log.Info("ProgressReport", fields...)
i.log.Info("PROGRESS REPORT", fields...)
previousIngested = lastIngested
previousDistance = newDistance
previousTime = tNow
Expand Down

0 comments on commit 6166f19

Please sign in to comment.