Skip to content

Commit

Permalink
Use a constant amount of memory for error slices and add error abstra…
Browse files Browse the repository at this point in the history
…ction (#57)

This PR changes the Info struct in the Ingester a bit so that we have an
inner Errors struct for keeping track of errors. We add a method for
resetting at, for observing errors, and for transforming to progress
reports. We make sure that the error slices do not grow unbounded, but
we keep track of the total error counts as well.
  • Loading branch information
vegarsti authored Jul 3, 2024
1 parent e247c53 commit 6d60696
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 69 deletions.
3 changes: 3 additions & 0 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
LastIngestedBlockNumber: progress.LastIngestedBlockNumber,
LatestBlockNumber: progress.LatestBlockNumber,
Errors: errors,
DuneErrorCounts: progress.DuneErrorCounts,
RPCErrorCounts: progress.RPCErrorCounts,
Since: progress.Since,
}
url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
payload, err := json.Marshal(request)
Expand Down
5 changes: 4 additions & 1 deletion client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ func (p *GetBlockchainProgressResponse) String() string {
type PostBlockchainProgressRequest struct {
LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"`
LatestBlockNumber int64 `json:"latest_block_number,omitempty"`
Errors []BlockchainError `json:"errors,omitempty"`
Errors []BlockchainError `json:"errors"`
DuneErrorCounts int `json:"dune_error_counts"`
RPCErrorCounts int `json:"rpc_error_counts"`
Since time.Time `json:"since"`
}

type BlockchainError struct {
Expand Down
41 changes: 1 addition & 40 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,42 +52,6 @@ type Config struct {
SkipFailedBlocks bool
}

type Info struct {
LatestBlockNumber int64
IngestedBlockNumber int64
ConsumedBlockNumber int64
RPCErrors []ErrorInfo
DuneErrors []ErrorInfo
}

// Errors returns a combined list of errors from RPC requests and Dune requests, for use in progress reporting
func (info Info) Errors() []models.BlockchainIndexError {
errors := make([]models.BlockchainIndexError, 0, len(info.RPCErrors)+len(info.DuneErrors))
for _, e := range info.RPCErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "rpc",
})
}
for _, e := range info.DuneErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "dune",
})
}
return errors
}

type ErrorInfo struct {
Timestamp time.Time
BlockNumbers string
Error error
}

