Skip to content

Commit

Permalink
Dune client: SendBlock -> SendBlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
vegarsti committed Jun 25, 2024
1 parent d83f853 commit 3c2104d
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 64 deletions.
75 changes: 51 additions & 24 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"log/slog"
"net/http"
"strings"
"sync"
"time"

Expand All @@ -23,8 +24,8 @@ const (
)

type BlockchainIngester interface {
// SendBlock sends a block to DuneAPI
SendBlock(ctx context.Context, payload models.RPCBlock) error
// SendBlock sends a batch of blocks to DuneAPI
SendBlocks(ctx context.Context, payloads []models.RPCBlock) error

// GetProgressReport gets a progress report from DuneAPI
GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error)
Expand Down Expand Up @@ -81,36 +82,54 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
}

// SendBlock sends a block to DuneAPI
func (c *client) SendBlock(ctx context.Context, payload models.RPCBlock) error {
func (c *client) SendBlocks(ctx context.Context, payloads []models.RPCBlock) error {
buffer := c.bufPool.Get().(*bytes.Buffer)
defer c.bufPool.Put(buffer)

request, err := c.buildRequest(payload, buffer)
request, err := c.buildRequest(payloads, buffer)
if err != nil {
return err
}
return c.sendRequest(ctx, request)
return c.sendRequest(ctx, *request)
}

func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (BlockchainIngestRequest, error) {
var request BlockchainIngestRequest
func (c *client) buildRequest(payloads []models.RPCBlock, buffer *bytes.Buffer) (*BlockchainIngestRequest, error) {
request := &BlockchainIngestRequest{}

// not thread safe, multiple calls to the compressor here
if c.cfg.DisableCompression {
request.Payload = payload.Payload
buffer.Reset()
for _, block := range payloads {
_, err := buffer.Write(block.Payload)
if err != nil {
return nil, err
}
}
request.Payload = buffer.Bytes()
} else {
// not thread safe, multiple calls to the compressor here
buffer.Reset()
c.compressor.Reset(buffer)
_, err := c.compressor.Write(payload.Payload)
for _, block := range payloads {
_, err := c.compressor.Write(block.Payload)
if err != nil {
return nil, err
}
}
err := c.compressor.Close()
if err != nil {
return request, err
return nil, err
}
c.compressor.Close()
request.ContentEncoding = "application/zstd"
request.Payload = buffer.Bytes()
}
request.BlockNumber = payload.BlockNumber
request.IdempotencyKey = c.idempotencyKey(payload)

blockNumbers := make([]string, len(payloads))
for i, payload := range payloads {
blockNumbers[i] = fmt.Sprintf("%d", payload.BlockNumber)
}

request.BlockNumbers = blockNumbers
request.IdempotencyKey = c.idempotencyKey(blockNumbers)
request.EVMStack = c.cfg.Stack.String()
return request, nil
}
Expand All @@ -124,15 +143,15 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
defer func() {
if err != nil {
c.log.Error("INGEST FAILED",
"blockNumber", request.BlockNumber,
"blockNumbers", strings.Join(request.BlockNumbers, ","),
"error", err,
"statusCode", responseStatus,
"payloadSize", len(request.Payload),
"duration", time.Since(start),
)
} else {
c.log.Info("BLOCK SENT",
"blockNumber", request.BlockNumber,
c.log.Info("INGEST SUCCESS",
"blockNumbers", strings.Join(request.BlockNumbers, ","),
"response", response.String(),
"payloadSize", len(request.Payload),
"duration", time.Since(start),
Expand All @@ -153,25 +172,33 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
req.Header.Set("x-idempotency-key", request.IdempotencyKey)
req.Header.Set("x-dune-evm-stack", request.EVMStack)
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
// TODO: Set a header about the number of blocks in the payload
req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %v, %v", resp.StatusCode, resp.Status)
}
responseStatus = resp.Status

err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return err
}

if resp.StatusCode != http.StatusOK {
// We mutate the global err here because we have deferred a log message where we check for non-nil err
err := fmt.Errorf("unexpected status code: %v, %v", resp.StatusCode, resp.Status)
return err
}

return nil
}

func (c *client) idempotencyKey(rpcBlock models.RPCBlock) string {
// for idempotency we use the block number (should we use also the date?, or a startup timestamp?)
return fmt.Sprintf("%v", rpcBlock.BlockNumber)
func (c *client) idempotencyKey(blockNumbers []string) string {
// for idempotency we use the block numbers in the request
// (should we use also the date?, or a startup timestamp?)
return strings.Join(blockNumbers, ",")
}

func (c *client) Close() error {
Expand Down Expand Up @@ -265,7 +292,7 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex

url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // empty body
req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // nil: empty body
if err != nil {
return nil, err
}
Expand Down
12 changes: 8 additions & 4 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ type Config struct {
DisableCompression bool
}

// The response from the DuneAPI ingest endpoint.
// In the case of a successful response, only the tables field is populated,
// and in the case of an error response, only the error field is populated.
type BlockchainIngestResponse struct {
Tables []IngestedTableInfo `json:"tables"`
Error string `json:"error,omitempty"`
Tables []IngestedTableInfo `json:"tables,omitempty"`
}

type IngestedTableInfo struct {
Expand All @@ -49,16 +53,16 @@ func (b *BlockchainIngestResponse) String() string {
}

type BlockchainIngestRequest struct {
BlockNumber int64
BlockNumbers []string
ContentEncoding string
EVMStack string
IdempotencyKey string
Payload []byte
}

type BlockchainProgress struct {
LastIngestedBlockNumber int64 `json:"last_ingested_block_number"`
LatestBlockNumber int64 `json:"latest_block_number"`
LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"`
LatestBlockNumber int64 `json:"latest_block_number,omitempty"`
}

func (p *BlockchainProgress) String() string {
Expand Down
2 changes: 1 addition & 1 deletion ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (i *ingester) trySendCompletedBlocks(
) int64 {
// Send this block only if we have sent all previous blocks
for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] {
if err := i.dune.SendBlock(ctx, block); err != nil {
if err := i.dune.SendBlocks(ctx, []models.RPCBlock{block}); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextNumberToSend
Expand Down
23 changes: 19 additions & 4 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ func TestRunLoopUntilCancel(t *testing.T) {
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
if len(blocks) != 1 {
panic("expected 1 block")
}
block := blocks[0]

atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
if block.BlockNumber == maxBlockNumber {
// cancel execution when we send the last block
Expand Down Expand Up @@ -68,7 +73,7 @@ func TestRunLoopUntilCancel(t *testing.T) {

func TestProduceBlockNumbers(t *testing.T) {
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, _ models.RPCBlock) error {
SendBlocksFunc: func(_ context.Context, _ []models.RPCBlock) error {
return nil
},
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
Expand Down Expand Up @@ -107,7 +112,12 @@ func TestProduceBlockNumbers(t *testing.T) {
func TestSendBlocks(t *testing.T) {
sentBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
if len(blocks) != 1 {
panic("expected 1 block")
}
block := blocks[0]

// DuneAPI must fail if it receives blocks out of order
if block.BlockNumber != sentBlockNumber+1 {
return errors.Errorf("blocks out of order")
Expand Down Expand Up @@ -157,7 +167,12 @@ func TestRunLoopBlocksOutOfOrder(t *testing.T) {
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
if len(blocks) != 1 {
panic("expected 1 block")
}
block := blocks[0]

// Test must fail if DuneAPI receives blocks out of order
require.Equal(t, block.BlockNumber, sentBlockNumber+1)

Expand Down
62 changes: 31 additions & 31 deletions mocks/duneapi/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3c2104d

Please sign in to comment.