From 0176f47aeda13a966f313b0ed9678dc1e638d30c Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 7 Jan 2025 09:58:16 +0530 Subject: [PATCH 1/8] added blanket & infinite retry options and impoved logging --- .../docs/inputs/input-streaming.asciidoc | 14 +++++- x-pack/filebeat/input/streaming/config.go | 10 +++-- .../filebeat/input/streaming/config_test.go | 12 ++++++ x-pack/filebeat/input/streaming/input_test.go | 2 +- x-pack/filebeat/input/streaming/websocket.go | 43 +++++++++++++------ 5 files changed, 62 insertions(+), 19 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index 7f07fb4954f..2b45fa8fada 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -316,7 +316,7 @@ This specifies whether fields should be replaced with a `*` or deleted entirely [float] ==== `retry` -The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. +The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. It also supports blanket retries and infinite retries via the `blanket_retires` and `infinite_retries` configuration options. These are set to `false` by default. ["source","yaml",subs="attributes"] ---- @@ -333,6 +333,8 @@ filebeat.inputs: max_attempts: 5 wait_min: 1s wait_max: 10s + blanket_retries: false + infinite_retries: false ---- [float] ==== `retry.max_attempts` @@ -349,6 +351,16 @@ The minimum time to wait between retries. This ensures that retries are spaced o The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. The default value is `30` seconds. +[float] +==== `retry.blanket_retries` + +Normally the input will only retry when a connection error is found to be retryable based on the error type the RFC 6455 error codes defined by the websocket protocol. If `blanket_retries` is set to `true` (`false` by default) the input will retry on any error. This is not recommended unless the user is certain that all errors are transient and can be resolved by retrying. + +[float] +==== `retry.infinite_retries` + +Normally the input will only retry a maximum of `max_attempts` times. If `infinite_retries` is set to `true` (`false` by default) the input will retry indefinitely. This is not recommended unless the user is certain that the connection will eventually succeed. + [float] === `timeout` Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds. diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index eea8c2afc70..df557d553de 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -59,9 +59,11 @@ type redact struct { } type retry struct { - MaxAttempts int `config:"max_attempts"` - WaitMin time.Duration `config:"wait_min"` - WaitMax time.Duration `config:"wait_max"` + MaxAttempts int `config:"max_attempts"` + WaitMin time.Duration `config:"wait_min"` + WaitMax time.Duration `config:"wait_max"` + BlanketRetries bool `config:"blanket_retries"` + InfiniteRetries bool `config:"infinite_retries"` } type authConfig struct { @@ -136,7 +138,7 @@ func (c config) Validate() error { if c.Retry != nil { switch { - case c.Retry.MaxAttempts <= 0: + case c.Retry.MaxAttempts <= 0 && !c.Retry.InfiniteRetries: return errors.New("max_attempts must be greater than zero") case c.Retry.WaitMin > c.Retry.WaitMax: return errors.New("wait_min must be less than or equal to wait_max") diff --git a/x-pack/filebeat/input/streaming/config_test.go b/x-pack/filebeat/input/streaming/config_test.go index 840c35d400f..437267bc7b7 100644 --- a/x-pack/filebeat/input/streaming/config_test.go +++ b/x-pack/filebeat/input/streaming/config_test.go @@ -130,6 +130,18 @@ var configTests = []struct { "url": "wss://localhost:443/v1/stream", }, }, + { + name: "valid_retry_with_infinite", + config: map[string]interface{}{ + "retry": map[string]interface{}{ + "infinite_retries": true, + "max_attempts": 0, + "wait_min": "1s", + "wait_max": "2s", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, } func TestConfig(t *testing.T) { diff --git a/x-pack/filebeat/input/streaming/input_test.go b/x-pack/filebeat/input/streaming/input_test.go index c11784ea3db..e4a8eac1d41 100644 --- a/x-pack/filebeat/input/streaming/input_test.go +++ b/x-pack/filebeat/input/streaming/input_test.go @@ -450,7 +450,7 @@ var inputTests = []struct { "wait_max": "2s", }, }, - wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"), + wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake and (status 403)"), }, { name: "single_event_tls", diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 225bc76e8d9..f75dd397072 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -118,7 +118,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { _, message, err := c.ReadMessage() if err != nil { s.metrics.errorsTotal.Inc() - if !isRetryableError(err) { + if !s.cfg.Retry.BlanketRetries && !isRetryableError(err) { s.log.Errorw("failed to read websocket data", "error", err) return err } @@ -230,21 +230,38 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log } if cfg.Retry != nil { retryConfig := cfg.Retry - for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { - conn, response, err = dialer.DialContext(ctx, url, headers) - if err == nil { - return conn, response, nil + if !retryConfig.InfiniteRetries { + for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { + conn, response, err = dialer.DialContext(ctx, url, headers) + if err == nil { + return conn, response, nil + } + //nolint:errorlint // it will never be a wrapped error at this point + if err == websocket.ErrBadHandshake { + log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) + } else { + log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode) + } + waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) + time.Sleep(waitTime) } - //nolint:errorlint // it will never be a wrapped error at this point - if err == websocket.ErrBadHandshake { - log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) - continue + return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w and (status %d)", retryConfig.MaxAttempts, err, response.StatusCode) + } else { + for attempt := 1; ; attempt++ { + conn, response, err = dialer.DialContext(ctx, url, headers) + if err == nil { + return conn, response, nil + } + //nolint:errorlint // it will never be a wrapped error at this point + if err == websocket.ErrBadHandshake { + log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) + } else { + log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode) + } + waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) + time.Sleep(waitTime) } - log.Debugf("attempt %d: webSocket connection failed. retrying...\n", attempt) - waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) - time.Sleep(waitTime) } - return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err) } return dialer.DialContext(ctx, url, headers) From a04b20ff24b09896e2f71ab3942b5f422ee7c540 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 15 Jan 2025 19:15:29 +0530 Subject: [PATCH 2/8] removed an unnecessary namespace variable for a debug log causing a benign error --- x-pack/filebeat/input/streaming/websocket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index f75dd397072..626b5e70529 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -140,7 +140,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { } s.metrics.receivedBytesTotal.Add(uint64(len(message))) state["response"] = message - s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message)) + s.log.Debugw("received websocket message", "msg", string(message)) err = s.process(ctx, state, s.cursor, s.now().In(time.UTC)) if err != nil { s.metrics.errorsTotal.Inc() From 339327b74e242ee8c33e3f781a9f1d048185fd0d Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 15 Jan 2025 19:20:20 +0530 Subject: [PATCH 3/8] updated changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6aff5931495..8799d24ed00 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -206,6 +206,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] - Fix Netflow Template Sharing configuration handling. {pull}42080[42080] - Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218] +- Removed an unnecessary namespace variable for a debug log causing a benign error. {pull}42315[42315] *Heartbeat* From 9ab94bf5557ad04a71e2e45cb6ff2d66fe907155 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 15 Jan 2025 20:00:32 +0530 Subject: [PATCH 4/8] fixed a situation when the input would panic during shutdown --- CHANGELOG.next.asciidoc | 2 +- x-pack/filebeat/input/streaming/websocket.go | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8799d24ed00..5c418a5fc70 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -206,7 +206,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] - Fix Netflow Template Sharing configuration handling. {pull}42080[42080] - Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218] -- Removed an unnecessary namespace variable for a debug log causing a benign error. {pull}42315[42315] +- Removed an unnecessary namespace variable & put a null check to stop paincs on shutdown. {pull}42315[42315] *Heartbeat* diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 6981d212ae0..d1348fbf5ba 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -154,9 +154,11 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { // ensures this is the last connection closed when the function returns defer func() { - if err := c.Close(); err != nil { - s.metrics.errorsTotal.Inc() - s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + if c != nil { + if err := c.Close(); err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + } } }() From fcb19b47065bc1b5de47a76265a463876792901c Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 16 Jan 2025 10:33:27 +0530 Subject: [PATCH 5/8] addressed Andrew's suggestions --- CHANGELOG.next.asciidoc | 2 +- x-pack/filebeat/input/streaming/crowdstrike.go | 4 ++-- x-pack/filebeat/input/streaming/websocket.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5c418a5fc70..c1921c01e05 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -206,7 +206,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] - Fix Netflow Template Sharing configuration handling. {pull}42080[42080] - Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218] -- Removed an unnecessary namespace variable & put a null check to stop paincs on shutdown. {pull}42315[42315] +- In the streaming input made use of namespace consistent in logs & put a null check to stop paincs on shutdown. {pull}42315[42315] *Heartbeat* diff --git a/x-pack/filebeat/input/streaming/crowdstrike.go b/x-pack/filebeat/input/streaming/crowdstrike.go index eb1797d2f6d..7d062ce827b 100644 --- a/x-pack/filebeat/input/streaming/crowdstrike.go +++ b/x-pack/filebeat/input/streaming/crowdstrike.go @@ -156,7 +156,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client, if err != nil { return state, Warning{fmt.Errorf("failed to decode discover body: %w", err)} } - s.log.Debugw("stream discover metadata", "meta", mapstr.M(body.Meta)) + s.log.Debugw("stream discover metadata", logp.Namespace(s.ns), "meta", mapstr.M(body.Meta)) var offset int if cursor, ok := state["cursor"].(map[string]any); ok { @@ -241,7 +241,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client, } s.metrics.receivedBytesTotal.Add(uint64(len(msg))) state["response"] = []byte(msg) - s.log.Debugw("received firehose message", logp.Namespace("falcon_hose"), "msg", debugMsg(msg)) + s.log.Debugw("received firehose message", logp.Namespace(s.ns), "msg", debugMsg(msg)) err = s.process(ctx, state, s.cursor, s.now().In(time.UTC)) if err != nil { s.log.Errorw("failed to process and publish data", "error", err) diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index d1348fbf5ba..4d71b51bf71 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -219,7 +219,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { } s.metrics.receivedBytesTotal.Add(uint64(len(message))) state["response"] = message - s.log.Debugw("received websocket message", "msg", string(message)) + s.log.Debugw("received websocket message", logp.Namespace(s.ns), "msg", string(message)) err = s.process(ctx, state, s.cursor, s.now().In(time.UTC)) if err != nil { s.metrics.errorsTotal.Inc() @@ -296,7 +296,7 @@ func handleConnectionResponse(resp *http.Response, metrics *inputMetrics, log *l buf.WriteString("... truncated") } - log.Debugw("websocket connection response", "body", &buf) + log.Debugw("websocket connection response", "http.response.body.content", &buf) } } From 31503e8fd41dee10e23014c8c9b37501e3cb041b Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 16 Jan 2025 10:35:45 +0530 Subject: [PATCH 6/8] updated changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c1921c01e05..05aac5d798f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -206,7 +206,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] - Fix Netflow Template Sharing configuration handling. {pull}42080[42080] - Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218] -- In the streaming input made use of namespace consistent in logs & put a null check to stop paincs on shutdown. {pull}42315[42315] +- In the streaming input made namespace consistent in logs & put a null check to stop paincs on shutdown. {pull}42315[42315] *Heartbeat* From 71d0403dc3151c4adad3eed310181622e27e0a87 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 16 Jan 2025 14:16:51 +0530 Subject: [PATCH 7/8] updated linter --- x-pack/filebeat/input/streaming/crowdstrike.go | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/filebeat/input/streaming/crowdstrike.go b/x-pack/filebeat/input/streaming/crowdstrike.go index 7d062ce827b..404e9322472 100644 --- a/x-pack/filebeat/input/streaming/crowdstrike.go +++ b/x-pack/filebeat/input/streaming/crowdstrike.go @@ -233,6 +233,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client, err := dec.Decode(&msg) if err != nil { s.metrics.errorsTotal.Inc() + //nolint:errorlint // will not be a wrapped error here. if err == io.EOF { s.log.Info("stream ended, restarting") return state, nil From 68b89bd1cddf36196b645c2a6482ce7eb642ebd7 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 17 Jan 2025 10:51:33 +0530 Subject: [PATCH 8/8] updated changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 05aac5d798f..28cfe7665b5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -206,7 +206,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] - Fix Netflow Template Sharing configuration handling. {pull}42080[42080] - Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218] -- In the streaming input made namespace consistent in logs & put a null check to stop paincs on shutdown. {pull}42315[42315] +- In the `streaming` input, prevent panics on shutdown with a null check and apply a consistent namespace to contextual data in debug logs. {pull}42315[42315] *Heartbeat*