Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(constraints-api): eof error handling logic #4

Merged
merged 3 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 81 additions & 78 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,18 +275,21 @@ func (b *Builder) SubscribeProposerConstraints() error {
return nil
}

// subscribeToRelayForConstraints loops almost indefinitely to subscribe to
// constraints forwarded by relays. If the subscription fails or it is lost, it
// retries every 10 seconds for a max of 10 minutes to re-establish the
// connection before exiting.
func (b *Builder) subscribeToRelayForConstraints(relayBaseEndpoint string) error {
attempts := 0
maxAttempts := 60 // Max 10 minutes of retries
retryInterval := 10 * time.Second

var resp *http.Response

// Main loop to reconnect to the relay
for {
log.Info("Attempting to subscribe to constraints...")
log.Info("Attempting to subscribe to constraints...", "relayBaseEndpoint", relayBaseEndpoint)

if attempts >= maxAttempts {
log.Error(fmt.Sprintf("Failed to subscribe to constraints after %d attempts", maxAttempts))
log.Error(fmt.Sprintf("Failed to subscribe to constraints after %d attempts", maxAttempts), "relayBaseEndpoint", relayBaseEndpoint)
return errors.New("failed to subscribe to constraints")
}

Expand All @@ -298,109 +301,109 @@ func (b *Builder) subscribeToRelayForConstraints(relayBaseEndpoint string) error

client := http.Client{}

resp, err = client.Do(req)
resp, err := client.Do(req)
if err != nil {
log.Error(fmt.Sprintf("Failed to connect to SSE server: %v", err))
log.Error(fmt.Sprintf("Failed to connect to SSE server: %v", err), "relayBaseEndpoint", relayBaseEndpoint)
time.Sleep(retryInterval)
attempts++
continue
}

if resp.StatusCode != http.StatusOK {
log.Error(fmt.Sprintf("Error subscribing to constraints via SSE: %s, %v", resp.Status, err))
return err
}
break
}

defer resp.Body.Close()
log.Info(fmt.Sprintf("Connected to SSE server: %s", relayBaseEndpoint))

var reader io.Reader

// Check if the response is gzipped
if resp.Header.Get("Content-Encoding") == "gzip" {
// Decompress the response body
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
return fmt.Errorf("error creating gzip reader: %v", err)
}
defer gzipReader.Close()
reader = gzipReader
} else {
reader = resp.Body
}

