Skip to content

Commit

Permalink
only use block events to keep track of the chain (head events are inc…
Browse files Browse the repository at this point in the history
…onsistent across clients)
  • Loading branch information
pk910 committed Aug 27, 2023
1 parent 925e38d commit 31efa9c
Showing 1 changed file with 3 additions and 11 deletions.
14 changes: 3 additions & 11 deletions indexer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (client *IndexerClient) runIndexerClient() error {
client.retryCounter = 0

// start event stream
blockStream := client.rpcClient.NewBlockStream(rpc.StreamBlockEvent | rpc.StreamHeadEvent | rpc.StreamFinalizedEvent)
blockStream := client.rpcClient.NewBlockStream(rpc.StreamBlockEvent | rpc.StreamFinalizedEvent)
defer blockStream.Close()

// prefill cache
Expand Down Expand Up @@ -201,8 +201,6 @@ func (client *IndexerClient) runIndexerClient() error {
switch evt.Event {
case rpc.StreamBlockEvent:
client.processBlockEvent(evt.Data.(*rpctypes.StandardV1StreamedBlockEvent))
case rpc.StreamHeadEvent:
client.processHeadEvent(evt.Data.(*rpctypes.StandardV1StreamedHeadEvent))
case rpc.StreamFinalizedEvent:
client.processFinalizedEvent(evt.Data.(*rpctypes.StandardV1StreamedFinalizedCheckpointEvent))
}
Expand Down Expand Up @@ -243,6 +241,7 @@ func (client *IndexerClient) prefillCache(finalizedSlot uint64, latestHeader *rp
logger.WithField("client", client.clientName).Debugf("received known block %v:%v [0x%x] warmup, head", utils.EpochOfSlot(uint64(client.lastHeadSlot)), client.lastHeadSlot, client.lastHeadRoot)
}
client.ensureBlock(currentBlock, &latestHeader.Data.Header)
client.setHeadBlock(latestHeader.Data.Root, uint64(latestHeader.Data.Header.Message.Slot))

// walk backwards and load all blocks until we reach a finalized epoch
parentRoot := []byte(currentBlock.header.Message.ParentRoot)
Expand Down Expand Up @@ -448,17 +447,10 @@ func (client *IndexerClient) processBlockEvent(evt *rpctypes.StandardV1StreamedB
if err != nil {
return err
}
client.setHeadBlock(evt.Block, uint64(evt.Slot))
return nil
}

func (client *IndexerClient) processHeadEvent(evt *rpctypes.StandardV1StreamedHeadEvent) error {
currentBlock := client.indexerCache.getCachedBlock(evt.Block)
if currentBlock == nil {
return fmt.Errorf("received head event for non existing block: %v", evt.Block.String())
}
return client.setHeadBlock(evt.Block, uint64(evt.Slot))
}

func (client *IndexerClient) processFinalizedEvent(evt *rpctypes.StandardV1StreamedFinalizedCheckpointEvent) error {
logger.WithField("client", client.clientName).Debugf("received finalization_checkpoint event: epoch %v [%s]", evt.Epoch, evt.Block.String())
client.indexerCache.setFinalizedHead(int64(evt.Epoch)-1, evt.Block)
Expand Down

0 comments on commit 31efa9c

Please sign in to comment.