diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6aff5931495..28cfe7665b5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -206,6 +206,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] - Fix Netflow Template Sharing configuration handling. {pull}42080[42080] - Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218] +- In the `streaming` input, prevent panics on shutdown with a null check and apply a consistent namespace to contextual data in debug logs. {pull}42315[42315] *Heartbeat* diff --git a/x-pack/filebeat/input/streaming/crowdstrike.go b/x-pack/filebeat/input/streaming/crowdstrike.go index eb1797d2f6d..404e9322472 100644 --- a/x-pack/filebeat/input/streaming/crowdstrike.go +++ b/x-pack/filebeat/input/streaming/crowdstrike.go @@ -156,7 +156,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client, if err != nil { return state, Warning{fmt.Errorf("failed to decode discover body: %w", err)} } - s.log.Debugw("stream discover metadata", "meta", mapstr.M(body.Meta)) + s.log.Debugw("stream discover metadata", logp.Namespace(s.ns), "meta", mapstr.M(body.Meta)) var offset int if cursor, ok := state["cursor"].(map[string]any); ok { @@ -233,6 +233,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client, err := dec.Decode(&msg) if err != nil { s.metrics.errorsTotal.Inc() + //nolint:errorlint // will not be a wrapped error here. if err == io.EOF { s.log.Info("stream ended, restarting") return state, nil @@ -241,7 +242,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client, } s.metrics.receivedBytesTotal.Add(uint64(len(msg))) state["response"] = []byte(msg) - s.log.Debugw("received firehose message", logp.Namespace("falcon_hose"), "msg", debugMsg(msg)) + s.log.Debugw("received firehose message", logp.Namespace(s.ns), "msg", debugMsg(msg)) err = s.process(ctx, state, s.cursor, s.now().In(time.UTC)) if err != nil { s.log.Errorw("failed to process and publish data", "error", err) diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index eeb89ad5c9b..4d71b51bf71 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -154,9 +154,11 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { // ensures this is the last connection closed when the function returns defer func() { - if err := c.Close(); err != nil { - s.metrics.errorsTotal.Inc() - s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + if c != nil { + if err := c.Close(); err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + } } }() @@ -217,7 +219,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { } s.metrics.receivedBytesTotal.Add(uint64(len(message))) state["response"] = message - s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message)) + s.log.Debugw("received websocket message", logp.Namespace(s.ns), "msg", string(message)) err = s.process(ctx, state, s.cursor, s.now().In(time.UTC)) if err != nil { s.metrics.errorsTotal.Inc() @@ -294,7 +296,7 @@ func handleConnectionResponse(resp *http.Response, metrics *inputMetrics, log *l buf.WriteString("... truncated") } - log.Debugw("websocket connection response", "body", &buf) + log.Debugw("websocket connection response", "http.response.body.content", &buf) } }