Skip to content

Commit

Permalink
Redis waitgroup error handling, lint, tests
Browse files Browse the repository at this point in the history
  • Loading branch information
austonst committed Feb 7, 2024
1 parent 8fe36fb commit 2128d05
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
3 changes: 2 additions & 1 deletion datastore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
keyBlockBuilderStatus: fmt.Sprintf("%s/%s:block-builder-status", redisPrefix, prefix),
keyLastSlotDelivered: fmt.Sprintf("%s/%s:last-slot-delivered", redisPrefix, prefix),
keyLastHashDelivered: fmt.Sprintf("%s/%s:last-hash-delivered", redisPrefix, prefix),
currentSlot: 0,
}, nil
}

Expand Down Expand Up @@ -814,7 +815,7 @@ func (r *RedisCache) SetFloorBidValue(slot uint64, parentHash, proposerPubkey, v
func (r *RedisCache) BeginProcessingSlot(ctx context.Context, slot uint64) (err error) {
// Should never process more than one slot at a time
if r.currentSlot != 0 {
return fmt.Errorf("already processing slot %d", r.currentSlot)
return fmt.Errorf("already processing slot %d", r.currentSlot) //nolint:goerr113
}

keyProcessingSlot := r.keyProcessingSlot(slot)
Expand Down
2 changes: 1 addition & 1 deletion services/api/optimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func TestPrepareBuildersForSlot(t *testing.T) {
pkStr := pubkey.String()
// Clear cache.
backend.relay.blockBuildersCache = map[string]*blockBuilderCacheEntry{}
backend.relay.prepareBuildersForSlot(slot + 1)
backend.relay.prepareBuildersForSlot(slot+1, slot)
entry, ok := backend.relay.blockBuildersCache[pkStr]
require.True(t, ok)
require.Equal(t, true, entry.status.IsHighPrio)
Expand Down
29 changes: 22 additions & 7 deletions services/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func (api *RelayAPI) IsReady() bool {
// - Stop returning bids
// - Set ready /readyz to negative status
// - Wait a bit to allow removal of service from load balancer and draining of requests
// - If in the middle of proccessing optimistic blocks, wait for those to finish and release redis lock
// - If in the middle of processing optimistic blocks, wait for those to finish and release redis lock
func (api *RelayAPI) StopServer() (err error) {
// avoid running this twice. setting srvShutdown to true makes /readyz switch to negative status
if wasStopping := api.srvShutdown.Swap(true); wasStopping {
Expand All @@ -541,7 +541,10 @@ func (api *RelayAPI) StopServer() (err error) {

// wait for optimistic blocks
api.optimisticBlocksWG.Wait()
api.redis.EndProcessingSlot(context.Background())
err = api.redis.EndProcessingSlot(context.Background())
if err != nil {
api.log.WithError(err).Error("failed to update redis optimistic processing slot")
}

// shutdown
return api.srv.Shutdown(context.Background())
Expand Down Expand Up @@ -834,13 +837,19 @@ func (api *RelayAPI) updateProposerDuties(headSlot uint64) {
api.log.Infof("proposer duties updated: %s", strings.Join(_duties, ", "))
}

func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64, prevHeadSlot uint64) {
func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) {
// First wait for this process to finish processing optimistic blocks
api.optimisticBlocksWG.Wait()

// Now we release our lock and wait for all other builder processes to wrap up
api.redis.EndProcessingSlot(context.Background())
api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot + 1)
err := api.redis.EndProcessingSlot(context.Background())
if err != nil {
api.log.WithError(err).Error("failed to update redis optimistic processing slot")
}
err = api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot+1)
if err != nil {
api.log.WithError(err).Error("failed to get redis optimistic processing slot")
}

// Prevent race with StopServer, make sure we don't lock up redis if the server is shutting down
if api.srvShutdown.Load() {
Expand All @@ -849,7 +858,10 @@ func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64, prevHeadSlot uint64

// Update the optimistic slot and signal processing of the next slot
api.optimisticSlot.Store(headSlot + 1)
api.redis.BeginProcessingSlot(context.Background(), headSlot + 1)
err = api.redis.BeginProcessingSlot(context.Background(), headSlot+1)
if err != nil {
api.log.WithError(err).Error("failed to update redis optimistic processing slot")
}

builders, err := api.db.GetBlockBuilders()
if err != nil {
Expand Down Expand Up @@ -1404,7 +1416,10 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
}

// Wait until optimistic blocks are complete using the redis waitgroup
api.redis.WaitForSlotComplete(context.Background(), payload.Slot())
err = api.redis.WaitForSlotComplete(context.Background(), payload.Slot())

Check failure on line 1419 in services/api/service.go

View workflow job for this annotation

GitHub Actions / Test

multiple-value payload.Slot() (value of type (phase0.Slot, error)) in single-value context

Check failure on line 1419 in services/api/service.go

View workflow job for this annotation

GitHub Actions / Lint

multiple-value payload.Slot() (value of type (phase0.Slot, error)) in single-value context
if err != nil {
api.log.WithError(err).Error("failed to get redis optimistic processing slot")
}

// Check if there is a demotion for the winning block.
_, err = api.db.GetBuilderDemotion(bidTrace)
Expand Down

0 comments on commit 2128d05

Please sign in to comment.