From 2128d051af5c4208b713c78e927fbf0758449c89 Mon Sep 17 00:00:00 2001 From: Austonst Date: Thu, 5 Oct 2023 11:13:35 -0600 Subject: [PATCH] Redis waitgroup error handling, lint, tests --- datastore/redis.go | 3 ++- services/api/optimistic_test.go | 2 +- services/api/service.go | 29 ++++++++++++++++++++++------- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/datastore/redis.go b/datastore/redis.go index 86cf6454c..9cadf7409 100644 --- a/datastore/redis.go +++ b/datastore/redis.go @@ -146,6 +146,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) { keyBlockBuilderStatus: fmt.Sprintf("%s/%s:block-builder-status", redisPrefix, prefix), keyLastSlotDelivered: fmt.Sprintf("%s/%s:last-slot-delivered", redisPrefix, prefix), keyLastHashDelivered: fmt.Sprintf("%s/%s:last-hash-delivered", redisPrefix, prefix), + currentSlot: 0, }, nil } @@ -814,7 +815,7 @@ func (r *RedisCache) SetFloorBidValue(slot uint64, parentHash, proposerPubkey, v func (r *RedisCache) BeginProcessingSlot(ctx context.Context, slot uint64) (err error) { // Should never process more than one slot at a time if r.currentSlot != 0 { - return fmt.Errorf("already processing slot %d", r.currentSlot) + return fmt.Errorf("already processing slot %d", r.currentSlot) //nolint:goerr113 } keyProcessingSlot := r.keyProcessingSlot(slot) diff --git a/services/api/optimistic_test.go b/services/api/optimistic_test.go index b7a35b80e..e9b33e303 100644 --- a/services/api/optimistic_test.go +++ b/services/api/optimistic_test.go @@ -358,7 +358,7 @@ func TestPrepareBuildersForSlot(t *testing.T) { pkStr := pubkey.String() // Clear cache. backend.relay.blockBuildersCache = map[string]*blockBuilderCacheEntry{} - backend.relay.prepareBuildersForSlot(slot + 1) + backend.relay.prepareBuildersForSlot(slot+1, slot) entry, ok := backend.relay.blockBuildersCache[pkStr] require.True(t, ok) require.Equal(t, true, entry.status.IsHighPrio) diff --git a/services/api/service.go b/services/api/service.go index 686bd15cd..732b07942 100644 --- a/services/api/service.go +++ b/services/api/service.go @@ -516,7 +516,7 @@ func (api *RelayAPI) IsReady() bool { // - Stop returning bids // - Set ready /readyz to negative status // - Wait a bit to allow removal of service from load balancer and draining of requests -// - If in the middle of proccessing optimistic blocks, wait for those to finish and release redis lock +// - If in the middle of processing optimistic blocks, wait for those to finish and release redis lock func (api *RelayAPI) StopServer() (err error) { // avoid running this twice. setting srvShutdown to true makes /readyz switch to negative status if wasStopping := api.srvShutdown.Swap(true); wasStopping { @@ -541,7 +541,10 @@ func (api *RelayAPI) StopServer() (err error) { // wait for optimistic blocks api.optimisticBlocksWG.Wait() - api.redis.EndProcessingSlot(context.Background()) + err = api.redis.EndProcessingSlot(context.Background()) + if err != nil { + api.log.WithError(err).Error("failed to update redis optimistic processing slot") + } // shutdown return api.srv.Shutdown(context.Background()) @@ -834,13 +837,19 @@ func (api *RelayAPI) updateProposerDuties(headSlot uint64) { api.log.Infof("proposer duties updated: %s", strings.Join(_duties, ", ")) } -func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64, prevHeadSlot uint64) { +func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) { // First wait for this process to finish processing optimistic blocks api.optimisticBlocksWG.Wait() // Now we release our lock and wait for all other builder processes to wrap up - api.redis.EndProcessingSlot(context.Background()) - api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot + 1) + err := api.redis.EndProcessingSlot(context.Background()) + if err != nil { + api.log.WithError(err).Error("failed to update redis optimistic processing slot") + } + err = api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot+1) + if err != nil { + api.log.WithError(err).Error("failed to get redis optimistic processing slot") + } // Prevent race with StopServer, make sure we don't lock up redis if the server is shutting down if api.srvShutdown.Load() { @@ -849,7 +858,10 @@ func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64, prevHeadSlot uint64 // Update the optimistic slot and signal processing of the next slot api.optimisticSlot.Store(headSlot + 1) - api.redis.BeginProcessingSlot(context.Background(), headSlot + 1) + err = api.redis.BeginProcessingSlot(context.Background(), headSlot+1) + if err != nil { + api.log.WithError(err).Error("failed to update redis optimistic processing slot") + } builders, err := api.db.GetBlockBuilders() if err != nil { @@ -1404,7 +1416,10 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request) } // Wait until optimistic blocks are complete using the redis waitgroup - api.redis.WaitForSlotComplete(context.Background(), payload.Slot()) + err = api.redis.WaitForSlotComplete(context.Background(), payload.Slot()) + if err != nil { + api.log.WithError(err).Error("failed to get redis optimistic processing slot") + } // Check if there is a demotion for the winning block. _, err = api.db.GetBuilderDemotion(bidTrace)