Skip to content

Commit

Permalink
Add DLQ for reprocessing of failed blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
adammilnesmith committed Jul 11, 2024
1 parent 67dbd31 commit fd79804
Show file tree
Hide file tree
Showing 14 changed files with 578 additions and 19 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ go.work
go.work.sum

# Binary
indexer
indexer
bin

.idea
75 changes: 75 additions & 0 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type BlockchainIngester interface {
// PostProgressReport sends a progress report to DuneAPI
PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error

GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error)

// - API to discover the latest block number ingested
// this can also provide "next block ranges" to push to DuneAPI
// - log/metrics on catching up/falling behind, distance from tip of chain
Expand Down Expand Up @@ -366,3 +368,76 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex
}
return progress, nil
}

// GetBlockGaps asks DuneAPI for the block gaps of the configured blockchain
// and converts the response into a models.BlockchainGaps.
//
// It issues a GET to {cfg.URL}/api/beta/blockchain/{cfg.BlockchainName}/gaps
// authenticated with the x-dune-api-key header. Any transport error, non-200
// status, or JSON decoding failure is returned as an error with a nil result.
// The outcome (success or failure) is logged once, when the function returns.
func (c *client) GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error) {
	var response BlockchainGapsResponse
	var err error
	var responseStatus string
	start := time.Now()

	// Log the outcome on every exit path. The closure reads err, responseStatus
	// and response when it runs, so every failure below must be stored in this
	// function-level err (never in a shadowed copy) before returning.
	defer func() {
		if err != nil {
			c.log.Error("Getting block gaps failed",
				"error", err,
				"statusCode", responseStatus,
				"duration", time.Since(start),
			)
		} else {
			c.log.Info("Got block gaps",
				"blockGaps", response.String(),
				"duration", time.Since(start),
			)
		}
	}()

	url := fmt.Sprintf("%s/api/beta/blockchain/%s/gaps", c.cfg.URL, c.cfg.BlockchainName)
	c.log.Debug("Sending request", "url", url)
	// The request is already bound to ctx here, so no extra WithContext call is needed.
	req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // nil: empty body
	if err != nil {
		return nil, err
	}
	req.Header.Set("x-dune-api-key", c.cfg.APIKey)
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	// Record the status so the deferred failure log can report it.
	responseStatus = resp.Status

	// Read the body exactly once; it is used both for the error message of a
	// non-200 response and for JSON decoding of a successful one.
	responseBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	if resp.StatusCode != http.StatusOK {
		// We assign to the function-level err here because the deferred log message checks it for non-nil.
		err = fmt.Errorf("unexpected status code: %v, %v with body '%s'", resp.StatusCode, resp.Status, responseBody)
		return nil, err
	}

	err = json.Unmarshal(responseBody, &response)
	if err != nil {
		return nil, err
	}

	// Translate the wire representation into the models package representation.
	gaps := &models.BlockchainGaps{
		Gaps: mapSlice(response.Gaps, func(gap BlockGap) models.BlockGap {
			return models.BlockGap{
				FirstMissing: gap.FirstMissing,
				LastMissing:  gap.LastMissing,
			}
		}),
	}
	return gaps, nil
}

// mapSlice applies mapper to each element of slice, in order, and returns the
// results as a new slice of the same length. The returned slice is always
// non-nil, even when slice is nil or empty.
func mapSlice[T any, U any](slice []T, mapper func(T) U) []U {
	mapped := make([]U, 0, len(slice))
	for idx := range slice {
		mapped = append(mapped, mapper(slice[idx]))
	}
	return mapped
}
13 changes: 13 additions & 0 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,16 @@ type BlockchainError struct {
Error string `json:"error"`
Source string `json:"source"`
}

// BlockchainGapsResponse is the JSON payload decoded from the gaps endpoint:
// an object whose "gaps" field lists the missing block ranges.
type BlockchainGapsResponse struct {
	Gaps []BlockGap `json:"gaps"`
}

// BlockGap is a single range of missing blocks as encoded in the gaps
// response, identified by its "first_missing" and "last_missing" fields.
type BlockGap struct {
	FirstMissing int64 `json:"first_missing"`
	LastMissing  int64 `json:"last_missing"`
}

// String renders the response with field names (fmt's %+v form), for logging.
func (b *BlockchainGapsResponse) String() string {
	snapshot := *b
	return fmt.Sprintf("%+v", snapshot)
}
12 changes: 12 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ func main() {
startBlockNumber = cfg.BlockHeight
}