type ingester struct {
log *slog.Logger
node jsonrpc.BlockchainClient
Expand All @@ -103,10 +67,7 @@ func New(
cfg Config,
progress *models.BlockchainIndexProgress,
) Ingester {
info := Info{
RPCErrors: []ErrorInfo{},
DuneErrors: []ErrorInfo{},
}
info := NewInfo(cfg.BlockchainName, cfg.Stack.String())
if progress != nil {
info.LatestBlockNumber = progress.LatestBlockNumber
info.IngestedBlockNumber = progress.LastIngestedBlockNumber
Expand Down
31 changes: 7 additions & 24 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds()
newDistance := latest - lastIngested

rpcErrors := len(i.info.RPCErrors)
duneErrors := len(i.info.DuneErrors)
fields := []interface{}{
"blocksPerSec", fmt.Sprintf("%.2f", blocksPerSec),
"latestBlockNumber", latest,
Expand All @@ -203,31 +201,23 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
}
previousHoursToCatchUp = etaHours
}
if rpcErrors > 0 {
fields = append(fields, "rpcErrors", rpcErrors)
if i.info.Errors.RPCErrorCount > 0 {
fields = append(fields, "rpcErrors", i.info.Errors.RPCErrorCount)
}
if duneErrors > 0 {
fields = append(fields, "duneErrors", duneErrors)
if i.info.Errors.DuneErrorCount > 0 {
fields = append(fields, "duneErrors", i.info.Errors.DuneErrorCount)
}

i.log.Info("PROGRESS REPORT", fields...)
previousIngested = lastIngested
previousTime = tNow

err := i.dune.PostProgressReport(ctx, models.BlockchainIndexProgress{
BlockchainName: i.cfg.BlockchainName,
EVMStack: i.cfg.Stack.String(),
LastIngestedBlockNumber: lastIngested,
LatestBlockNumber: latest,
Errors: i.info.Errors(),
})
err := i.dune.PostProgressReport(ctx, i.info.ToProgressReport())
if err != nil {
i.log.Error("Failed to post progress report", "error", err)
} else {
i.log.Debug("Posted progress report")
// Reset errors after reporting
i.info.RPCErrors = []ErrorInfo{}
i.info.DuneErrors = []ErrorInfo{}
i.info.ResetErrors()
}
}
}
Expand All @@ -238,14 +228,7 @@ func (i *ingester) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
i.log.Info("Sending final progress report")
err := i.dune.PostProgressReport(
ctx,
models.BlockchainIndexProgress{
BlockchainName: i.cfg.BlockchainName,
EVMStack: i.cfg.Stack.String(),
LastIngestedBlockNumber: i.info.IngestedBlockNumber,
LatestBlockNumber: i.info.LatestBlockNumber,
})
err := i.dune.PostProgressReport(ctx, i.info.ToProgressReport())
i.log.Info("Closing node")
if err != nil {
_ = i.node.Close()
Expand Down
117 changes: 117 additions & 0 deletions ingester/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package ingester

import (
"time"

"github.com/duneanalytics/blockchain-ingester/models"
)

type Info struct {
BlockchainName string
Stack string
LatestBlockNumber int64
IngestedBlockNumber int64
ConsumedBlockNumber int64
Errors ErrorState
Since time.Time
}

func NewInfo(blockchain string, stack string) Info {
return Info{
BlockchainName: blockchain,
Stack: stack,
Errors: ErrorState{
RPCErrors: make([]ErrorInfo, 0, 100),
DuneErrors: make([]ErrorInfo, 0, 100),
RPCErrorCount: 0,
DuneErrorCount: 0,
},
Since: time.Now(),
}
}

func (info *Info) ToProgressReport() models.BlockchainIndexProgress {
return models.BlockchainIndexProgress{
BlockchainName: info.BlockchainName,
EVMStack: info.Stack,
LastIngestedBlockNumber: info.IngestedBlockNumber,
LatestBlockNumber: info.LatestBlockNumber,
Errors: info.ProgressReportErrors(),
DuneErrorCounts: info.Errors.DuneErrorCount,
RPCErrorCounts: info.Errors.RPCErrorCount,
Since: info.Since,
}
}

func (info *Info) ResetErrors() {
info.Since = time.Now()
info.Errors.Reset()
}

type ErrorState struct {
RPCErrors []ErrorInfo
DuneErrors []ErrorInfo
RPCErrorCount int
DuneErrorCount int
}

// ProgressReportErrors returns a combined list of errors from RPC requests and Dune requests
func (info Info) ProgressReportErrors() []models.BlockchainIndexError {
errors := make([]models.BlockchainIndexError, 0, len(info.Errors.RPCErrors)+len(info.Errors.DuneErrors))
for _, e := range info.Errors.RPCErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "rpc",
})
}
for _, e := range info.Errors.DuneErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "dune",
})
}
return errors
}

func (es *ErrorState) Reset() {
es.RPCErrors = es.RPCErrors[:0]
es.DuneErrors = es.DuneErrors[:0]
es.RPCErrorCount = 0
es.DuneErrorCount = 0
}

func (es *ErrorState) ObserveRPCError(err ErrorInfo) {
es.RPCErrorCount++
err.Timestamp = time.Now()

// If we have filled the slice, remove the oldest error
if len(es.RPCErrors) == cap(es.RPCErrors) {
tmp := make([]ErrorInfo, len(es.RPCErrors)-1, cap(es.RPCErrors))
copy(tmp, es.RPCErrors[1:])
es.RPCErrors = tmp
}
es.RPCErrors = append(es.RPCErrors, err)
}

