From 93e523f7a40aac0a18e7e906a58f3235088d2e74 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Sat, 18 Jan 2025 12:20:09 +0530 Subject: [PATCH] [8.17](backport #42315) [streaming] - Made namespace consistent in logging & put a null check to stop paincs on shutdown (#42338) --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/streaming/crowdstrike.go | 5 +++-- x-pack/filebeat/input/streaming/websocket.go | 12 +++++++----- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f2cc1091257..8911b84ee95 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -115,6 +115,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Ensure netflow custom field configuration is applied. {issue}40735[40735] {pull}40730[40730] - Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015] - Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218] +- In the `streaming` input, prevent panics on shutdown with a null check and apply a consistent namespace to contextual data in debug logs. {pull}42315[42315] *Heartbeat* diff --git a/x-pack/filebeat/input/streaming/crowdstrike.go b/x-pack/filebeat/input/streaming/crowdstrike.go index 3fed6a69c1a..404e9322472 100644 --- a/x-pack/filebeat/input/streaming/crowdstrike.go +++ b/x-pack/filebeat/input/streaming/crowdstrike.go @@ -156,7 +156,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client, if err != nil { return state, Warning{fmt.Errorf("failed to decode discover body: %w", err)} } - s.log.Debugw("stream discover metadata", "meta", mapstr.M(body.Meta)) + s.log.Debugw("stream discover metadata", logp.Namespace(s.ns), "meta", mapstr.M(body.Meta)) var offset int if cursor, ok := state["cursor"].(map[string]any); ok { @@ -233,6 +233,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client, err := dec.Decode(&msg) if err != nil { s.metrics.errorsTotal.Inc() + //nolint:errorlint // will not be a wrapped error here. if err == io.EOF { s.log.Info("stream ended, restarting") return state, nil @@ -241,7 +242,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client, } s.metrics.receivedBytesTotal.Add(uint64(len(msg))) state["response"] = []byte(msg) - s.log.Debugw("received firehose message", logp.Namespace("falcon_hose"), debugMsg(msg)) + s.log.Debugw("received firehose message", logp.Namespace(s.ns), "msg", debugMsg(msg)) err = s.process(ctx, state, s.cursor, s.now().In(time.UTC)) if err != nil { s.log.Errorw("failed to process and publish data", "error", err) diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index c1403ce8fd5..4d71b51bf71 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -154,9 +154,11 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { // ensures this is the last connection closed when the function returns defer func() { - if err := c.Close(); err != nil { - s.metrics.errorsTotal.Inc() - s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + if c != nil { + if err := c.Close(); err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + } } }() @@ -217,7 +219,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { } s.metrics.receivedBytesTotal.Add(uint64(len(message))) state["response"] = message - s.log.Debugw("received websocket message", logp.Namespace("websocket"), string(message)) + s.log.Debugw("received websocket message", logp.Namespace(s.ns), "msg", string(message)) err = s.process(ctx, state, s.cursor, s.now().In(time.UTC)) if err != nil { s.metrics.errorsTotal.Inc() @@ -294,7 +296,7 @@ func handleConnectionResponse(resp *http.Response, metrics *inputMetrics, log *l buf.WriteString("... truncated") } - log.Debugw("websocket connection response", "body", &buf) + log.Debugw("websocket connection response", "http.response.body.content", &buf) } }