From 3c2104ddacfbd4873d8f84a56f4bfbf37f483042 Mon Sep 17 00:00:00 2001 From: Vegard Stikbakke Date: Tue, 25 Jun 2024 14:26:50 +0200 Subject: [PATCH] Dune client: SendBlock -> SendBlocks --- client/duneapi/client.go | 75 ++++++++++++++++++++++++++------------- client/duneapi/models.go | 12 ++++--- ingester/mainloop.go | 2 +- ingester/mainloop_test.go | 23 +++++++++--- mocks/duneapi/client.go | 62 ++++++++++++++++---------------- 5 files changed, 110 insertions(+), 64 deletions(-) diff --git a/client/duneapi/client.go b/client/duneapi/client.go index 671bdcb..2e0e820 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -8,6 +8,7 @@ import ( "io" "log/slog" "net/http" + "strings" "sync" "time" @@ -23,8 +24,8 @@ const ( ) type BlockchainIngester interface { - // SendBlock sends a block to DuneAPI - SendBlock(ctx context.Context, payload models.RPCBlock) error + // SendBlock sends a batch of blocks to DuneAPI + SendBlocks(ctx context.Context, payloads []models.RPCBlock) error // GetProgressReport gets a progress report from DuneAPI GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) @@ -81,36 +82,54 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line } // SendBlock sends a block to DuneAPI -func (c *client) SendBlock(ctx context.Context, payload models.RPCBlock) error { +func (c *client) SendBlocks(ctx context.Context, payloads []models.RPCBlock) error { buffer := c.bufPool.Get().(*bytes.Buffer) defer c.bufPool.Put(buffer) - request, err := c.buildRequest(payload, buffer) + request, err := c.buildRequest(payloads, buffer) if err != nil { return err } - return c.sendRequest(ctx, request) + return c.sendRequest(ctx, *request) } -func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (BlockchainIngestRequest, error) { - var request BlockchainIngestRequest +func (c *client) buildRequest(payloads []models.RPCBlock, buffer *bytes.Buffer) (*BlockchainIngestRequest, error) { + request := &BlockchainIngestRequest{} + // not thread safe, multiple calls to the compressor here if c.cfg.DisableCompression { - request.Payload = payload.Payload + buffer.Reset() + for _, block := range payloads { + _, err := buffer.Write(block.Payload) + if err != nil { + return nil, err + } + } + request.Payload = buffer.Bytes() } else { - // not thread safe, multiple calls to the compressor here buffer.Reset() c.compressor.Reset(buffer) - _, err := c.compressor.Write(payload.Payload) + for _, block := range payloads { + _, err := c.compressor.Write(block.Payload) + if err != nil { + return nil, err + } + } + err := c.compressor.Close() if err != nil { - return request, err + return nil, err } - c.compressor.Close() request.ContentEncoding = "application/zstd" request.Payload = buffer.Bytes() } - request.BlockNumber = payload.BlockNumber - request.IdempotencyKey = c.idempotencyKey(payload) + + blockNumbers := make([]string, len(payloads)) + for i, payload := range payloads { + blockNumbers[i] = fmt.Sprintf("%d", payload.BlockNumber) + } + + request.BlockNumbers = blockNumbers + request.IdempotencyKey = c.idempotencyKey(blockNumbers) request.EVMStack = c.cfg.Stack.String() return request, nil } @@ -124,15 +143,15 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques defer func() { if err != nil { c.log.Error("INGEST FAILED", - "blockNumber", request.BlockNumber, + "blockNumbers", strings.Join(request.BlockNumbers, ","), "error", err, "statusCode", responseStatus, "payloadSize", len(request.Payload), "duration", time.Since(start), ) } else { - c.log.Info("BLOCK SENT", - "blockNumber", request.BlockNumber, + c.log.Info("INGEST SUCCESS", + "blockNumbers", strings.Join(request.BlockNumbers, ","), "response", response.String(), "payloadSize", len(request.Payload), "duration", time.Since(start), @@ -153,25 +172,33 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques req.Header.Set("x-idempotency-key", request.IdempotencyKey) req.Header.Set("x-dune-evm-stack", request.EVMStack) req.Header.Set("x-dune-api-key", c.cfg.APIKey) + // TODO: Set a header about the number of blocks in the payload req = req.WithContext(ctx) resp, err := c.httpClient.Do(req) if err != nil { return err } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("unexpected status code: %v, %v", resp.StatusCode, resp.Status) - } + responseStatus = resp.Status + err = json.NewDecoder(resp.Body).Decode(&response) if err != nil { return err } + + if resp.StatusCode != http.StatusOK { + // We mutate the global err here because we have deferred a log message where we check for non-nil err + err := fmt.Errorf("unexpected status code: %v, %v", resp.StatusCode, resp.Status) + return err + } + return nil } -func (c *client) idempotencyKey(rpcBlock models.RPCBlock) string { - // for idempotency we use the block number (should we use also the date?, or a startup timestamp?) - return fmt.Sprintf("%v", rpcBlock.BlockNumber) +func (c *client) idempotencyKey(blockNumbers []string) string { + // for idempotency we use the block numbers in the request + // (should we use also the date?, or a startup timestamp?) + return strings.Join(blockNumbers, ",") } func (c *client) Close() error { @@ -265,7 +292,7 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName) c.log.Debug("Sending request", "url", url) - req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // empty body + req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // nil: empty body if err != nil { return nil, err } diff --git a/client/duneapi/models.go b/client/duneapi/models.go index 1afd273..96677f0 100644 --- a/client/duneapi/models.go +++ b/client/duneapi/models.go @@ -23,8 +23,12 @@ type Config struct { DisableCompression bool } +// The response from the DuneAPI ingest endpoint. +// In the case of a successful response, only the tables field is populated, +// and in the case of an error response, only the error field is populated. type BlockchainIngestResponse struct { - Tables []IngestedTableInfo `json:"tables"` + Error string `json:"error,omitempty"` + Tables []IngestedTableInfo `json:"tables,omitempty"` } type IngestedTableInfo struct { @@ -49,7 +53,7 @@ func (b *BlockchainIngestResponse) String() string { } type BlockchainIngestRequest struct { - BlockNumber int64 + BlockNumbers []string ContentEncoding string EVMStack string IdempotencyKey string @@ -57,8 +61,8 @@ type BlockchainIngestRequest struct { } type BlockchainProgress struct { - LastIngestedBlockNumber int64 `json:"last_ingested_block_number"` - LatestBlockNumber int64 `json:"latest_block_number"` + LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"` + LatestBlockNumber int64 `json:"latest_block_number,omitempty"` } func (p *BlockchainProgress) String() string { diff --git a/ingester/mainloop.go b/ingester/mainloop.go index ee7a2bd..4f892dd 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -196,7 +196,7 @@ func (i *ingester) trySendCompletedBlocks( ) int64 { // Send this block only if we have sent all previous blocks for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] { - if err := i.dune.SendBlock(ctx, block); err != nil { + if err := i.dune.SendBlocks(ctx, []models.RPCBlock{block}); err != nil { if errors.Is(err, context.Canceled) { i.log.Info("SendBlocks: Context canceled, stopping") return nextNumberToSend diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 366ae90..1bedc47 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -25,7 +25,12 @@ func TestRunLoopUntilCancel(t *testing.T) { sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { + SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { + if len(blocks) != 1 { + panic("expected 1 block") + } + block := blocks[0] + atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) if block.BlockNumber == maxBlockNumber { // cancel execution when we send the last block @@ -68,7 +73,7 @@ func TestRunLoopUntilCancel(t *testing.T) { func TestProduceBlockNumbers(t *testing.T) { duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlockFunc: func(_ context.Context, _ models.RPCBlock) error { + SendBlocksFunc: func(_ context.Context, _ []models.RPCBlock) error { return nil }, PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { @@ -107,7 +112,12 @@ func TestProduceBlockNumbers(t *testing.T) { func TestSendBlocks(t *testing.T) { sentBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { + SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { + if len(blocks) != 1 { + panic("expected 1 block") + } + block := blocks[0] + // DuneAPI must fail if it receives blocks out of order if block.BlockNumber != sentBlockNumber+1 { return errors.Errorf("blocks out of order") @@ -157,7 +167,12 @@ func TestRunLoopBlocksOutOfOrder(t *testing.T) { sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { + SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { + if len(blocks) != 1 { + panic("expected 1 block") + } + block := blocks[0] + // Test must fail if DuneAPI receives blocks out of order require.Equal(t, block.BlockNumber, sentBlockNumber+1) diff --git a/mocks/duneapi/client.go b/mocks/duneapi/client.go index f918ed3..64d979a 100644 --- a/mocks/duneapi/client.go +++ b/mocks/duneapi/client.go @@ -26,8 +26,8 @@ var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{} // PostProgressReportFunc: func(ctx context.Context, progress models.BlockchainIndexProgress) error { // panic("mock out the PostProgressReport method") // }, -// SendBlockFunc: func(ctx context.Context, payload models.RPCBlock) error { -// panic("mock out the SendBlock method") +// SendBlocksFunc: func(ctx context.Context, payloads []models.RPCBlock) error { +// panic("mock out the SendBlocks method") // }, // } // @@ -42,8 +42,8 @@ type BlockchainIngesterMock struct { // PostProgressReportFunc mocks the PostProgressReport method. PostProgressReportFunc func(ctx context.Context, progress models.BlockchainIndexProgress) error - // SendBlockFunc mocks the SendBlock method. - SendBlockFunc func(ctx context.Context, payload models.RPCBlock) error + // SendBlocksFunc mocks the SendBlocks method. + SendBlocksFunc func(ctx context.Context, payloads []models.RPCBlock) error // calls tracks calls to the methods. calls struct { @@ -59,17 +59,17 @@ type BlockchainIngesterMock struct { // Progress is the progress argument value. Progress models.BlockchainIndexProgress } - // SendBlock holds details about calls to the SendBlock method. - SendBlock []struct { + // SendBlocks holds details about calls to the SendBlocks method. + SendBlocks []struct { // Ctx is the ctx argument value. Ctx context.Context - // Payload is the payload argument value. - Payload models.RPCBlock + // Payloads is the payloads argument value. + Payloads []models.RPCBlock } } lockGetProgressReport sync.RWMutex lockPostProgressReport sync.RWMutex - lockSendBlock sync.RWMutex + lockSendBlocks sync.RWMutex } // GetProgressReport calls GetProgressReportFunc. @@ -140,38 +140,38 @@ func (mock *BlockchainIngesterMock) PostProgressReportCalls() []struct { return calls } -// SendBlock calls SendBlockFunc. -func (mock *BlockchainIngesterMock) SendBlock(ctx context.Context, payload models.RPCBlock) error { - if mock.SendBlockFunc == nil { - panic("BlockchainIngesterMock.SendBlockFunc: method is nil but BlockchainIngester.SendBlock was just called") +// SendBlocks calls SendBlocksFunc. +func (mock *BlockchainIngesterMock) SendBlocks(ctx context.Context, payloads []models.RPCBlock) error { + if mock.SendBlocksFunc == nil { + panic("BlockchainIngesterMock.SendBlocksFunc: method is nil but BlockchainIngester.SendBlocks was just called") } callInfo := struct { - Ctx context.Context - Payload models.RPCBlock + Ctx context.Context + Payloads []models.RPCBlock }{ - Ctx: ctx, - Payload: payload, + Ctx: ctx, + Payloads: payloads, } - mock.lockSendBlock.Lock() - mock.calls.SendBlock = append(mock.calls.SendBlock, callInfo) - mock.lockSendBlock.Unlock() - return mock.SendBlockFunc(ctx, payload) + mock.lockSendBlocks.Lock() + mock.calls.SendBlocks = append(mock.calls.SendBlocks, callInfo) + mock.lockSendBlocks.Unlock() + return mock.SendBlocksFunc(ctx, payloads) } -// SendBlockCalls gets all the calls that were made to SendBlock. +// SendBlocksCalls gets all the calls that were made to SendBlocks. // Check the length with: // -// len(mockedBlockchainIngester.SendBlockCalls()) -func (mock *BlockchainIngesterMock) SendBlockCalls() []struct { - Ctx context.Context - Payload models.RPCBlock +// len(mockedBlockchainIngester.SendBlocksCalls()) +func (mock *BlockchainIngesterMock) SendBlocksCalls() []struct { + Ctx context.Context + Payloads []models.RPCBlock } { var calls []struct { - Ctx context.Context - Payload models.RPCBlock + Ctx context.Context + Payloads []models.RPCBlock } - mock.lockSendBlock.RLock() - calls = mock.calls.SendBlock - mock.lockSendBlock.RUnlock() + mock.lockSendBlocks.RLock() + calls = mock.calls.SendBlocks + mock.lockSendBlocks.RUnlock() return calls }