func (es *ErrorState) ObserveDuneError(err ErrorInfo) {
es.DuneErrorCount++
err.Timestamp = time.Now()

// If we have filled the slice, remove the oldest error
if len(es.DuneErrors) == cap(es.DuneErrors) {
tmp := make([]ErrorInfo, len(es.DuneErrors)-1, cap(es.DuneErrors))
copy(tmp, es.DuneErrors[1:])
es.DuneErrors = tmp
}
es.DuneErrors = append(es.DuneErrors, err)
}

type ErrorInfo struct {
Timestamp time.Time
BlockNumbers string
Error error
}
52 changes: 52 additions & 0 deletions ingester/models_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package ingester_test

import (
"testing"

"github.com/duneanalytics/blockchain-ingester/ingester"
"github.com/go-errors/errors"
"github.com/stretchr/testify/require"
)

// TestInfoErrors ensures that we never allow the error slices to grow indefinitely
func TestInfoErrors(t *testing.T) {
info := ingester.NewInfo("test", "test")
for j := 0; j < 2; j++ {
for i := 0; i < 200; i++ {
require.Len(t, info.Errors.RPCErrors, min(i, 100))
require.Len(t, info.Errors.DuneErrors, min(i, 100))
info.Errors.ObserveDuneError(ingester.ErrorInfo{})
info.Errors.ObserveRPCError(ingester.ErrorInfo{})
require.Equal(t, 100, cap(info.Errors.RPCErrors))
require.Equal(t, 100, cap(info.Errors.DuneErrors))
}
info.ResetErrors()
require.Len(t, info.Errors.RPCErrors, 0)
require.Len(t, info.Errors.DuneErrors, 0)
require.Equal(t, 100, cap(info.Errors.RPCErrors))
require.Equal(t, 100, cap(info.Errors.DuneErrors))
}
}

func TestProgressReportErrors(t *testing.T) {
info := ingester.NewInfo("test", "test")
info.Errors.ObserveDuneError(ingester.ErrorInfo{Error: errors.New("foo")})
info.Errors.ObserveRPCError(ingester.ErrorInfo{Error: errors.New("bar")})
errors := info.ProgressReportErrors()
require.Len(t, errors, 2)
}

func TestInfoToProgressReport(t *testing.T) {
info := ingester.NewInfo("test", "test")
info.IngestedBlockNumber = 1
info.LatestBlockNumber = 2
info.Errors.ObserveDuneError(ingester.ErrorInfo{Error: errors.New("foo")})
report := info.ToProgressReport()
require.Equal(t, "test", report.BlockchainName)
require.Equal(t, "test", report.EVMStack)
require.Equal(t, int64(1), report.LastIngestedBlockNumber)
require.Equal(t, int64(2), report.LatestBlockNumber)
require.Len(t, report.Errors, 1)
require.Equal(t, 1, report.DuneErrorCounts)
require.Equal(t, 0, report.RPCErrorCounts)
}
7 changes: 3 additions & 4 deletions ingester/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock
}

if block.Errored() {
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
i.info.Errors.ObserveRPCError(ErrorInfo{
BlockNumbers: fmt.Sprintf("%d", block.BlockNumber),
Error: block.Error,
})
Expand Down Expand Up @@ -125,8 +124,8 @@ func (i *ingester) trySendBlockBatch(
for i, block := range blockBatch {
blocknumbers[i] = fmt.Sprintf("%d", block.BlockNumber)
}
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),

i.info.Errors.ObserveDuneError(ErrorInfo{
Error: err,
BlockNumbers: strings.Join(blocknumbers, ","),
})
Expand Down
3 changes: 3 additions & 0 deletions models/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ type BlockchainIndexProgress struct {
LastIngestedBlockNumber int64
LatestBlockNumber int64
Errors []BlockchainIndexError
DuneErrorCounts int
RPCErrorCounts int
Since time.Time
}

type BlockchainIndexError struct {
Expand Down

0 comments on commit 6d60696

Please sign in to comment.