From 80b0af90a58a934dec592be968b791e714d9e973 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 7 Jan 2025 20:50:21 +0530 Subject: [PATCH] [8.16](backport #42225) [filebeat][websocket] - Added infinite & blanket retry options to websockets and improved logging and retry logic (#42234) * [filebeat][websocket] - Added infinite & blanket retry options to websockets and improved logging and retry logic (#42225) * added blanket & infinite retry options and improved logging (cherry picked from commit 177a47a01599c754c190e1e27da11aafafdd715c) * Update CHANGELOG.next.asciidoc --------- Co-authored-by: ShourieG --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-streaming.asciidoc | 14 +++++- x-pack/filebeat/input/streaming/config.go | 10 +++-- .../filebeat/input/streaming/config_test.go | 12 ++++++ x-pack/filebeat/input/streaming/input_test.go | 2 +- x-pack/filebeat/input/streaming/websocket.go | 43 +++++++++++++------ 6 files changed, 63 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 86e0a3a3cafd..aa55d3834447 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -212,6 +212,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Update CEL mito extensions to v1.12.2. {pull}39755[39755] - Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004] +- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index 7f07fb4954f6..85a7c02467af 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -316,7 +316,7 @@ This specifies whether fields should be replaced with a `*` or deleted entirely [float] ==== `retry` -The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. +The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. It also supports blanket retries and infinite retries via the `blanket_retires` and `infinite_retries` configuration options. These are set to `false` by default. ["source","yaml",subs="attributes"] ---- @@ -333,6 +333,8 @@ filebeat.inputs: max_attempts: 5 wait_min: 1s wait_max: 10s + blanket_retries: false + infinite_retries: false ---- [float] ==== `retry.max_attempts` @@ -349,6 +351,16 @@ The minimum time to wait between retries. This ensures that retries are spaced o The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. The default value is `30` seconds. +[float] +==== `retry.blanket_retries` + +Normally the input will only retry when a connection error is found to be retryable based on the error type and the RFC 6455 error codes defined by the websocket protocol. If `blanket_retries` is set to `true` (`false` by default) the input will retry on any error. This is not recommended unless the user is certain that all errors are transient and can be resolved by retrying. + +[float] +==== `retry.infinite_retries` + +Normally the input will only retry a maximum of `max_attempts` times. If `infinite_retries` is set to `true` (`false` by default) the input will retry indefinitely. This is not recommended unless the user is certain that the connection will eventually succeed. + [float] === `timeout` Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds. diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index eea8c2afc704..df557d553de2 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -59,9 +59,11 @@ type redact struct { } type retry struct { - MaxAttempts int `config:"max_attempts"` - WaitMin time.Duration `config:"wait_min"` - WaitMax time.Duration `config:"wait_max"` + MaxAttempts int `config:"max_attempts"` + WaitMin time.Duration `config:"wait_min"` + WaitMax time.Duration `config:"wait_max"` + BlanketRetries bool `config:"blanket_retries"` + InfiniteRetries bool `config:"infinite_retries"` } type authConfig struct { @@ -136,7 +138,7 @@ func (c config) Validate() error { if c.Retry != nil { switch { - case c.Retry.MaxAttempts <= 0: + case c.Retry.MaxAttempts <= 0 && !c.Retry.InfiniteRetries: return errors.New("max_attempts must be greater than zero") case c.Retry.WaitMin > c.Retry.WaitMax: return errors.New("wait_min must be less than or equal to wait_max") diff --git a/x-pack/filebeat/input/streaming/config_test.go b/x-pack/filebeat/input/streaming/config_test.go index 840c35d400fa..437267bc7b71 100644 --- a/x-pack/filebeat/input/streaming/config_test.go +++ b/x-pack/filebeat/input/streaming/config_test.go @@ -130,6 +130,18 @@ var configTests = []struct { "url": "wss://localhost:443/v1/stream", }, }, + { + name: "valid_retry_with_infinite", + config: map[string]interface{}{ + "retry": map[string]interface{}{ + "infinite_retries": true, + "max_attempts": 0, + "wait_min": "1s", + "wait_max": "2s", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, } func TestConfig(t *testing.T) { diff --git a/x-pack/filebeat/input/streaming/input_test.go b/x-pack/filebeat/input/streaming/input_test.go index c11784ea3dbf..e4a8eac1d417 100644 --- a/x-pack/filebeat/input/streaming/input_test.go +++ b/x-pack/filebeat/input/streaming/input_test.go @@ -450,7 +450,7 @@ var inputTests = []struct { "wait_max": "2s", }, }, - wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"), + wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake and (status 403)"), }, { name: "single_event_tls", diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index d21eb9c21b47..584852aabcce 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -118,7 +118,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { _, message, err := c.ReadMessage() if err != nil { s.metrics.errorsTotal.Inc() - if !isRetryableError(err) { + if !s.cfg.Retry.BlanketRetries && !isRetryableError(err) { s.log.Errorw("failed to read websocket data", "error", err) return err } @@ -233,21 +233,38 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log } if cfg.Retry != nil { retryConfig := cfg.Retry - for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { - conn, response, err = dialer.DialContext(ctx, url, headers) - if err == nil { - return conn, response, nil + if !retryConfig.InfiniteRetries { + for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { + conn, response, err = dialer.DialContext(ctx, url, headers) + if err == nil { + return conn, response, nil + } + //nolint:errorlint // it will never be a wrapped error at this point + if err == websocket.ErrBadHandshake { + log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) + } else { + log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode) + } + waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) + time.Sleep(waitTime) } - //nolint:errorlint // it will never be a wrapped error at this point - if err == websocket.ErrBadHandshake { - log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) - continue + return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w and (status %d)", retryConfig.MaxAttempts, err, response.StatusCode) + } else { + for attempt := 1; ; attempt++ { + conn, response, err = dialer.DialContext(ctx, url, headers) + if err == nil { + return conn, response, nil + } + //nolint:errorlint // it will never be a wrapped error at this point + if err == websocket.ErrBadHandshake { + log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) + } else { + log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode) + } + waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) + time.Sleep(waitTime) } - log.Debugf("attempt %d: webSocket connection failed. retrying...\n", attempt) - waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) - time.Sleep(waitTime) } - return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err) } return dialer.DialContext(ctx, url, headers)