diff --git a/builder/builder.go b/builder/builder.go index 64d7c9f..2abaf11 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -275,18 +275,21 @@ func (b *Builder) SubscribeProposerConstraints() error { return nil } +// subscribeToRelayForConstraints loops almost indefinitely to subscribe to +// constraints forwarded by relays. If the subscription fails or it is lost, it +// retries every 10 seconds for a max of 10 minutes to reestablish the +// connection before exiting. func (b *Builder) subscribeToRelayForConstraints(relayBaseEndpoint string) error { attempts := 0 maxAttempts := 60 // Max 10 minutes of retries retryInterval := 10 * time.Second - var resp *http.Response - + // Main loop to reconnect to the relay for { - log.Info("Attempting to subscribe to constraints...") + log.Info("Attempting to subscribe to constraints...", "relayBaseEndpoint", relayBaseEndpoint) if attempts >= maxAttempts { - log.Error(fmt.Sprintf("Failed to subscribe to constraints after %d attempts", maxAttempts)) + log.Error(fmt.Sprintf("Failed to subscribe to constraints after %d attempts", maxAttempts), "relayBaseEndpoint", relayBaseEndpoint) return errors.New("failed to subscribe to constraints") } @@ -298,109 +301,109 @@ func (b *Builder) subscribeToRelayForConstraints(relayBaseEndpoint string) error client := http.Client{} - resp, err = client.Do(req) + resp, err := client.Do(req) if err != nil { - log.Error(fmt.Sprintf("Failed to connect to SSE server: %v", err)) + log.Error(fmt.Sprintf("Failed to connect to SSE server: %v", err), "relayBaseEndpoint", relayBaseEndpoint) time.Sleep(retryInterval) attempts++ continue } if resp.StatusCode != http.StatusOK { - log.Error(fmt.Sprintf("Error subscribing to constraints via SSE: %s, %v", resp.Status, err)) - return err - } - break - } - - defer resp.Body.Close() - log.Info(fmt.Sprintf("Connected to SSE server: %s", relayBaseEndpoint)) - - var reader io.Reader - - // Check if the response is gzipped - if 
resp.Header.Get("Content-Encoding") == "gzip" { - // Decompress the response body - gzipReader, err := gzip.NewReader(resp.Body) - if err != nil { - return fmt.Errorf("error creating gzip reader: %v", err) - } - defer gzipReader.Close() - reader = gzipReader - } else { - reader = resp.Body - } - - bufReader := bufio.NewReader(reader) - for { - line, err := bufReader.ReadString('\n') - if err != nil { - if err == io.EOF { - log.Info("End of stream") - break - } - log.Error(fmt.Sprintf("Error reading from response body: %v", err)) - continue - } - - if !strings.HasPrefix(line, "data: ") { + log.Error(fmt.Sprintf("Error subscribing to constraints via SSE: %s, %v", resp.Status, err), "relayBaseEndpoint", relayBaseEndpoint) + resp.Body.Close() // Close response body to free resources + time.Sleep(retryInterval) + attempts++ continue } - data := strings.TrimPrefix(line, "data: ") + defer resp.Body.Close() + log.Info(fmt.Sprintf("Connected to SSE server: %s", relayBaseEndpoint)) + // Reset subscribing attempts + attempts = 0 - // We assume the data is the JSON representation of the constraints - log.Info(fmt.Sprintf("Received new constraint: %s", data)) - constraintsSigned := make(types.SignedConstraintsList, 0, 8) - if err := json.Unmarshal([]byte(data), &constraintsSigned); err != nil { - log.Warn(fmt.Sprintf("Failed to unmarshal constraints: %v", err)) - continue + var reader io.Reader + if resp.Header.Get("Content-Encoding") == "gzip" { + gzipReader, err := gzip.NewReader(resp.Body) + if err != nil { + return fmt.Errorf("error creating gzip reader: %v", err) + } + defer gzipReader.Close() + reader = gzipReader + } else { + reader = resp.Body } - if len(constraintsSigned) == 0 { - log.Warn("Received 0 length list of constraints") - continue - } + bufReader := bufio.NewReader(reader) + for { + line, err := bufReader.ReadString('\n') + if err != nil { + // If we encounter an EOF or another serious error, break out to reconnect + isEOF := err == io.EOF || 
strings.Contains(err.Error(), "EOF") + if isEOF { + log.Error("Encountered EOF. Connection to relay lost, attempting to reconnect...", "relayBaseEndpoint", relayBaseEndpoint) + time.Sleep(retryInterval) + break // Break to reconnect by restarting the main `for` loop + } - for _, constraint := range constraintsSigned { - // Check that the constraints pubkey is authorized to sign constraints - if !slices.Contains(b.slotConstraintsPubkeys, constraint.Message.Pubkey) { - log.Warn("Received constraint from unauthorized pubkey", "pubkey", constraint.Message.Pubkey) + // Log other errors but continue reading if recoverable + log.Error(fmt.Sprintf("Error reading from response body: %v", err)) continue } - // Verify the signature of the constraints message - valid, err := constraint.VerifySignature(constraint.Message.Pubkey, b.GetConstraintsDomain()) - if err != nil || !valid { - log.Error("Failed to verify constraint signature", "err", err) + if !strings.HasPrefix(line, "data: ") { continue } - decodedConstraints, err := DecodeConstraints(constraint) - if err != nil { - log.Error("Failed to decode constraint: ", err) + data := strings.TrimPrefix(line, "data: ") + + constraintsSigned := make(types.SignedConstraintsList, 0, 8) + if err := json.Unmarshal([]byte(data), &constraintsSigned); err != nil { + log.Warn(fmt.Sprintf("Failed to unmarshal constraints: %v", err)) continue } - // For every constraint, we need to check if it has already been seen for the associated slot - slotConstraints, _ := b.constraintsCache.Get(constraint.Message.Slot) - if len(slotConstraints) == 0 { - // New constraint for this slot, add it in the map and continue with the next constraint - b.constraintsCache.Put(constraint.Message.Slot, decodedConstraints) + if len(constraintsSigned) == 0 { + log.Warn("Received 0 length list of constraints") continue } - for hash := range decodedConstraints { - // Update the slot constraints - slotConstraints[hash] = decodedConstraints[hash] - } + 
log.Info(fmt.Sprintf("Received %d new constraints", len(constraintsSigned))) + + for _, constraint := range constraintsSigned { + if !slices.Contains(b.slotConstraintsPubkeys, constraint.Message.Pubkey) { + log.Warn("Received constraint from unauthorized pubkey", "pubkey", constraint.Message.Pubkey) + continue + } + + valid, err := constraint.VerifySignature(constraint.Message.Pubkey, b.GetConstraintsDomain()) + if err != nil || !valid { + log.Error("Failed to verify constraint signature", "err", err) + continue + } - // Update the slot constraints in the cache - b.constraintsCache.Put(constraint.Message.Slot, slotConstraints) + decodedConstraints, err := DecodeConstraints(constraint) + if err != nil { + log.Error("Failed to decode constraint: ", err) + continue + } + + slotConstraints, _ := b.constraintsCache.Get(constraint.Message.Slot) + if len(slotConstraints) == 0 { + b.constraintsCache.Put(constraint.Message.Slot, decodedConstraints) + continue + } + + for hash := range decodedConstraints { + slotConstraints[hash] = decodedConstraints[hash] + + log.Debug(fmt.Sprintf("Adding constraint with hash %s to cache for slot %d", hash, constraint.Message.Slot)) + } + + b.constraintsCache.Put(constraint.Message.Slot, slotConstraints) + } } } - - return nil } func (b *Builder) Stop() error { diff --git a/builder/builder_test.go b/builder/builder_test.go index 07d7d1f..0383e58 100644 --- a/builder/builder_test.go +++ b/builder/builder_test.go @@ -438,7 +438,7 @@ func TestSubscribeProposerConstraints(t *testing.T) { _, ok := builder.constraintsCache.Get(0) require.Equal(t, false, ok) - builder.subscribeToRelayForConstraints(builder.relay.Config().Endpoint) + go builder.subscribeToRelayForConstraints(builder.relay.Config().Endpoint) // Wait 2 seconds to save all constraints in cache time.Sleep(2 * time.Second)