dlq := ingester.NewDLQ()

if !cfg.DisableGapsQuery {
blockGaps, err := duneClient.GetBlockGaps(ctx)
if err != nil {
stdlog.Fatal(err)
} else {
dlq.AddBlockGaps(blockGaps.Gaps)
}
}

maxCount := int64(0) // 0 means ingest until cancelled
ingester := ingester.New(
logger,
Expand All @@ -116,6 +127,7 @@ func main() {
SkipFailedBlocks: cfg.RPCNode.SkipFailedBlocks,
},
progress,
dlq,
)

wg.Add(1)
Expand Down
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func (r RPCClient) HasError() error {
}

type Config struct {
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
DisableCompression bool `long:"disable-compression" env:"DISABLE_COMPRESSION" description:"disable compression when sending data to Dune"` // nolint:lll
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
DisableCompression bool `long:"disable-compression" env:"DISABLE_COMPRESSION" description:"disable compression when sending data to Dune"` // nolint:lll
DisableGapsQuery bool `long:"disable-gaps-query" env:"DISABLE_GAPS_QUERY" description:"disable gaps query used to populate the initial DLQ"` // nolint:lll
Dune DuneClient
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/duneanalytics/blockchain-ingester
go 1.22.2

require (
github.com/emirpasic/gods v1.18.1
github.com/go-errors/errors v1.5.1
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/jessevdk/go-flags v1.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
Expand Down
102 changes: 102 additions & 0 deletions ingester/dlq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package ingester

import (
"fmt"
"slices"
"sync"
"time"

"github.com/duneanalytics/blockchain-ingester/models"
pq "github.com/emirpasic/gods/queues/priorityqueue"
"github.com/emirpasic/gods/utils"
)

// DLQ holds blocks that are waiting to be (re)processed. Entries live in a
// priority queue; every method in this file that touches the queue takes the
// mutex first.
type DLQ struct {
// priorityQueue stores DLQBlock[int64] entries; the constructors in this file
// order it with byNextRunTime.
priorityQueue pq.Queue // structure not thread safe
// mutex guards priorityQueue.
mutex sync.Mutex
// retryDelay maps a retry count to the delay applied before the next attempt
// (used by AddBlock to compute an entry's nextRunTime).
retryDelay func(int) time.Duration
}

// DLQBlock is an envelope around a value of type T. It is generic so that the
// retry metadata can travel alongside the value while it is being processed,
// ready for when the block needs to make its way back onto the DLQ later.
type DLQBlock[T any] struct {
	// Value is the payload carried by the envelope.
	Value T
	// Retries is the retry count recorded for this entry.
	Retries int
	// nextRunTime is the earliest time at which the entry may be handed out again.
	nextRunTime time.Time
}

// String renders the envelope with field names (fmt's %+v form).
func (b *DLQBlock[T]) String() string {
	envelope := *b
	return fmt.Sprintf("%+v", envelope)
}

// MapDLQBlock converts a DLQBlock[T] into a DLQBlock[U] by applying mapper to
// the wrapped value. The retry count is carried over; nextRunTime is not
// copied and is left at its zero value in the result.
func MapDLQBlock[T, U any](b DLQBlock[T], mapper func(T) U) DLQBlock[U] {
	var mapped DLQBlock[U]
	mapped.Value = mapper(b.Value)
	mapped.Retries = b.Retries
	return mapped
}

// NewDLQ returns an empty DLQ ordered by nextRunTime that uses the linear
// retry delay (retryDelayLinear).
func NewDLQ() *DLQ {
	return NewDLQWithDelay(retryDelayLinear)
}

// retryDelayLinear returns a delay that grows linearly with the retry count:
// one minute per retry.
func retryDelayLinear(retries int) time.Duration {
	return time.Minute * time.Duration(retries)
}

// NewDLQWithDelay returns an empty DLQ ordered by nextRunTime that uses the
// supplied function to turn a retry count into the delay before the next run.
func NewDLQWithDelay(retryDelay func(int) time.Duration) *DLQ {
	queue := pq.NewWith(byNextRunTime)
	return &DLQ{
		priorityQueue: *queue,
		retryDelay:    retryDelay,
	}
}

// Comparator function (sort by retry count in ascending order)
func byRetries(a, b interface{}) int {

Check failure on line 52 in ingester/dlq.go

View workflow job for this annotation

GitHub Actions / Lint and test

func `byRetries` is unused (unused)
return utils.IntComparator(a.(DLQBlock[int64]).Retries, b.(DLQBlock[int64]).Retries)
}

// Comparator function (sort by nextRunTime in ascending order)
func byNextRunTime(a, b interface{}) int {
return utils.TimeComparator(a.(DLQBlock[int64]).nextRunTime, b.(DLQBlock[int64]).nextRunTime)
}

// AddBlockGaps enqueues every block number contained in the given gaps
// (FirstMissing through LastMissing, inclusive). Each block is enqueued with
// zero retries and a zero nextRunTime, so GetNextBlock will hand it out
// without waiting.
//
// The caller's slice is not modified: ordering is done on a copy.
func (dlq *DLQ) AddBlockGaps(gaps []models.BlockGap) {
	// queue these in reverse so that recent blocks are retried first
	// NOTE(review): every block enqueued here has the same (zero) nextRunTime,
	// so the queue's comparator treats them as equal; whether the queue then
	// preserves this insertion order is not established here — confirm.
	ordered := slices.Clone(gaps)
	slices.SortFunc(ordered, func(a, b models.BlockGap) int {
		return -utils.Int64Comparator(a.FirstMissing, b.FirstMissing)
	})

	dlq.mutex.Lock()
	defer dlq.mutex.Unlock()

	for _, gap := range ordered {
		for i := gap.FirstMissing; i <= gap.LastMissing; i++ {
			dlq.priorityQueue.Enqueue(DLQBlock[int64]{Value: i})
		}
	}
}

// AddBlock puts blockNumber (back) onto the queue after a failed attempt.
// retries is the number of retries made so far; the stored entry records
// retries+1 and becomes eligible again once retryDelay(retries+1) has elapsed
// from now.
func (dlq *DLQ) AddBlock(blockNumber int64, retries int) {
	attempt := retries + 1
	// The run time is computed before the lock is taken.
	entry := DLQBlock[int64]{
		Value:       blockNumber,
		Retries:     attempt,
		nextRunTime: time.Now().Add(dlq.retryDelay(attempt)),
	}

	dlq.mutex.Lock()
	defer dlq.mutex.Unlock()

	dlq.priorityQueue.Enqueue(entry)
}

func (dlq *DLQ) GetNextBlock() (value *DLQBlock[int64], ok bool) {
dlq.mutex.Lock()
defer dlq.mutex.Unlock()

peek, ok := dlq.priorityQueue.Peek()
if !ok || peek.(DLQBlock[int64]).nextRunTime.After(time.Now()) {
return nil, false
}

block, ok := dlq.priorityQueue.Dequeue()
if ok {
blockCasted := block.(DLQBlock[int64])
return &blockCasted, ok
} else {

Check failure on line 99 in ingester/dlq.go

View workflow job for this annotation

GitHub Actions / Lint and test

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
return nil, ok
}
}
16 changes: 9 additions & 7 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ type Ingester interface {
// - a fatal error occurs
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error

// This is just a placeholder for now
Info() Info
ProduceBlockNumbersDLQ(ctx context.Context, outChan chan DLQBlock[int64]) error

FetchBlockLoopDLQ(ctx context.Context, blockNumbers <-chan DLQBlock[int64], blocks chan<- DLQBlock[models.RPCBlock],
) error

SendBlocksDLQ(ctx context.Context, blocks <-chan DLQBlock[models.RPCBlock]) error

Close() error
}

const (
defaultMaxBatchSize = 5
defaultReportProgressInterval = 30 * time.Second
)

Expand All @@ -58,6 +61,7 @@ type ingester struct {
dune duneapi.BlockchainIngester
cfg Config
info Info
dlq *DLQ
}

func New(
Expand All @@ -66,6 +70,7 @@ func New(
dune duneapi.BlockchainIngester,
cfg Config,
progress *models.BlockchainIndexProgress,
dlq *DLQ,
) Ingester {
info := NewInfo(cfg.BlockchainName, cfg.Stack.String())
if progress != nil {
Expand All @@ -78,13 +83,10 @@ func New(
dune: dune,
cfg: cfg,
info: info,
dlq: dlq,
}
if ing.cfg.ReportProgressInterval == 0 {
ing.cfg.ReportProgressInterval = defaultReportProgressInterval
}
return ing
}

// Info returns the Info value currently held by the ingester.
func (i *ingester) Info() Info {
return i.info
}
Loading

0 comments on commit fd79804

Please sign in to comment.