diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3e825c8c0984..1daf853b75de 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -370,6 +370,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004] - Refactor & cleanup with updates to default values and documentation. {pull}41834[41834] - Update CEL mito extensions to v1.16.0. {pull}41727[41727] +- Filebeat's registry is now added to the Elastic-Agent diagnostics bundle {issue}33238[33238] {pull}41795[41795] - Add `unifiedlogs` input for MacOS. {pull}41791[41791] - Add evaluation state dump debugging option to CEL input. {pull}41335[41335] - Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862] diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go new file mode 100644 index 000000000000..7dddd70084ad --- /dev/null +++ b/filebeat/beater/diagnostics.go @@ -0,0 +1,217 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package beater + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "regexp" + "strings" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/paths" +) + +func getRegexpsForRegistryFiles() ([]*regexp.Regexp, error) { + // We use regexps here because globs do not support specifying a character + // range like we do in the checkpoint file. + + registryFileRegExps := []*regexp.Regexp{} + preFilesList := [][]string{ + []string{"^registry$"}, + []string{"^registry", "filebeat$"}, + []string{"^registry", "filebeat", "meta\\.json$"}, + []string{"^registry", "filebeat", "log\\.json$"}, + []string{"^registry", "filebeat", "active\\.dat$"}, + []string{"^registry", "filebeat", "[[:digit:]]*\\.json$"}, + } + + for _, lst := range preFilesList { + var path string + if filepath.Separator == '\\' { + path = strings.Join(lst, `\\`) + } else { + path = filepath.Join(lst...) + } + + // Compile the reg exp, if there is an error, stop and return. 
+ // There should be no error here as this code is tested in all + // supported OSes, however to avoid a code path that leads to a + // panic, we cannot use `regexp.MustCompile` and handle the error + re, err := regexp.Compile(path) + if err != nil { + return nil, fmt.Errorf("cannot compile reg exp: %w", err) + } + + registryFileRegExps = append(registryFileRegExps, re) + } + + return registryFileRegExps, nil +} + +func gzipRegistry() []byte { + logger := logp.L().Named("diagnostics") + buf := bytes.Buffer{} + dataPath := paths.Resolve(paths.Data, "") + registryPath := filepath.Join(dataPath, "registry") + f, err := os.CreateTemp("", "filebeat-registry-*.tar") + if err != nil { + logger.Errorw("cannot create temporary registry archive", "error.message", err) + } + // Close the file, we just need the empty file created to use it later + f.Close() + defer logger.Debug("finished gziping Filebeat's registry") + + defer func() { + if err := os.Remove(f.Name()); err != nil { + logp.L().Named("diagnostics").Warnf("cannot remove temporary registry archive '%s': '%s'", f.Name(), err) + } + }() + + logger.Debugf("temporary file '%s' created", f.Name()) + if err := tarFolder(logger, registryPath, f.Name()); err != nil { + logger.Errorw(fmt.Sprintf("cannot archive Filebeat's registry at '%s'", f.Name()), "error.message", err) + } + + if err := gzipFile(logger, f.Name(), &buf); err != nil { + logger.Errorw("cannot gzip Filebeat's registry", "error.message", err) + } + + // if the final file is too large, skip it + if buf.Len() >= 20_000_000 { // 20 Mb + logger.Warnf("registry is too large for diagnostics, %dmb bytes > 20mb", buf.Len()/1_000_000) + return nil + } + + return buf.Bytes() +} + +// gzipFile gzips src writing the compressed data to dst +func gzipFile(logger *logp.Logger, src string, dst io.Writer) error { + reader, err := os.Open(src) + if err != nil { + return fmt.Errorf("cannot open '%s': '%w'", src, err) + } + defer reader.Close() + + writer := gzip.NewWriter(dst) 
+ defer writer.Close() + writer.Name = filepath.Base(src) + + if _, err := io.Copy(writer, reader); err != nil { + if err != nil { + return fmt.Errorf("cannot gzip file '%s': '%w'", src, err) + } + } + + return nil +} + +// tarFolder creates a tar archive from the folder src and stores it at dst. +// +// dst must be the full path with extension, e.g.: /tmp/foo.tar +// If src is not a folder an error is returned +func tarFolder(logger *logp.Logger, src, dst string) error { + fullPath, err := filepath.Abs(src) + if err != nil { + return fmt.Errorf("cannot get full path from '%s': '%w'", src, err) + } + + tarFile, err := os.Create(dst) + if err != nil { + return fmt.Errorf("cannot create tar file '%s': '%w'", dst, err) + } + defer tarFile.Close() + + tarWriter := tar.NewWriter(tarFile) + defer tarWriter.Close() + + info, err := os.Stat(fullPath) + if err != nil { + return fmt.Errorf("cannot stat '%s': '%w'", fullPath, err) + } + + if !info.IsDir() { + return fmt.Errorf("'%s' is not a directory", fullPath) + } + baseDir := filepath.Base(src) + + logger.Debugf("starting to walk '%s'", fullPath) + + // This error should never happen at runtime, if something + // breaks it should break the tests and be fixed before a + // release. We handle the error here to avoid a code path + // that can end in a panic. 
+ registryFileRegExps, err := getRegexpsForRegistryFiles() + if err != nil { + return err + } + + return filepath.Walk(fullPath, func(path string, info fs.FileInfo, prevErr error) error { + // Stop if there are any errors + if prevErr != nil { + return prevErr + } + + pathInTar := filepath.Join(baseDir, strings.TrimPrefix(path, src)) + if !matchRegistyFiles(registryFileRegExps, pathInTar) { + return nil + } + header, err := tar.FileInfoHeader(info, info.Name()) + if err != nil { + return fmt.Errorf("cannot create tar info header: '%w'", err) + } + header.Name = pathInTar + + if err := tarWriter.WriteHeader(header); err != nil { + return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err) + } + + if info.IsDir() { + return nil + } + + file, err := os.Open(path) + if err != nil { + return fmt.Errorf("cannot open '%s' for reading: '%w'", path, err) + } + defer file.Close() + + logger.Debugf("adding '%s' to the tar archive", file.Name()) + if _, err := io.Copy(tarWriter, file); err != nil { + return fmt.Errorf("cannot read '%s': '%w'", path, err) + } + + return nil + }) +} + +func matchRegistyFiles(registryFileRegExps []*regexp.Regexp, path string) bool { + for _, regExp := range registryFileRegExps { + if regExp.MatchString(path) { + return true + } + } + return false +} diff --git a/filebeat/beater/diagnostics_test.go b/filebeat/beater/diagnostics_test.go new file mode 100644 index 000000000000..8f2f33d7034e --- /dev/null +++ b/filebeat/beater/diagnostics_test.go @@ -0,0 +1,66 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "fmt" + "path/filepath" + "testing" +) + +func TestMatchRegistryFiles(t *testing.T) { + positiveMatches := []string{ + filepath.Join("registry", "filebeat", "49855.json"), + filepath.Join("registry", "filebeat", "active.dat"), + filepath.Join("registry", "filebeat", "meta.json"), + filepath.Join("registry", "filebeat", "log.json"), + } + negativeMatches := []string{ + filepath.Join("registry", "filebeat", "bar.dat"), + filepath.Join("registry", "filebeat", "log.txt"), + filepath.Join("registry", "42.json"), + filepath.Join("nop", "active.dat"), + } + registryFileRegExps, err := getRegexpsForRegistryFiles() + if err != nil { + t.Fatalf("cannot compile regexps for registry paths: %s", err) + } + + testFn := func(t *testing.T, path string, match bool) { + result := matchRegistyFiles(registryFileRegExps, path) + if result != match { + t.Errorf( + "mathRegisryFiles('%s') should return %t, got %t instead", + path, + match, + result) + } + } + + for _, path := range positiveMatches { + t.Run(fmt.Sprintf("%s returns true", path), func(t *testing.T) { + testFn(t, path, true) + }) + } + + for _, path := range negativeMatches { + t.Run(fmt.Sprintf("%s returns false", path), func(t *testing.T) { + testFn(t, path, false) + }) + } +} diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 14e3ad79f557..fb8fab19dd7e 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -145,6 +145,13 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea } return data }) 
+ + b.Manager.RegisterDiagnosticHook( + "registry", + "Filebeat's registry", + "registry.tar.gz", + "application/octet-stream", + gzipRegistry) } // Add inputs created by the modules diff --git a/x-pack/filebeat/input/benchmark/input.go b/x-pack/filebeat/input/benchmark/input.go index e098d3e746bc..4f9b64332f7f 100644 --- a/x-pack/filebeat/input/benchmark/input.go +++ b/x-pack/filebeat/input/benchmark/input.go @@ -158,7 +158,13 @@ type inputMetrics struct { // newInputMetrics returns an input metric for the benchmark processor. func newInputMetrics(ctx v2.Context) *inputMetrics { - reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, ctx.Agent.Monitoring.Namespace.GetRegistry()) + var globalRegistry *monitoring.Registry + // When running under Elastic-Agent Namespace can be nil. + // Passing a nil registry to inputmon.NewInputRegistry is not a problem. + if ctx.Agent.Monitoring.Namespace != nil { + globalRegistry = ctx.Agent.Monitoring.Namespace.GetRegistry() + } + reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, globalRegistry) out := &inputMetrics{ unregister: unreg, eventsPublished: monitoring.NewUint(reg, "events_published_total"), diff --git a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go new file mode 100644 index 000000000000..39d0ef2becfe --- /dev/null +++ b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go @@ -0,0 +1,344 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +//go:build integration + +package integration + +import ( + "archive/tar" + "bufio" + "bytes" + "compress/gzip" + "encoding/json" + "errors" + "fmt" + "io" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +func TestFilestreamRegistryIsInDiagnostics(t *testing.T) { + filebeat := NewFilebeat(t) + logfile := filepath.Join(filebeat.TempDir(), "log.log") + integration.GenerateLogFile(t, logfile, 2, false) + input := proto.UnitExpected{ + Id: "input-" + t.Name(), + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "unit-filestream-" + t.Name(), + Type: "filestream", + Name: "Filestream-" + t.Name(), + Streams: []*proto.Stream{ + { + Id: "stream-filestream-" + t.Name(), + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "id": "stream-filestream-" + t.Name(), + "enabled": true, + "type": "filestream", + "paths": []interface{}{logfile}, + "file.identity": map[string]any{}, + "prospector.scanner.fingerprint": map[string]any{ + "enabled": false, + }, + }), + }, + }, + }, + } + + output := proto.UnitExpected{ + Id: "unit-output-" + t.Name(), + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "output-" + t.Name(), + Type: "file", + Name: "file", + Source: integration.RequireNewStruct(t, + map[string]interface{}{ + "type": "file", + "path": filebeat.TempDir(), + "filename": "output", + }), + }, + } + outputGlob := filepath.Join(filebeat.TempDir(), "output*") + + var units = []*proto.UnitExpected{ + &output, + &input, + } + + waitingForDiagnostics := atomic.Bool{} + testDone := make(chan struct{}) + + server := 
&mock.StubServerV2{ + ActionImpl: func(response *proto.ActionResponse) error { return nil }, + ActionsChan: make(chan *mock.PerformAction), + SentActions: map[string]*mock.PerformAction{}, + } + + server.CheckinV2Impl = func(observed *proto.CheckinObserved) *proto.CheckinExpected { + // No matter the current state, we always return the same units + checkinExpected := proto.CheckinExpected{ + Units: units, + } + + // If any unit is not healthy, just return the expected state + for _, unit := range observed.Units { + if unit.GetState() != proto.State_HEALTHY { + return &checkinExpected + } + } + + // All units are healthy, we can request the diagnostics. + // Ensure we don't have any diagnostics being requested already. + if waitingForDiagnostics.CompareAndSwap(false, true) { + // Request the diagnostics asynchronously + go requestDiagnosticsAndVerifyRegistry(t, filebeat, outputGlob, logfile, 100, &waitingForDiagnostics, server, testDone, false) + } + return &checkinExpected + } + + if err := server.Start(); err != nil { + t.Fatalf("cannot start gRPC server: %s", err) + } + + filebeat.Start( + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + "-E", "queue.mem.flush.timeout=0", + ) + + <-testDone +} + +func TestEmptyegistryIsInDiagnostics(t *testing.T) { + filebeat := NewFilebeat(t) + input := proto.UnitExpected{ + Id: "input-" + t.Name(), + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "unit-filestream-" + t.Name(), + Type: "benchmark", + Name: "Benchmark-" + t.Name(), + Streams: []*proto.Stream{ + { + Id: "stream-benchmark-" + t.Name(), + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "id": "stream-benchmark-" + t.Name(), + "enabled": true, + "count": 2, + }), + }, + }, + }, + } + + output := proto.UnitExpected{ + Id: "unit-output-" + t.Name(), + Type: 
proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "output-" + t.Name(), + Type: "file", + Name: "file", + Source: integration.RequireNewStruct(t, + map[string]interface{}{ + "type": "file", + "path": filebeat.TempDir(), + "filename": "output", + }), + }, + } + outputGlob := filepath.Join(filebeat.TempDir(), "output*") + + var units = []*proto.UnitExpected{ + &output, + &input, + } + + waitingForDiagnostics := atomic.Bool{} + testDone := make(chan struct{}) + + server := &mock.StubServerV2{ + ActionImpl: func(response *proto.ActionResponse) error { return nil }, + ActionsChan: make(chan *mock.PerformAction), + SentActions: map[string]*mock.PerformAction{}, + } + + server.CheckinV2Impl = func(observed *proto.CheckinObserved) *proto.CheckinExpected { + // No matter the current state, we always return the same units + checkinExpected := proto.CheckinExpected{ + Units: units, + } + + // If any unit is not healthy, just return the expected state + for _, unit := range observed.Units { + if unit.GetState() != proto.State_HEALTHY { + return &checkinExpected + } + } + + // All units are healthy, we can request the diagnostics. + // Ensure we don't have any diagnostics being requested already. 
+ if waitingForDiagnostics.CompareAndSwap(false, true) { + // Request the diagnostics asynchronously + go requestDiagnosticsAndVerifyRegistry(t, filebeat, outputGlob, "", 0, &waitingForDiagnostics, server, testDone, true) + } + return &checkinExpected + } + + if err := server.Start(); err != nil { + t.Fatalf("cannot start gRPC server: %s", err) + } + + filebeat.Start( + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + "-E", "queue.mem.flush.timeout=0", + ) + + <-testDone +} + +func validateLastRegistryEntry(t *testing.T, reader io.Reader, expectedSize int, expectedPath string) { + t.Helper() + + sc := bufio.NewScanner(reader) + + var lastLine []byte + for sc.Scan() { + lastLine = sc.Bytes() + } + + entry := struct { + Data struct { + Meta struct { + Path string `json:"source"` + } `json:"meta"` + Cursor struct { + Offset int `json:"offset"` + } `json:"cursor"` + } `json:"v"` + }{} + + if err := json.Unmarshal(lastLine, &entry); err != nil { + t.Errorf("cannot unmarshal last registry entry: %s", err) + } + + if entry.Data.Meta.Path != expectedPath { + t.Errorf( + "expecting path in registry to be '%s', got '%s' instead", + expectedPath, + entry.Data.Meta.Path) + } + + if entry.Data.Cursor.Offset != expectedSize { + t.Errorf( + "expecting offset to be %d, got %d instead", + expectedSize, + entry.Data.Cursor.Offset) + } +} + +// requestDiagnosticsAndVerifyRegistry runs on a different goroutine, we cannot call t.Fatal +func requestDiagnosticsAndVerifyRegistry( + t *testing.T, + filebeat *integration.BeatProc, + outputGlob, + logfile string, + logfileOffset int, + waitingForDiagnostics *atomic.Bool, + server *mock.StubServerV2, + testDone chan<- struct{}, + emtpyRegistyLogFile bool) { + + assert.Eventuallyf(t, func() bool { + return filebeat.CountFileLines(outputGlob) == 2 + }, + 1*time.Minute, + 100*time.Millisecond, + "output file '%s' does not contain two events", outputGlob) + + // Once 
we're done, set it back to false + defer waitingForDiagnostics.Store(false) + server.ActionsChan <- &mock.PerformAction{ + Type: proto.ActionRequest_DIAGNOSTICS, + Name: "diagnostics", + Level: proto.ActionRequest_COMPONENT, // aka diagnostics for the whole Beat + DiagCallback: func(diagResults []*proto.ActionDiagnosticUnitResult, diagErr error) { + // Let the test finish when this callback finishes + defer func() { + testDone <- struct{}{} + }() + + if diagErr != nil { + t.Errorf("diagnostics failed: %s", diagErr) + return + } + + for _, dr := range diagResults { + if dr.Name != "registry" { + continue + } + + if len(dr.Content) == 0 { + t.Errorf("registry cannot be an empty file") + return + } + + gzipReader, err := gzip.NewReader(bytes.NewReader(dr.Content)) + if err != nil { + t.Errorf("cannot create gzip reader: '%s'", err) + return + } + defer gzipReader.Close() + + tarReader := tar.NewReader(gzipReader) + for { + header, err := tarReader.Next() + if errors.Is(err, io.EOF) { + t.Error("registry log file not found in tar archive") + return + } + + if header.Name != "registry/filebeat/log.json" { + continue + } + + if emtpyRegistyLogFile { + if header.Size != 0 { + t.Errorf("expecting registry log file to be empty, got %d bytes instead", header.Size) + } + return + } + + validateLastRegistryEntry(t, tarReader, logfileOffset, logfile) + return + } + } + t.Error("diagnostics do not contain a valid registry") + }, + } +}