bufReader := bufio.NewReader(reader)
for {
line, err := bufReader.ReadString('\n')
if err != nil {
if err == io.EOF {
log.Info("End of stream")
break
}
log.Error(fmt.Sprintf("Error reading from response body: %v", err))
continue
}

if !strings.HasPrefix(line, "data: ") {
log.Error(fmt.Sprintf("Error subscribing to constraints via SSE: %s, %v", resp.Status, err), "relayBaseEndpoint", relayBaseEndpoint)
resp.Body.Close() // Close response body to free resources
time.Sleep(retryInterval)
attempts++
continue
}

data := strings.TrimPrefix(line, "data: ")
defer resp.Body.Close()
log.Info(fmt.Sprintf("Connected to SSE server: %s", relayBaseEndpoint))
// Reset subscribing attempts
attempts = 0

// We assume the data is the JSON representation of the constraints
log.Info(fmt.Sprintf("Received new constraint: %s", data))
constraintsSigned := make(types.SignedConstraintsList, 0, 8)
if err := json.Unmarshal([]byte(data), &constraintsSigned); err != nil {
log.Warn(fmt.Sprintf("Failed to unmarshal constraints: %v", err))
continue
var reader io.Reader
if resp.Header.Get("Content-Encoding") == "gzip" {
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
return fmt.Errorf("error creating gzip reader: %v", err)
}
defer gzipReader.Close()
reader = gzipReader
} else {
reader = resp.Body
}

if len(constraintsSigned) == 0 {
log.Warn("Received 0 length list of constraints")
continue
}
bufReader := bufio.NewReader(reader)
for {
line, err := bufReader.ReadString('\n')
if err != nil {
// If we encounter an EOF or another serious error, break out to reconnect
isEOF := err == io.EOF || strings.Contains(err.Error(), "EOF")
if isEOF {
log.Error("Encountered EOF. Connection to relay lost, attempting to reconnect...", "relayBaseEndpoint", relayBaseEndpoint)
time.Sleep(retryInterval)
break // Break to reconnect by restarting the main `for` loop
}

for _, constraint := range constraintsSigned {
// Check that the constraints pubkey is authorized to sign constraints
if !slices.Contains(b.slotConstraintsPubkeys, constraint.Message.Pubkey) {
log.Warn("Received constraint from unauthorized pubkey", "pubkey", constraint.Message.Pubkey)
// Log other errors but continue reading if recoverable
log.Error(fmt.Sprintf("Error reading from response body: %v", err))
continue
}

// Verify the signature of the constraints message
valid, err := constraint.VerifySignature(constraint.Message.Pubkey, b.GetConstraintsDomain())
if err != nil || !valid {
log.Error("Failed to verify constraint signature", "err", err)
if !strings.HasPrefix(line, "data: ") {
continue
}

decodedConstraints, err := DecodeConstraints(constraint)
if err != nil {
log.Error("Failed to decode constraint: ", err)
data := strings.TrimPrefix(line, "data: ")

constraintsSigned := make(types.SignedConstraintsList, 0, 8)
if err := json.Unmarshal([]byte(data), &constraintsSigned); err != nil {
log.Warn(fmt.Sprintf("Failed to unmarshal constraints: %v", err))
continue
}

// For every constraint, we need to check if it has already been seen for the associated slot
slotConstraints, _ := b.constraintsCache.Get(constraint.Message.Slot)
if len(slotConstraints) == 0 {
// New constraint for this slot, add it in the map and continue with the next constraint
b.constraintsCache.Put(constraint.Message.Slot, decodedConstraints)
if len(constraintsSigned) == 0 {
log.Warn("Received 0 length list of constraints")
continue
}

for hash := range decodedConstraints {
// Update the slot constraints
slotConstraints[hash] = decodedConstraints[hash]
}
log.Info(fmt.Sprintf("Received %d new constraints", len(constraintsSigned)))

for _, constraint := range constraintsSigned {
if !slices.Contains(b.slotConstraintsPubkeys, constraint.Message.Pubkey) {
log.Warn("Received constraint from unauthorized pubkey", "pubkey", constraint.Message.Pubkey)
continue
}

valid, err := constraint.VerifySignature(constraint.Message.Pubkey, b.GetConstraintsDomain())
if err != nil || !valid {
log.Error("Failed to verify constraint signature", "err", err)
continue
}

// Update the slot constraints in the cache
b.constraintsCache.Put(constraint.Message.Slot, slotConstraints)
decodedConstraints, err := DecodeConstraints(constraint)
if err != nil {
log.Error("Failed to decode constraint: ", err)
continue
}

slotConstraints, _ := b.constraintsCache.Get(constraint.Message.Slot)
if len(slotConstraints) == 0 {
b.constraintsCache.Put(constraint.Message.Slot, decodedConstraints)
continue
}

for hash := range decodedConstraints {
slotConstraints[hash] = decodedConstraints[hash]

log.Debug(fmt.Sprintf("Adding constraint with hash %s to cache for slot %d", hash, constraint.Message.Slot))
}

b.constraintsCache.Put(constraint.Message.Slot, slotConstraints)
}
}
}

return nil
}

func (b *Builder) Stop() error {
Expand Down
2 changes: 1 addition & 1 deletion builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func TestSubscribeProposerConstraints(t *testing.T) {
_, ok := builder.constraintsCache.Get(0)
require.Equal(t, false, ok)

builder.subscribeToRelayForConstraints(builder.relay.Config().Endpoint)
go builder.subscribeToRelayForConstraints(builder.relay.Config().Endpoint)
// Wait 2 seconds to save all constraints in cache
time.Sleep(2 * time.Second)

Expand Down
Loading