From 7eea6a1a46dc9dfe52dd138dfe6cee5b4c7098e5 Mon Sep 17 00:00:00 2001 From: thedevbirb Date: Tue, 5 Nov 2024 09:53:18 +0100 Subject: [PATCH 1/3] fix(constraints-api): eof error handling logic --- builder/builder.go | 151 ++++++++++++++++++++-------------------- builder/builder_test.go | 2 +- 2 files changed, 78 insertions(+), 75 deletions(-) diff --git a/builder/builder.go b/builder/builder.go index 64d7c9f..cce82c2 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -275,13 +275,16 @@ func (b *Builder) SubscribeProposerConstraints() error { return nil } +// subscribeToRelayForConstraints loops almost indefinitely to subscribe to +// constraints forwarded by relays. If the subscription fails or it is lost, it +// retries every 10 seconds for a max of 10 minutes the restablish the +// connection before existing. func (b *Builder) subscribeToRelayForConstraints(relayBaseEndpoint string) error { attempts := 0 maxAttempts := 60 // Max 10 minutes of retries retryInterval := 10 * time.Second - var resp *http.Response - + // Main loop to reconnect to the relay for { log.Info("Attempting to subscribe to constraints...") @@ -298,7 +301,7 @@ func (b *Builder) subscribeToRelayForConstraints(relayBaseEndpoint string) error client := http.Client{} - resp, err = client.Do(req) + resp, err := client.Do(req) if err != nil { log.Error(fmt.Sprintf("Failed to connect to SSE server: %v", err)) time.Sleep(retryInterval) @@ -308,99 +311,99 @@ func (b *Builder) subscribeToRelayForConstraints(relayBaseEndpoint string) error if resp.StatusCode != http.StatusOK { log.Error(fmt.Sprintf("Error subscribing to constraints via SSE: %s, %v", resp.Status, err)) - return err - } - break - } - - defer resp.Body.Close() - log.Info(fmt.Sprintf("Connected to SSE server: %s", relayBaseEndpoint)) - - var reader io.Reader - - // Check if the response is gzipped - if resp.Header.Get("Content-Encoding") == "gzip" { - // Decompress the response body - gzipReader, err := gzip.NewReader(resp.Body) - if err != nil { - return fmt.Errorf("error creating gzip reader: %v", err) - } - defer gzipReader.Close() - reader = gzipReader - } else { - reader = resp.Body - } - - bufReader := bufio.NewReader(reader) - for { - line, err := bufReader.ReadString('\n') - if err != nil { - if err == io.EOF { - log.Info("End of stream") - break - } - log.Error(fmt.Sprintf("Error reading from response body: %v", err)) - continue - } - - if !strings.HasPrefix(line, "data: ") { + resp.Body.Close() // Close response body to free resources + time.Sleep(retryInterval) + attempts++ continue } - data := strings.TrimPrefix(line, "data: ") + defer resp.Body.Close() + log.Info(fmt.Sprintf("Connected to SSE server: %s", relayBaseEndpoint)) + // Reset subscribing attempts + attempts = 0 - // We assume the data is the JSON representation of the constraints - log.Info(fmt.Sprintf("Received new constraint: %s", data)) - constraintsSigned := make(types.SignedConstraintsList, 0, 8) - if err := json.Unmarshal([]byte(data), &constraintsSigned); err != nil { - log.Warn(fmt.Sprintf("Failed to unmarshal constraints: %v", err)) - continue + var reader io.Reader + if resp.Header.Get("Content-Encoding") == "gzip" { + gzipReader, err := gzip.NewReader(resp.Body) + if err != nil { + return fmt.Errorf("error creating gzip reader: %v", err) + } + defer gzipReader.Close() + reader = gzipReader + } else { + reader = resp.Body } - if len(constraintsSigned) == 0 { - log.Warn("Received 0 length list of constraints") - continue - } + bufReader := bufio.NewReader(reader) + for { + line, err := bufReader.ReadString('\n') + if err != nil { + // If we encounter an EOF or another serious error, break out to reconnect + isEOF := err == io.EOF || strings.Contains(err.Error(), "EOF") + if isEOF { + log.Error("Encountered EOF. Connection to relay lost, attempting to reconnect...") + time.Sleep(retryInterval) + break // Break to reconnect by restarting the main `for` loop + } - for _, constraint := range constraintsSigned { - // Check that the constraints pubkey is authorized to sign constraints - if !slices.Contains(b.slotConstraintsPubkeys, constraint.Message.Pubkey) { - log.Warn("Received constraint from unauthorized pubkey", "pubkey", constraint.Message.Pubkey) + // Log other errors but continue reading if recoverable + log.Error(fmt.Sprintf("Error reading from response body: %v", err)) continue } - // Verify the signature of the constraints message - valid, err := constraint.VerifySignature(constraint.Message.Pubkey, b.GetConstraintsDomain()) - if err != nil || !valid { - log.Error("Failed to verify constraint signature", "err", err) + if !strings.HasPrefix(line, "data: ") { continue } - decodedConstraints, err := DecodeConstraints(constraint) - if err != nil { - log.Error("Failed to decode constraint: ", err) + data := strings.TrimPrefix(line, "data: ") + + constraintsSigned := make(types.SignedConstraintsList, 0, 8) + if err := json.Unmarshal([]byte(data), &constraintsSigned); err != nil { + log.Warn(fmt.Sprintf("Failed to unmarshal constraints: %v", err)) continue } - // For every constraint, we need to check if it has already been seen for the associated slot - slotConstraints, _ := b.constraintsCache.Get(constraint.Message.Slot) - if len(slotConstraints) == 0 { - // New constraint for this slot, add it in the map and continue with the next constraint - b.constraintsCache.Put(constraint.Message.Slot, decodedConstraints) + if len(constraintsSigned) == 0 { + log.Warn("Received 0 length list of constraints") continue } - for hash := range decodedConstraints { - // Update the slot constraints - slotConstraints[hash] = decodedConstraints[hash] - } + log.Info(fmt.Sprintf("Received %d new constraints", len(constraintsSigned))) + + for _, constraint := range constraintsSigned { + if !slices.Contains(b.slotConstraintsPubkeys, constraint.Message.Pubkey) { + log.Warn("Received constraint from unauthorized pubkey", "pubkey", constraint.Message.Pubkey) + continue + } + + valid, err := constraint.VerifySignature(constraint.Message.Pubkey, b.GetConstraintsDomain()) + if err != nil || !valid { + log.Error("Failed to verify constraint signature", "err", err) + continue + } - // Update the slot constraints in the cache - b.constraintsCache.Put(constraint.Message.Slot, slotConstraints) + decodedConstraints, err := DecodeConstraints(constraint) + if err != nil { + log.Error("Failed to decode constraint: ", err) + continue + } + + slotConstraints, _ := b.constraintsCache.Get(constraint.Message.Slot) + if len(slotConstraints) == 0 { + b.constraintsCache.Put(constraint.Message.Slot, decodedConstraints) + continue + } + + for hash := range decodedConstraints { + slotConstraints[hash] = decodedConstraints[hash] + + log.Debug(fmt.Sprintf("Adding constraint with hash %s to cache for slot %d", hash, constraint.Message.Slot)) + } + + b.constraintsCache.Put(constraint.Message.Slot, slotConstraints) + } } } - - return nil } func (b *Builder) Stop() error { diff --git a/builder/builder_test.go b/builder/builder_test.go index 07d7d1f..0383e58 100644 --- a/builder/builder_test.go +++ b/builder/builder_test.go @@ -438,7 +438,7 @@ func TestSubscribeProposerConstraints(t *testing.T) { _, ok := builder.constraintsCache.Get(0) require.Equal(t, false, ok) - builder.subscribeToRelayForConstraints(builder.relay.Config().Endpoint) + go builder.subscribeToRelayForConstraints(builder.relay.Config().Endpoint) // Wait 2 seconds to save all constraints in cache time.Sleep(2 * time.Second) From a8fe548adf2274838502b872ca8249663a11c09b Mon Sep 17 00:00:00 2001 From: thedevbirb Date: Tue, 5 Nov 2024 14:33:59 +0100 Subject: [PATCH 2/3] chore: add relay base endpoint to logs --- builder/builder.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/builder/builder.go b/builder/builder.go index cce82c2..e595a64 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -286,10 +286,10 @@ func (b *Builder) subscribeToRelayForConstraints(relayBaseEndpoint string) error // Main loop to reconnect to the relay for { - log.Info("Attempting to subscribe to constraints...") + log.Info("Attempting to subscribe to constraints...", "relayBaseEndpoint", relayBaseEndpoint) if attempts >= maxAttempts { - log.Error(fmt.Sprintf("Failed to subscribe to constraints after %d attempts", maxAttempts)) + log.Error(fmt.Sprintf("Failed to subscribe to constraints after %d attempts", maxAttempts), "relayBaseEndpoint", relayBaseEndpoint) return errors.New("failed to subscribe to constraints") } @@ -303,14 +303,14 @@ func (b *Builder) subscribeToRelayForConstraints(relayBaseEndpoint string) error resp, err := client.Do(req) if err != nil { - log.Error(fmt.Sprintf("Failed to connect to SSE server: %v", err)) + log.Error(fmt.Sprintf("Failed to connect to SSE server: %v", err), "relayBaseEndpoint", relayBaseEndpoint) time.Sleep(retryInterval) attempts++ continue } if resp.StatusCode != http.StatusOK { - log.Error(fmt.Sprintf("Error subscribing to constraints via SSE: %s, %v", resp.Status, err)) + log.Error(fmt.Sprintf("Error subscribing to constraints via SSE: %s, %v", resp.Status, err), "relayBaseEndpoint", relayBaseEndpoint) resp.Body.Close() // Close response body to free resources time.Sleep(retryInterval) attempts++ @@ -341,7 +341,7 @@ func (b *Builder) subscribeToRelayForConstraints(relayBaseEndpoint string) error // If we encounter an EOF or another serious error, break out to reconnect isEOF := err == io.EOF || strings.Contains(err.Error(), "EOF") if isEOF { - log.Error("Encountered EOF. Connection to relay lost, attempting to reconnect...") + log.Error("Encountered EOF. Connection to relay lost, attempting to reconnect...", "relayBaseEndpoint", relayBaseEndpoint) time.Sleep(retryInterval) break // Break to reconnect by restarting the main `for` loop } From 911cda313da427fd57c95da071efb7e83744bb25 Mon Sep 17 00:00:00 2001 From: thedevbirb Date: Tue, 5 Nov 2024 15:01:34 +0100 Subject: [PATCH 3/3] fix: typo --- builder/builder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/builder/builder.go b/builder/builder.go index e595a64..2abaf11 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -277,7 +277,7 @@ func (b *Builder) SubscribeProposerConstraints() error { // subscribeToRelayForConstraints loops almost indefinitely to subscribe to // constraints forwarded by relays. If the subscription fails or it is lost, it -// retries every 10 seconds for a max of 10 minutes the restablish the +// retries every 10 seconds for a max of 10 minutes to restablish the // connection before existing. func (b *Builder) subscribeToRelayForConstraints(relayBaseEndpoint string) error { attempts := 0