From 862af719f357511058a586dc5fb33cb1b63c098a Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 26 Nov 2024 12:57:08 -0500 Subject: [PATCH 01/21] [WIP] Add registry to Filebeat's diagnostic --- filebeat/input/v2/compat/compat.go | 126 +++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index fde3f279233..23b42017309 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -21,9 +21,17 @@ package compat import ( + "archive/tar" + "bytes" + "compress/gzip" "context" "errors" "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "strings" "sync" "github.com/gofrs/uuid/v5" @@ -32,9 +40,11 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common/diagnostics" "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/paths" "github.com/elastic/go-concert/ctxtool" ) @@ -60,6 +70,7 @@ type runner struct { input v2.Input connector beat.PipelineConnector statusReporter status.StatusReporter + diag diagnostics.DiagnosticReporter } // RunnerFactory creates a cfgfile.RunnerFactory from an input Loader that is @@ -159,6 +170,121 @@ func (r *runner) Stop() { r.statusReporter = nil } +func getRegistry() []byte { + fmt.Println("================================================== getRegistry") + buf := bytes.Buffer{} + dataPath := paths.Resolve(paths.Data, "") + registryPath := filepath.Join(dataPath, "registry") + f, err := os.CreateTemp("", "filebeat-registry-*.tar") + if err != nil { + panic(err) + } + f.Close() + + defer func() { + if err := os.Remove(f.Name()); err != nil { + panic(err) + } + }() + + tarFolder(registryPath, f.Name()) + // tarFolder("/home/tiago/devel/beats/x-pack/filebeat/data/registry", "/tmp/registry.tar") + gzipFile(f.Name(), &buf) + + return buf.Bytes() +} + +func gzipFile(src string, dst io.Writer) error { + reader, err := os.Open(src) + if err != nil { + return fmt.Errorf("cannot open '%s': '%w'", src, err) + } + defer reader.Close() + + writer := gzip.NewWriter(dst) + defer writer.Close() + writer.Name = filepath.Base(src) + + if _, err := io.Copy(writer, reader); err != nil { + if err != nil { + fmt.Errorf("cannot gzip file '%s': '%w'", src, err) + } + } + + return nil +} + +// tarFolder creates a tar archive from the folder src and stores it at dst. +// +// dst must be the full path with extension, e.g: /tmp/foo.tar +// If src is not a folder an error is retruned +func tarFolder(src, dst string) error { + fullPath, err := filepath.Abs(src) + + fmt.Println("============================== src:", src) + fmt.Println("============================== fullPath:", fullPath) + fmt.Println("============================== dst:", dst) + if err != nil { + fmt.Errorf("cannot get full path from '%s': '%w'", src, err) + } + tarFile, err := os.Create(dst) + if err != nil { + return fmt.Errorf("cannot create tar file '%s': '%w'", dst, err) + } + defer tarFile.Close() + + tarWriter := tar.NewWriter(tarFile) + defer tarWriter.Close() + + info, err := os.Stat(fullPath) + if err != nil { + return fmt.Errorf("cannot stat '%s': '%w'", fullPath, err) + } + + if !info.IsDir() { + return fmt.Errorf("'%s' is not a directory", fullPath) + } + baseDir := filepath.Base(src) + + return filepath.Walk(fullPath, func(path string, info fs.FileInfo, err error) error { + header, err := tar.FileInfoHeader(info, info.Name()) + header.Name = filepath.Join(baseDir, strings.TrimPrefix(path, src)) + + if err := tarWriter.WriteHeader(header); err != nil { + return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err) + } + + if info.IsDir() { + return nil + } + + file, err := os.Open(path) + if err != nil { + return fmt.Errorf("cannot open '%s' for reading: '%w", path, err) + } + defer file.Close() + + if _, err := io.Copy(tarWriter, file); err != nil { + return fmt.Errorf("cannot read '%s': '%w'", path, err) + } + + return nil + }) +} + +func (r *runner) Diagnostics() []diagnostics.DiagnosticSetup { + fmt.Println("================================================== Diagnostics called!") + setup := diagnostics.DiagnosticSetup{ + Name: "registry collector", + Description: "Collect Filebeat's registry", + Filename: "registry.tar.gz", + ContentType: "application/octet-stream", + Callback: getRegistry, + } + + return []diagnostics.DiagnosticSetup{setup} +} + func configID(config *conf.C) (string, error) { tmp := struct { ID string `config:"id"` From d8d42a4d27d50f78548024f8b391ce2340fc0a6d Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 26 Nov 2024 15:06:43 -0500 Subject: [PATCH 02/21] Move the hook to a better place --- filebeat/beater/diagnostics.go | 117 +++++++++++++++++++++++++ filebeat/beater/filebeat.go | 7 ++ filebeat/input/v2/compat/compat.go | 135 +++-------------------------- 3 files changed, 136 insertions(+), 123 deletions(-) create mode 100644 filebeat/beater/diagnostics.go diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go new file mode 100644 index 00000000000..df6206b65f0 --- /dev/null +++ b/filebeat/beater/diagnostics.go @@ -0,0 +1,117 @@ +package beater + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "strings" + + "github.com/elastic/elastic-agent-libs/paths" +) + +func getRegistry() []byte { + fmt.Println("================================================== getRegistry") + buf := bytes.Buffer{} + dataPath := paths.Resolve(paths.Data, "") + registryPath := filepath.Join(dataPath, "registry") + f, err := os.CreateTemp("", "filebeat-registry-*.tar") + if err != nil { + panic(err) + } + f.Close() + + defer func() { + if err := os.Remove(f.Name()); err != nil { + panic(err) + } + }() + + tarFolder(registryPath, f.Name()) + // tarFolder("/home/tiago/devel/beats/x-pack/filebeat/data/registry", "/tmp/registry.tar") + gzipFile(f.Name(), &buf) + + return buf.Bytes() +} + +func gzipFile(src string, dst io.Writer) error { + reader, err := os.Open(src) + if err != nil { + return fmt.Errorf("cannot open '%s': '%w'", src, err) + } + defer reader.Close() + + writer := gzip.NewWriter(dst) + defer writer.Close() + writer.Name = filepath.Base(src) + + if _, err := io.Copy(writer, reader); err != nil { + if err != nil { + fmt.Errorf("cannot gzip file '%s': '%w'", src, err) + } + } + + return nil +} + +// tarFolder creates a tar archive from the folder src and stores it at dst. +// +// dst must be the full path with extension, e.g: /tmp/foo.tar +// If src is not a folder an error is retruned +func tarFolder(src, dst string) error { + fullPath, err := filepath.Abs(src) + + fmt.Println("============================== src:", src) + fmt.Println("============================== fullPath:", fullPath) + fmt.Println("============================== dst:", dst) + if err != nil { + fmt.Errorf("cannot get full path from '%s': '%w'", src, err) + } + tarFile, err := os.Create(dst) + if err != nil { + return fmt.Errorf("cannot create tar file '%s': '%w'", dst, err) + } + defer tarFile.Close() + + tarWriter := tar.NewWriter(tarFile) + defer tarWriter.Close() + + info, err := os.Stat(fullPath) + if err != nil { + return fmt.Errorf("cannot stat '%s': '%w'", fullPath, err) + } + + if !info.IsDir() { + return fmt.Errorf("'%s' is not a directory", fullPath) + } + baseDir := filepath.Base(src) + + return filepath.Walk(fullPath, func(path string, info fs.FileInfo, err error) error { + header, err := tar.FileInfoHeader(info, info.Name()) + header.Name = filepath.Join(baseDir, strings.TrimPrefix(path, src)) + + if err := tarWriter.WriteHeader(header); err != nil { + return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err) + } + + if info.IsDir() { + return nil + } + + file, err := os.Open(path) + if err != nil { + return fmt.Errorf("cannot open '%s' for reading: '%w", path, err) + } + defer file.Close() + + if _, err := io.Copy(tarWriter, file); err != nil { + return fmt.Errorf("cannot read '%s': '%w'", path, err) + } + + return nil + }) +} diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 815b6fabfde..7ae089d1ff1 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -144,6 +144,13 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea } return data }) + + b.Manager.RegisterDiagnosticHook( + "registry", + "Filebeat's registry", + "registry.tar.gz", + "application/octet-stream", + getRegistry) } // Add inputs created by the modules diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index 23b42017309..a289f878c4d 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -21,17 +21,9 @@ package compat import ( - "archive/tar" - "bytes" - "compress/gzip" "context" "errors" "fmt" - "io" - "io/fs" - "os" - "path/filepath" - "strings" "sync" "github.com/gofrs/uuid/v5" @@ -44,7 +36,6 @@ import ( "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/paths" "github.com/elastic/go-concert/ctxtool" ) @@ -170,120 +161,18 @@ func (r *runner) Stop() { r.statusReporter = nil } -func getRegistry() []byte { - fmt.Println("================================================== getRegistry") - buf := bytes.Buffer{} - dataPath := paths.Resolve(paths.Data, "") - registryPath := filepath.Join(dataPath, "registry") - f, err := os.CreateTemp("", "filebeat-registry-*.tar") - if err != nil { - panic(err) - } - f.Close() - - defer func() { - if err := os.Remove(f.Name()); err != nil { - panic(err) - } - }() - - tarFolder(registryPath, f.Name()) - // tarFolder("/home/tiago/devel/beats/x-pack/filebeat/data/registry", "/tmp/registry.tar") - gzipFile(f.Name(), &buf) - - return buf.Bytes() -} - -func gzipFile(src string, dst io.Writer) error { - reader, err := os.Open(src) - if err != nil { - return fmt.Errorf("cannot open '%s': '%w'", src, err) - } - defer reader.Close() - - writer := gzip.NewWriter(dst) - defer writer.Close() - writer.Name = filepath.Base(src) - - if _, err := io.Copy(writer, reader); err != nil { - if err != nil { - fmt.Errorf("cannot gzip file '%s': '%w'", src, err) - } - } - - return nil -} - -// tarFolder creates a tar archive from the folder src and stores it at dst. -// -// dst must be the full path with extension, e.g: /tmp/foo.tar -// If src is not a folder an error is retruned -func tarFolder(src, dst string) error { - fullPath, err := filepath.Abs(src) - - fmt.Println("============================== src:", src) - fmt.Println("============================== fullPath:", fullPath) - fmt.Println("============================== dst:", dst) - if err != nil { - fmt.Errorf("cannot get full path from '%s': '%w'", src, err) - } - tarFile, err := os.Create(dst) - if err != nil { - return fmt.Errorf("cannot create tar file '%s': '%w'", dst, err) - } - defer tarFile.Close() - - tarWriter := tar.NewWriter(tarFile) - defer tarWriter.Close() - - info, err := os.Stat(fullPath) - if err != nil { - return fmt.Errorf("cannot stat '%s': '%w'", fullPath, err) - } - - if !info.IsDir() { - return fmt.Errorf("'%s' is not a directory", fullPath) - } - baseDir := filepath.Base(src) - - return filepath.Walk(fullPath, func(path string, info fs.FileInfo, err error) error { - header, err := tar.FileInfoHeader(info, info.Name()) - header.Name = filepath.Join(baseDir, strings.TrimPrefix(path, src)) - - if err := tarWriter.WriteHeader(header); err != nil { - return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err) - } - - if info.IsDir() { - return nil - } - - file, err := os.Open(path) - if err != nil { - return fmt.Errorf("cannot open '%s' for reading: '%w", path, err) - } - defer file.Close() - - if _, err := io.Copy(tarWriter, file); err != nil { - return fmt.Errorf("cannot read '%s': '%w'", path, err) - } - - return nil - }) -} - -func (r *runner) Diagnostics() []diagnostics.DiagnosticSetup { - fmt.Println("================================================== Diagnostics called!") - setup := diagnostics.DiagnosticSetup{ - Name: "registry collector", - Description: "Collect Filebeat's registry", - Filename: "registry.tar.gz", - ContentType: "application/octet-stream", - Callback: getRegistry, - } - - return []diagnostics.DiagnosticSetup{setup} -} +// func (r *runner) Diagnostics() []diagnostics.DiagnosticSetup { +// fmt.Println("================================================== Diagnostics called!") +// setup := diagnostics.DiagnosticSetup{ +// Name: "registry collector", +// Description: "Collect Filebeat's registry", +// Filename: "registry.tar.gz", +// ContentType: "application/octet-stream", +// Callback: getRegistry, +// } + +// return []diagnostics.DiagnosticSetup{setup} +// } func configID(config *conf.C) (string, error) { tmp := struct { From 65fba5983ba972d6c54d42cd485e71bcafef3911 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 26 Nov 2024 17:41:09 -0500 Subject: [PATCH 03/21] Add integration tests --- filebeat/beater/diagnostics.go | 17 ++ .../integration/registrydiagnostics_test.go | 222 ++++++++++++++++++ 2 files changed, 239 insertions(+) create mode 100644 x-pack/filebeat/tests/integration/registrydiagnostics_test.go diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index df6206b65f0..9dc0b537cda 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package beater import ( diff --git a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go new file mode 100644 index 00000000000..07b56bb8a75 --- /dev/null +++ b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go @@ -0,0 +1,222 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package integration + +import ( + "archive/tar" + "bufio" + "bytes" + "compress/gzip" + "encoding/json" + "errors" + "fmt" + "io" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +func TestRegistryIsInDiagnostics(t *testing.T) { + filebeat := NewFilebeat(t) + logfile := filepath.Join(filebeat.TempDir(), "log.log") + integration.GenerateLogFile(t, logfile, 2, false) + input := proto.UnitExpected{ + Id: "input-" + t.Name(), + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "unit-filestream-" + t.Name(), + Type: "filestream", + Name: "Filestream-" + t.Name(), + Streams: []*proto.Stream{ + { + Id: "stream-filestream-" + t.Name(), + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "id": "stream-filestream-" + t.Name(), + "enabled": true, + "type": "filestream", + "paths": []interface{}{logfile}, + }), + }, + }, + }, + } + + output := proto.UnitExpected{ + Id: "unit-output-" + t.Name(), + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "output-" + t.Name(), + Type: "file", + Name: "file", + Source: integration.RequireNewStruct(t, + map[string]interface{}{ + "type": "file", + "path": filebeat.TempDir(), + "filename": "output", + }), + }, + } + outputGlob := filepath.Join(filebeat.TempDir(), "output*") + + var units = []*proto.UnitExpected{ + &output, + &input, + } + + waitingForDiagnostics := atomic.Bool{} + testDone := make(chan struct{}) + + server := &mock.StubServerV2{ + ActionImpl: func(response *proto.ActionResponse) error { return nil }, + ActionsChan: make(chan *mock.PerformAction), + SentActions: map[string]*mock.PerformAction{}, + } + + requestDiagnostics := func() { + require.Eventuallyf(t, func() bool { + lines := filebeat.CountFileLines(outputGlob) + if lines == 2 { + return true + } + + return false + }, + 1*time.Minute, + 100*time.Millisecond, + "output file '%s' does not contain two events", outputGlob) + + // Once we're done, set it back to false + defer waitingForDiagnostics.Store(false) + server.ActionsChan <- &mock.PerformAction{ + Type: proto.ActionRequest_DIAGNOSTICS, + Name: "diagnostics", + Level: proto.ActionRequest_COMPONENT, // aka diagnostics for the whole Beat + DiagCallback: func(diagResults []*proto.ActionDiagnosticUnitResult, diagErr error) { + if diagErr != nil { + t.Fatalf("diagnostics failed: %s", diagErr) + } + + for _, dr := range diagResults { + if dr.Name != "registry" { + continue + } + + if len(dr.Content) == 0 { + t.Fatalf("registry cannot be an empty file") + } + + gzipReader, err := gzip.NewReader(bytes.NewReader(dr.Content)) + if err != nil { + t.Fatalf("cannot create gzip reader: '%s'", err) + } + defer gzipReader.Close() + + tarReader := tar.NewReader(gzipReader) + for { + header, err := tarReader.Next() + if errors.Is(err, io.EOF) { + return + } + + if header.Name != "registry/filebeat/log.json" { + continue + } + + validateLastRegistryEntry(t, tarReader, 100, logfile) + testDone <- struct{}{} + } + } + }, + } + } + + server.CheckinV2Impl = func(observed *proto.CheckinObserved) *proto.CheckinExpected { + // No matter the current state, we always return the same units + checkinExpected := proto.CheckinExpected{ + Units: units, + } + + // If any unit is not healthy, just return the expected state + for _, unit := range observed.Units { + if unit.GetState() != proto.State_HEALTHY { + return &checkinExpected + } + } + + // All units are healthy, we can request the diagnostics. + // Ensure we don't have any diagnostics being requested already. + if waitingForDiagnostics.CompareAndSwap(false, true) { + // Request the diagnostics asynchronously + go requestDiagnostics() + } + return &checkinExpected + } + + server.Port = 3000 + if err := server.Start(); err != nil { + t.Fatal(err) + } + + filebeat.Start( + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + "-E", "queue.mem.flush.timeout=0", + ) + + <-testDone +} + +func validateLastRegistryEntry(t *testing.T, reader io.Reader, expectedSize int, expectedPath string) { + t.Helper() + + sc := bufio.NewScanner(reader) + + var lastLine []byte + for sc.Scan() { + lastLine = sc.Bytes() + } + + entry := struct { + Data struct { + Meta struct { + Path string `json:"source"` + } `json:"meta"` + Cursor struct { + Offset int `json:"offset"` + } `json:"cursor"` + } `json:"v"` + }{} + + if err := json.Unmarshal(lastLine, &entry); err != nil { + t.Fatalf("cannot unmarshal last registry entry: %s", err) + } + + if entry.Data.Meta.Path != expectedPath { + t.Errorf( + "expecting path in registry to be '%s', got '%s' instead", + expectedPath, + entry.Data.Meta.Path) + } + + if entry.Data.Cursor.Offset != expectedSize { + t.Errorf( + "expecting offset to be %d, got %d instead", + expectedSize, + entry.Data.Cursor.Offset) + } +} From 4c833851d3a3abaea79a7bcd9edad4b3302d1010 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 27 Nov 2024 08:49:03 -0500 Subject: [PATCH 04/21] Add build tag --- x-pack/filebeat/tests/integration/registrydiagnostics_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go index 07b56bb8a75..18195b9a3d5 100644 --- a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go +++ b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go @@ -2,6 +2,8 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. +//go:build integration + package integration import ( From 01774edca959c9624b3171b1b451a89fc89b9fbd Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 27 Nov 2024 08:53:50 -0500 Subject: [PATCH 05/21] Remove temporary file --- filebeat/beater/diagnostics.go | 16 +++++++++------- filebeat/beater/filebeat.go | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index 9dc0b537cda..6b2f1ccc3c3 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -28,11 +28,11 @@ import ( "path/filepath" "strings" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/paths" ) -func getRegistry() []byte { - fmt.Println("================================================== getRegistry") +func gzipRegistry() []byte { buf := bytes.Buffer{} dataPath := paths.Resolve(paths.Data, "") registryPath := filepath.Join(dataPath, "registry") @@ -41,6 +41,11 @@ func getRegistry() []byte { panic(err) } f.Close() + defer func() { + if err := os.Remove(f.Name()); err != nil { + logp.L().Named("diagnostics").Warnf("cannot remove temporary registry archive '%s': '%w'", f.Name(), err) + } + }() defer func() { if err := os.Remove(f.Name()); err != nil { @@ -49,12 +54,12 @@ func getRegistry() []byte { }() tarFolder(registryPath, f.Name()) - // tarFolder("/home/tiago/devel/beats/x-pack/filebeat/data/registry", "/tmp/registry.tar") gzipFile(f.Name(), &buf) return buf.Bytes() } +// gzipFile gzips src writing the compressed data to dst func gzipFile(src string, dst io.Writer) error { reader, err := os.Open(src) if err != nil { @@ -81,13 +86,10 @@ func gzipFile(src string, dst io.Writer) error { // If src is not a folder an error is retruned func tarFolder(src, dst string) error { fullPath, err := filepath.Abs(src) - - fmt.Println("============================== src:", src) - fmt.Println("============================== fullPath:", fullPath) - fmt.Println("============================== dst:", dst) if err != nil { fmt.Errorf("cannot get full path from '%s': '%w'", src, err) } + tarFile, err := os.Create(dst) if err != nil { return fmt.Errorf("cannot create tar file '%s': '%w'", dst, err) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 7ae089d1ff1..14b53446c99 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -150,7 +150,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea "Filebeat's registry", "registry.tar.gz", "application/octet-stream", - getRegistry) + gzipRegistry) } // Add inputs created by the modules From 0e36f68ee4309c15c1694681d09d18a73fa75794 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 27 Nov 2024 08:56:14 -0500 Subject: [PATCH 06/21] fix returning errors --- filebeat/beater/diagnostics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index 6b2f1ccc3c3..6efa0abdeb0 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -73,7 +73,7 @@ func gzipFile(src string, dst io.Writer) error { if _, err := io.Copy(writer, reader); err != nil { if err != nil { - fmt.Errorf("cannot gzip file '%s': '%w'", src, err) + return fmt.Errorf("cannot gzip file '%s': '%w'", src, err) } } @@ -87,7 +87,7 @@ func gzipFile(src string, dst io.Writer) error { func tarFolder(src, dst string) error { fullPath, err := filepath.Abs(src) if err != nil { - fmt.Errorf("cannot get full path from '%s': '%w'", src, err) + return fmt.Errorf("cannot get full path from '%s': '%w'", src, err) } tarFile, err := os.Create(dst) From a8c5f4f72460e1432e62811aafd95734018b83d2 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 27 Nov 2024 10:07:16 -0500 Subject: [PATCH 07/21] Improve error handling and fix lint issues --- filebeat/beater/diagnostics.go | 15 +++++++++++++-- filebeat/input/v2/compat/compat.go | 2 -- .../tests/integration/registrydiagnostics_test.go | 9 ++------- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index 6efa0abdeb0..1eb283ec08e 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -33,6 +33,7 @@ import ( ) func gzipRegistry() []byte { + logger := logp.L().Named("diagnostics") buf := bytes.Buffer{} dataPath := paths.Resolve(paths.Data, "") registryPath := filepath.Join(dataPath, "registry") @@ -53,8 +54,13 @@ func gzipRegistry() []byte { } }() - tarFolder(registryPath, f.Name()) - gzipFile(f.Name(), &buf) + if err := tarFolder(registryPath, f.Name()); err != nil { + logger.Errorw(fmt.Sprintf("cannot archive Filebeat's registry at '%s'", f.Name()), "error.message", err) + } + + if err := gzipFile(f.Name(), &buf); err != nil { + logger.Errorw("cannot gzip Filebeat's registry", "error.message", err) + } return buf.Bytes() } @@ -110,6 +116,11 @@ func tarFolder(src, dst string) error { baseDir := filepath.Base(src) return filepath.Walk(fullPath, func(path string, info fs.FileInfo, err error) error { + // Stop if there is any errors + if err != nil { + return err + } + header, err := tar.FileInfoHeader(info, info.Name()) header.Name = filepath.Join(baseDir, strings.TrimPrefix(path, src)) diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index a289f878c4d..37c2484adaf 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -32,7 +32,6 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" - "github.com/elastic/beats/v7/libbeat/common/diagnostics" "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -61,7 +60,6 @@ type runner struct { input v2.Input connector beat.PipelineConnector statusReporter status.StatusReporter - diag diagnostics.DiagnosticReporter } // RunnerFactory creates a cfgfile.RunnerFactory from an input Loader that is diff --git a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go index 18195b9a3d5..df14dacd32d 100644 --- a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go +++ b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go @@ -91,12 +91,7 @@ func TestRegistryIsInDiagnostics(t *testing.T) { requestDiagnostics := func() { require.Eventuallyf(t, func() bool { - lines := filebeat.CountFileLines(outputGlob) - if lines == 2 { - return true - } - - return false + return filebeat.CountFileLines(outputGlob) == 2 }, 1*time.Minute, 100*time.Millisecond, @@ -171,7 +166,7 @@ func TestRegistryIsInDiagnostics(t *testing.T) { server.Port = 3000 if err := server.Start(); err != nil { - t.Fatal(err) + t.Error(err) } filebeat.Start( From df755707d08020d53d5f985e4f971f4a7671f4a0 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 27 Nov 2024 11:06:05 -0500 Subject: [PATCH 08/21] Improve logging --- filebeat/beater/diagnostics.go | 22 +++++++++---------- .../integration/registrydiagnostics_test.go | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index 1eb283ec08e..79b14fb432d 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -39,26 +39,24 @@ func gzipRegistry() []byte { registryPath := filepath.Join(dataPath, "registry") f, err := os.CreateTemp("", "filebeat-registry-*.tar") if err != nil { - panic(err) + logger.Errorw("cannot create temporary registry archive", "error.message", err) } + // Close the file, we just need the empty file created to use it later f.Close() - defer func() { - if err := os.Remove(f.Name()); err != nil { - logp.L().Named("diagnostics").Warnf("cannot remove temporary registry archive '%s': '%w'", f.Name(), err) - } - }() + defer logger.Debug("finished gziping Filebeat's registry") defer func() { if err := os.Remove(f.Name()); err != nil { - panic(err) + logp.L().Named("diagnostics").Warnf("cannot remove temporary registry archive '%s': '%s'", f.Name(), err) } }() - if err := tarFolder(registryPath, f.Name()); err != nil { + logger.Debugf("temporary file '%s' created", f.Name()) + if err := tarFolder(logger, registryPath, f.Name()); err != nil { logger.Errorw(fmt.Sprintf("cannot archive Filebeat's registry at '%s'", f.Name()), "error.message", err) } - if err := gzipFile(f.Name(), &buf); err != nil { + if err := gzipFile(logger, f.Name(), &buf); err != nil { logger.Errorw("cannot gzip Filebeat's registry", "error.message", err) } @@ -66,7 +64,7 @@ func gzipRegistry() []byte { } // gzipFile gzips src writing the compressed data to dst -func gzipFile(src string, dst io.Writer) error { +func gzipFile(logger *logp.Logger, src string, dst io.Writer) error { reader, err := os.Open(src) if err != nil { return fmt.Errorf("cannot open '%s': '%w'", src, err) @@ -90,7 +88,7 @@ func gzipFile(src string, dst io.Writer) error { // // dst must be the full path with extension, e.g: /tmp/foo.tar // If src is not a folder an error is retruned -func tarFolder(src, dst string) error { +func tarFolder(logger *logp.Logger, src, dst string) error { fullPath, err := filepath.Abs(src) if err != nil { return fmt.Errorf("cannot get full path from '%s': '%w'", src, err) @@ -115,6 +113,7 @@ func tarFolder(src, dst string) error { } baseDir := filepath.Base(src) + logger.Debugf("starting to walk '%s'", fullPath) return filepath.Walk(fullPath, func(path string, info fs.FileInfo, err error) error { // Stop if there is any errors if err != nil { @@ -138,6 +137,7 @@ func tarFolder(src, dst string) error { } defer file.Close() + logger.Debugf("adding '%s' to the tar archive", file.Name()) if _, err := io.Copy(tarWriter, file); err != nil { return fmt.Errorf("cannot read '%s': '%w'", path, err) } diff --git a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go index df14dacd32d..3d28d983b16 100644 --- a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go +++ b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go @@ -60,7 +60,7 @@ func TestRegistryIsInDiagnostics(t *testing.T) { Type: proto.UnitType_OUTPUT, ConfigStateIdx: 1, State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_INFO, + LogLevel: proto.UnitLogLevel_DEBUG, Config: &proto.UnitExpectedConfig{ Id: "output-" + t.Name(), Type: "file", From d51565a9dd3db27f5cbc5b38627d82503aa38c34 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 27 Nov 2024 11:32:16 -0500 Subject: [PATCH 09/21] Fix error handling --- filebeat/beater/diagnostics.go | 7 +++++-- .../tests/integration/registrydiagnostics_test.go | 12 ++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index 79b14fb432d..4dfb4b50d81 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -114,13 +114,16 @@ func tarFolder(logger *logp.Logger, src, dst string) error { baseDir := filepath.Base(src) logger.Debugf("starting to walk '%s'", fullPath) - return filepath.Walk(fullPath, func(path string, info fs.FileInfo, err error) error { + return filepath.Walk(fullPath, func(path string, info fs.FileInfo, prevErr error) error { // Stop if there is any errors - if err != nil { + if prevErr != nil { return err } header, err := tar.FileInfoHeader(info, info.Name()) + if err != nil { + return fmt.Errorf("cannot create tar info header: '%w'", err) + } header.Name = filepath.Join(baseDir, strings.TrimPrefix(path, src)) if err := tarWriter.WriteHeader(header); err != nil { diff --git a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go index 3d28d983b16..d8a72787618 100644 --- a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go +++ b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go @@ -20,11 +20,10 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/elastic/beats/v7/libbeat/tests/integration" "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/stretchr/testify/assert" ) func TestRegistryIsInDiagnostics(t *testing.T) { @@ -89,8 +88,9 @@ func TestRegistryIsInDiagnostics(t *testing.T) { SentActions: map[string]*mock.PerformAction{}, } + // requirerequestDiagnostics runs on a different goroutine, we cannot call t.Fatal requestDiagnostics := func() { - require.Eventuallyf(t, func() bool { + assert.Eventuallyf(t, func() bool { return filebeat.CountFileLines(outputGlob) == 2 }, 1*time.Minute, @@ -105,7 +105,7 @@ func TestRegistryIsInDiagnostics(t *testing.T) { Level: proto.ActionRequest_COMPONENT, // aka diagnostics for the whole Beat DiagCallback: func(diagResults []*proto.ActionDiagnosticUnitResult, diagErr error) { if diagErr != nil { - t.Fatalf("diagnostics failed: %s", diagErr) + t.Errorf("diagnostics failed: %s", diagErr) } for _, dr := range diagResults { @@ -114,12 +114,12 @@ func TestRegistryIsInDiagnostics(t *testing.T) { } if len(dr.Content) == 0 { - t.Fatalf("registry cannot be an empty file") + t.Errorf("registry cannot be an empty file") } gzipReader, err := gzip.NewReader(bytes.NewReader(dr.Content)) if err != nil { - t.Fatalf("cannot create gzip reader: '%s'", err) + t.Errorf("cannot create gzip reader: '%s'", err) } defer gzipReader.Close() From 7a4ce56ac9f584ad788bf5a408798f6ada237816 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 27 Nov 2024 14:51:19 -0500 Subject: [PATCH 10/21] Fix error handling --- filebeat/beater/diagnostics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index 4dfb4b50d81..5b58d8a4e2d 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -117,7 +117,7 @@ func tarFolder(logger *logp.Logger, src, dst string) error { return filepath.Walk(fullPath, func(path string, info fs.FileInfo, prevErr error) error { // Stop if there is any errors if prevErr != nil { - return err + return prevErr } header, err := tar.FileInfoHeader(info, info.Name()) From 74c042a83e26262c5ba0c3663fb1793001e4851c Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 27 Nov 2024 14:53:19 -0500 Subject: [PATCH 11/21] Add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d8aca419dda..cac808f4551 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -345,6 +345,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004] - Update CEL mito extensions to v1.16.0. {pull}41727[41727] +- Filebeat's registry is now added to the Elastic-Agent diagnostics bundle {issue}33238[33238] {pull}41795[41795] *Auditbeat* From 2ec1ae65c12f3c043ec5571f2b476fd10f5a24b1 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 19 Dec 2024 11:11:32 -0500 Subject: [PATCH 12/21] Ensure we only get the registry files --- filebeat/beater/diagnostics.go | 27 +++++++++++- filebeat/beater/diagnostics_test.go | 44 +++++++++++++++++++ filebeat/input/v2/compat/compat.go | 13 ------ .../integration/registrydiagnostics_test.go | 6 ++- 4 files changed, 74 insertions(+), 16 deletions(-) create mode 100644 filebeat/beater/diagnostics_test.go diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index 5b58d8a4e2d..91ed7b20426 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -26,6 +26,7 @@ import ( "io/fs" "os" "path/filepath" + "regexp" "strings" "github.com/elastic/elastic-agent-libs/logp" @@ -120,11 +121,15 @@ func tarFolder(logger *logp.Logger, src, dst string) error { return prevErr } + pathInTar := filepath.Join(baseDir, strings.TrimPrefix(path, src)) + if !matchRegistyFiles(pathInTar) { + return nil + } header, err := tar.FileInfoHeader(info, info.Name()) if err != nil { return fmt.Errorf("cannot create tar info header: '%w'", err) } - header.Name = filepath.Join(baseDir, strings.TrimPrefix(path, src)) + header.Name = pathInTar if err := tarWriter.WriteHeader(header); err != nil { return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err) @@ -148,3 +153,23 @@ func tarFolder(logger *logp.Logger, src, dst string) error { return nil }) } + +// We use regexps here because globs do not support specifying a character +// range like we do in the checkpoint file +var registryFileRegExps = []*regexp.Regexp{ + regexp.MustCompile(filepath.Join([]string{"^registry$"}...)), + regexp.MustCompile(filepath.Join([]string{"^registry", "filebeat$"}...)), + regexp.MustCompile(filepath.Join([]string{"^registry", "filebeat", "meta\\.json$"}...)), + regexp.MustCompile(filepath.Join([]string{"^registry", "filebeat", "log\\.json$"}...)), + regexp.MustCompile(filepath.Join([]string{"^registry", "filebeat", "active\\.dat$"}...)), + regexp.MustCompile(filepath.Join([]string{"^registry", "filebeat", "[[:digit:]]*\\.json$"}...)), +} + +func matchRegistyFiles(path string) bool { + for _, regExp := range registryFileRegExps { + if regExp.MatchString(path) { + return true + } + } + return false +} diff --git a/filebeat/beater/diagnostics_test.go b/filebeat/beater/diagnostics_test.go new file mode 100644 index 00000000000..65160135583 --- /dev/null +++ b/filebeat/beater/diagnostics_test.go @@ -0,0 +1,44 @@ +package beater + +import ( + "fmt" + "testing" +) + +func TestMatchRegistryFiles(t *testing.T) { + positiveMatches := []string{ + "registry/filebeat/49855.json", + "registry/filebeat/active.dat", + "registry/filebeat/meta.json", + "registry/filebeat/log.json", + } + negativeMatches := []string{ + "registry/filebeat/bar.dat", + "registry/filebeat/log.txt", + "registry/42.json", + "nop/active.dat", + } + + testFn := func(t *testing.T, path string, match bool) { + result := matchRegistyFiles(path) + if result != match { + t.Errorf( + "mathRegisryFiles('%s') should return %t, got %t instead", + path, + match, + result) + } + } + + for _, path := range positiveMatches { + t.Run(fmt.Sprintf("%s returns true", path), func(t *testing.T) { + testFn(t, path, true) + }) + } + + for _, path := range negativeMatches { + t.Run(fmt.Sprintf("%s returns false", path), func(t *testing.T) { + testFn(t, path, false) + }) + } +} diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index 37c2484adaf..fde3f279233 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -159,19 +159,6 @@ func (r *runner) Stop() { r.statusReporter = nil } -// func (r *runner) Diagnostics() []diagnostics.DiagnosticSetup { -// fmt.Println("================================================== Diagnostics called!") -// setup := diagnostics.DiagnosticSetup{ -// Name: "registry collector", -// Description: "Collect Filebeat's registry", -// Filename: "registry.tar.gz", -// ContentType: "application/octet-stream", -// Callback: getRegistry, -// } - -// return []diagnostics.DiagnosticSetup{setup} -// } - func configID(config *conf.C) (string, error) { tmp := struct { ID string `config:"id"` diff --git a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go index d8a72787618..d5f7845fb99 100644 --- a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go +++ b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go @@ -127,6 +127,7 @@ func TestRegistryIsInDiagnostics(t *testing.T) { for { header, err := tarReader.Next() if errors.Is(err, io.EOF) { + t.Error("registry log file not found in tar archive") return } @@ -136,8 +137,10 @@ func TestRegistryIsInDiagnostics(t *testing.T) { validateLastRegistryEntry(t, tarReader, 100, logfile) testDone <- struct{}{} + return } } + t.Error("diagnostics do not contain a valid registry") }, } } @@ -164,9 +167,8 @@ func TestRegistryIsInDiagnostics(t *testing.T) { return &checkinExpected } - server.Port = 3000 if err := server.Start(); err != nil { - t.Error(err) + t.Fatalf("cannot start gRPC server: %s", err) } filebeat.Start( From affd18b6cd28d65bceb9330f29f937b505260aa7 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 19 Dec 2024 11:55:32 -0500 Subject: [PATCH 13/21] Add a 20mb limit and improve tests --- filebeat/beater/diagnostics.go | 6 ++++++ .../filebeat/tests/integration/registrydiagnostics_test.go | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index 91ed7b20426..6242cbe8c12 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -61,6 +61,12 @@ func gzipRegistry() []byte { logger.Errorw("cannot gzip Filebeat's registry", "error.message", err) } + // if the final file is too large, skip it + if buf.Len() >= 20_000_000 { // 20 Mb + logger.Warnf("registry is too large for diagnostics, %dmb bytes > 20mb", buf.Len()/1_000_000) + return nil + } + return buf.Bytes() } diff --git a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go index d5f7845fb99..ff7ff385efc 100644 --- a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go +++ b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go @@ -115,11 +115,13 @@ func TestRegistryIsInDiagnostics(t *testing.T) { if len(dr.Content) == 0 { t.Errorf("registry cannot be an empty file") + return } gzipReader, err := gzip.NewReader(bytes.NewReader(dr.Content)) if err != nil { t.Errorf("cannot create gzip reader: '%s'", err) + return } defer gzipReader.Close() @@ -202,7 +204,7 @@ func validateLastRegistryEntry(t *testing.T, reader io.Reader, expectedSize int, }{} if err := json.Unmarshal(lastLine, &entry); err != nil { - t.Fatalf("cannot unmarshal last registry entry: %s", err) + t.Errorf("cannot unmarshal last registry entry: %s", err) } if entry.Data.Meta.Path != expectedPath { From 464ea06ecec02fc218c38f0b4a8213e2df92ece3 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 19 Dec 2024 12:33:19 -0500 Subject: [PATCH 14/21] mage check --- filebeat/beater/diagnostics_test.go | 17 +++++++++++++++++ .../integration/registrydiagnostics_test.go | 3 ++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/filebeat/beater/diagnostics_test.go b/filebeat/beater/diagnostics_test.go index 65160135583..286b8b139f7 100644 --- a/filebeat/beater/diagnostics_test.go +++ b/filebeat/beater/diagnostics_test.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package beater import ( diff --git a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go index ff7ff385efc..2261ee35d7f 100644 --- a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go +++ b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go @@ -20,10 +20,11 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/libbeat/tests/integration" "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" "github.com/elastic/elastic-agent-client/v7/pkg/proto" - "github.com/stretchr/testify/assert" ) func TestRegistryIsInDiagnostics(t *testing.T) { From d44c6093c8acce4c638ccedd3e77d670046c8ce8 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 19 Dec 2024 17:51:57 -0500 Subject: [PATCH 15/21] Support windows path separator --- filebeat/beater/diagnostics.go | 33 +++++++++++++++++++++-------- filebeat/beater/diagnostics_test.go | 17 ++++++++------- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index 6242cbe8c12..1bbbe02180a 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -33,6 +33,27 @@ import ( "github.com/elastic/elastic-agent-libs/paths" ) +func init() { + preFilesList := [][]string{ + []string{"^registry$"}, + []string{"^registry", "filebeat$"}, + []string{"^registry", "filebeat", "meta\\.json$"}, + []string{"^registry", "filebeat", "log\\.json$"}, + []string{"^registry", "filebeat", "active\\.dat$"}, + []string{"^registry", "filebeat", "[[:digit:]]*\\.json$"}, + } + + for _, lst := range preFilesList { + var path string + if filepath.Separator == '\\' { + path = strings.Join(lst, `\\`) + } else { + path = filepath.Join(lst...) + } + registryFileRegExps = append(registryFileRegExps, regexp.MustCompile(path)) + } +} + func gzipRegistry() []byte { logger := logp.L().Named("diagnostics") buf := bytes.Buffer{} @@ -161,15 +182,9 @@ func tarFolder(logger *logp.Logger, src, dst string) error { } // We use regexps here because globs do not support specifying a character -// range like we do in the checkpoint file -var registryFileRegExps = []*regexp.Regexp{ - regexp.MustCompile(filepath.Join([]string{"^registry$"}...)), - regexp.MustCompile(filepath.Join([]string{"^registry", "filebeat$"}...)), - regexp.MustCompile(filepath.Join([]string{"^registry", "filebeat", "meta\\.json$"}...)), - regexp.MustCompile(filepath.Join([]string{"^registry", "filebeat", "log\\.json$"}...)), - regexp.MustCompile(filepath.Join([]string{"^registry", "filebeat", "active\\.dat$"}...)), - regexp.MustCompile(filepath.Join([]string{"^registry", "filebeat", "[[:digit:]]*\\.json$"}...)), -} +// range like we do in the checkpoint file. This slice is populated in the +// `init` function because Windows path separators need to be escaped. +var registryFileRegExps = []*regexp.Regexp{} func matchRegistyFiles(path string) bool { for _, regExp := range registryFileRegExps { diff --git a/filebeat/beater/diagnostics_test.go b/filebeat/beater/diagnostics_test.go index 286b8b139f7..275e8a845dc 100644 --- a/filebeat/beater/diagnostics_test.go +++ b/filebeat/beater/diagnostics_test.go @@ -19,21 +19,22 @@ package beater import ( "fmt" + "path/filepath" "testing" ) func TestMatchRegistryFiles(t *testing.T) { positiveMatches := []string{ - "registry/filebeat/49855.json", - "registry/filebeat/active.dat", - "registry/filebeat/meta.json", - "registry/filebeat/log.json", + filepath.Join("registry", "filebeat", "49855.json"), + filepath.Join("registry", "filebeat", "active.dat"), + filepath.Join("registry", "filebeat", "meta.json"), + filepath.Join("registry", "filebeat", "log.json"), } negativeMatches := []string{ - "registry/filebeat/bar.dat", - "registry/filebeat/log.txt", - "registry/42.json", - "nop/active.dat", + filepath.Join("registry", "filebeat", "bar.dat"), + filepath.Join("registry", "filebeat", "log.txt"), + filepath.Join("registry", "42.json"), + filepath.Join("nop", "active.dat"), } testFn := func(t *testing.T, path string, match bool) { From add8e21d253e67e2f37b0b1da3a0e8348c9b12fb Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 2 Jan 2025 10:37:57 -0500 Subject: [PATCH 16/21] Update notice to 2025 --- NOTICE.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NOTICE.txt b/NOTICE.txt index 7968c2b8fd1..1a621316567 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1,5 +1,5 @@ Elastic Beats -Copyright 2014-2024 Elasticsearch BV +Copyright 2014-2025 Elasticsearch BV This product includes software developed by The Apache Software Foundation (http://www.apache.org/). From 2c45a4c100b62a5a00bd782ed0d8fe13c80e3ff2 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 2 Jan 2025 15:22:53 -0500 Subject: [PATCH 17/21] fix otel API --- x-pack/filebeat/fbreceiver/receiver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index 928db4c4b64..7da5c24f0ad 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -130,7 +130,7 @@ func BenchmarkFactory(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := NewFactory().CreateLogsReceiver(context.Background(), receiverSettings, cfg, nil) + _, err := NewFactory().CreateLogs(context.Background(), receiverSettings, cfg, nil) require.NoError(b, err) } } From 216a5d1d9b96554bf5ad8d5e894f8297630ed4d1 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Mon, 6 Jan 2025 10:31:22 -0500 Subject: [PATCH 18/21] Disable fingerprint on tests --- .../tests/integration/registrydiagnostics_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go index 2261ee35d7f..4859247bc04 100644 --- a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go +++ b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go @@ -45,10 +45,14 @@ func TestRegistryIsInDiagnostics(t *testing.T) { { Id: "stream-filestream-" + t.Name(), Source: integration.RequireNewStruct(t, map[string]interface{}{ - "id": "stream-filestream-" + t.Name(), - "enabled": true, - "type": "filestream", - "paths": []interface{}{logfile}, + "id": "stream-filestream-" + t.Name(), + "enabled": true, + "type": "filestream", + "paths": []interface{}{logfile}, + "file.identity": map[string]any{}, + "prospector.scanner.fingerprint": map[string]any{ + "enabled": false, + }, }), }, }, From 7f3e5ae10c8c978305802192a687766edbcd3d56 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 7 Jan 2025 10:34:51 -0500 Subject: [PATCH 19/21] Stop using `init()` and generate `registryFileRegExps` once needed This commit moves the code that populates `registryFileRegExps` from a `init()` function to a place before `matchRegistyFiles` is called. Because generating diagnostics is very sporadic, there is not meaningful performance change between having it generated once on `init()` or generating it every time it's needed. --- filebeat/beater/diagnostics.go | 39 ++++++++++++++++++++++------- filebeat/beater/diagnostics_test.go | 6 ++++- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index 1bbbe02180a..7dddd70084a 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -33,7 +33,11 @@ import ( "github.com/elastic/elastic-agent-libs/paths" ) -func init() { +func getRegexpsForRegistryFiles() ([]*regexp.Regexp, error) { + // We use regexps here because globs do not support specifying a character + // range like we do in the checkpoint file. + + registryFileRegExps := []*regexp.Regexp{} preFilesList := [][]string{ []string{"^registry$"}, []string{"^registry", "filebeat$"}, @@ -50,8 +54,20 @@ func init() { } else { path = filepath.Join(lst...) } - registryFileRegExps = append(registryFileRegExps, regexp.MustCompile(path)) + + // Compile the reg exp, if there is an error, stop and return. + // There should be no error here as this code is tested in all + // supported OSes, however to avoid a code path that leads to a + // panic, we cannot use `regexp.MustCompile` and handle the error + re, err := regexp.Compile(path) + if err != nil { + return nil, fmt.Errorf("cannot compile reg exp: %w", err) + } + + registryFileRegExps = append(registryFileRegExps, re) } + + return registryFileRegExps, nil } func gzipRegistry() []byte { @@ -142,6 +158,16 @@ func tarFolder(logger *logp.Logger, src, dst string) error { baseDir := filepath.Base(src) logger.Debugf("starting to walk '%s'", fullPath) + + // This error should never happen at runtime, if something + // breaks it should break the tests and be fixed before a + // release. We handle the error here to avoid a code path + // that can end into a panic. + registryFileRegExps, err := getRegexpsForRegistryFiles() + if err != nil { + return err + } + return filepath.Walk(fullPath, func(path string, info fs.FileInfo, prevErr error) error { // Stop if there is any errors if prevErr != nil { @@ -149,7 +175,7 @@ func tarFolder(logger *logp.Logger, src, dst string) error { } pathInTar := filepath.Join(baseDir, strings.TrimPrefix(path, src)) - if !matchRegistyFiles(pathInTar) { + if !matchRegistyFiles(registryFileRegExps, pathInTar) { return nil } header, err := tar.FileInfoHeader(info, info.Name()) @@ -181,12 +207,7 @@ func tarFolder(logger *logp.Logger, src, dst string) error { }) } -// We use regexps here because globs do not support specifying a character -// range like we do in the checkpoint file. This slice is populated in the -// `init` function because Windows path separators need to be escaped. -var registryFileRegExps = []*regexp.Regexp{} - -func matchRegistyFiles(path string) bool { +func matchRegistyFiles(registryFileRegExps []*regexp.Regexp, path string) bool { for _, regExp := range registryFileRegExps { if regExp.MatchString(path) { return true diff --git a/filebeat/beater/diagnostics_test.go b/filebeat/beater/diagnostics_test.go index 275e8a845dc..8f2f33d7034 100644 --- a/filebeat/beater/diagnostics_test.go +++ b/filebeat/beater/diagnostics_test.go @@ -36,9 +36,13 @@ func TestMatchRegistryFiles(t *testing.T) { filepath.Join("registry", "42.json"), filepath.Join("nop", "active.dat"), } + registryFileRegExps, err := getRegexpsForRegistryFiles() + if err != nil { + t.Fatalf("cannot compile regexps for registry paths: %s", err) + } testFn := func(t *testing.T, path string, match bool) { - result := matchRegistyFiles(path) + result := matchRegistyFiles(registryFileRegExps, path) if result != match { t.Errorf( "mathRegisryFiles('%s') should return %t, got %t instead", From 44c25ce8773b6cc06ba186d041f55e79db9fb8cf Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 7 Jan 2025 17:55:45 -0500 Subject: [PATCH 20/21] Add a test for an "emtpy" registry This commit adds a test for when Filebat's registry is empty, that's done by using the benchmark input. The benchmark input is fixed so it can run under Elastic-Agent. --- x-pack/filebeat/input/benchmark/input.go | 8 +- .../integration/registrydiagnostics_test.go | 222 +++++++++++++----- 2 files changed, 176 insertions(+), 54 deletions(-) diff --git a/x-pack/filebeat/input/benchmark/input.go b/x-pack/filebeat/input/benchmark/input.go index e098d3e746b..4f9b64332f7 100644 --- a/x-pack/filebeat/input/benchmark/input.go +++ b/x-pack/filebeat/input/benchmark/input.go @@ -158,7 +158,13 @@ type inputMetrics struct { // newInputMetrics returns an input metric for the benchmark processor. func newInputMetrics(ctx v2.Context) *inputMetrics { - reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, ctx.Agent.Monitoring.Namespace.GetRegistry()) + var globalRegistry *monitoring.Registry + // When running under Elastic-Agent Namespace can be nil. + // Passing a nil registry to inputmon.NewInputRegistry is not a problem. + if ctx.Agent.Monitoring.Namespace != nil { + globalRegistry = ctx.Agent.Monitoring.Namespace.GetRegistry() + } + reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, globalRegistry) out := &inputMetrics{ unregister: unreg, eventsPublished: monitoring.NewUint(reg, "events_published_total"), diff --git a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go index 4859247bc04..39d0ef2becf 100644 --- a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go +++ b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go @@ -27,7 +27,7 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/proto" ) -func TestRegistryIsInDiagnostics(t *testing.T) { +func TestFilestreamRegistryIsInDiagnostics(t *testing.T) { filebeat := NewFilebeat(t) logfile := filepath.Join(filebeat.TempDir(), "log.log") integration.GenerateLogFile(t, logfile, 2, false) @@ -93,63 +93,98 @@ func TestRegistryIsInDiagnostics(t *testing.T) { SentActions: map[string]*mock.PerformAction{}, } - // requirerequestDiagnostics runs on a different goroutine, we cannot call t.Fatal - requestDiagnostics := func() { - assert.Eventuallyf(t, func() bool { - return filebeat.CountFileLines(outputGlob) == 2 - }, - 1*time.Minute, - 100*time.Millisecond, - "output file '%s' does not contain two events", outputGlob) - - // Once we're done, set it back to false - defer waitingForDiagnostics.Store(false) - server.ActionsChan <- &mock.PerformAction{ - Type: proto.ActionRequest_DIAGNOSTICS, - Name: "diagnostics", - Level: proto.ActionRequest_COMPONENT, // aka diagnostics for the whole Beat - DiagCallback: func(diagResults []*proto.ActionDiagnosticUnitResult, diagErr error) { - if diagErr != nil { - t.Errorf("diagnostics failed: %s", diagErr) - } + server.CheckinV2Impl = func(observed *proto.CheckinObserved) *proto.CheckinExpected { + // No matter the current state, we always return the same units + checkinExpected := proto.CheckinExpected{ + Units: units, + } - for _, dr := range diagResults { - if dr.Name != "registry" { - continue - } + // If any unit is not healthy, just return the expected state + for _, unit := range observed.Units { + if unit.GetState() != proto.State_HEALTHY { + return &checkinExpected + } + } - if len(dr.Content) == 0 { - t.Errorf("registry cannot be an empty file") - return - } + // All units are healthy, we can request the diagnostics. + // Ensure we don't have any diagnostics being requested already. + if waitingForDiagnostics.CompareAndSwap(false, true) { + // Request the diagnostics asynchronously + go requestDiagnosticsAndVerifyRegistry(t, filebeat, outputGlob, logfile, 100, &waitingForDiagnostics, server, testDone, false) + } + return &checkinExpected + } - gzipReader, err := gzip.NewReader(bytes.NewReader(dr.Content)) - if err != nil { - t.Errorf("cannot create gzip reader: '%s'", err) - return - } - defer gzipReader.Close() - - tarReader := tar.NewReader(gzipReader) - for { - header, err := tarReader.Next() - if errors.Is(err, io.EOF) { - t.Error("registry log file not found in tar archive") - return - } + if err := server.Start(); err != nil { + t.Fatalf("cannot start gRPC server: %s", err) + } - if header.Name != "registry/filebeat/log.json" { - continue - } + filebeat.Start( + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + "-E", "queue.mem.flush.timeout=0", + ) - validateLastRegistryEntry(t, tarReader, 100, logfile) - testDone <- struct{}{} - return - } - } - t.Error("diagnostics do not contain a valid registry") + <-testDone +} + +func TestEmptyegistryIsInDiagnostics(t *testing.T) { + filebeat := NewFilebeat(t) + input := proto.UnitExpected{ + Id: "input-" + t.Name(), + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "unit-filestream-" + t.Name(), + Type: "benchmark", + Name: "Benchmark-" + t.Name(), + Streams: []*proto.Stream{ + { + Id: "stream-benchmark-" + t.Name(), + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "id": "stream-benchmark-" + t.Name(), + "enabled": true, + "count": 2, + }), + }, }, - } + }, + } + + output := proto.UnitExpected{ + Id: "unit-output-" + t.Name(), + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "output-" + t.Name(), + Type: "file", + Name: "file", + Source: integration.RequireNewStruct(t, + map[string]interface{}{ + "type": "file", + "path": filebeat.TempDir(), + "filename": "output", + }), + }, + } + outputGlob := filepath.Join(filebeat.TempDir(), "output*") + + var units = []*proto.UnitExpected{ + &output, + &input, + } + + waitingForDiagnostics := atomic.Bool{} + testDone := make(chan struct{}) + + server := &mock.StubServerV2{ + ActionImpl: func(response *proto.ActionResponse) error { return nil }, + ActionsChan: make(chan *mock.PerformAction), + SentActions: map[string]*mock.PerformAction{}, } server.CheckinV2Impl = func(observed *proto.CheckinObserved) *proto.CheckinExpected { @@ -169,7 +204,7 @@ func TestRegistryIsInDiagnostics(t *testing.T) { // Ensure we don't have any diagnostics being requested already. if waitingForDiagnostics.CompareAndSwap(false, true) { // Request the diagnostics asynchronously - go requestDiagnostics() + go requestDiagnosticsAndVerifyRegistry(t, filebeat, outputGlob, "", 0, &waitingForDiagnostics, server, testDone, true) } return &checkinExpected } @@ -226,3 +261,84 @@ func validateLastRegistryEntry(t *testing.T, reader io.Reader, expectedSize int, entry.Data.Cursor.Offset) } } + +// requestDiagnosticsAndVerifyRegistry runs on a different goroutine, we cannot call t.Fatal +func requestDiagnosticsAndVerifyRegistry( + t *testing.T, + filebeat *integration.BeatProc, + outputGlob, + logfile string, + logfileOffset int, + waitingForDiagnostics *atomic.Bool, + server *mock.StubServerV2, + testDone chan<- struct{}, + emtpyRegistyLogFile bool) { + + assert.Eventuallyf(t, func() bool { + return filebeat.CountFileLines(outputGlob) == 2 + }, + 1*time.Minute, + 100*time.Millisecond, + "output file '%s' does not contain two events", outputGlob) + + // Once we're done, set it back to false + defer waitingForDiagnostics.Store(false) + server.ActionsChan <- &mock.PerformAction{ + Type: proto.ActionRequest_DIAGNOSTICS, + Name: "diagnostics", + Level: proto.ActionRequest_COMPONENT, // aka diagnostics for the whole Beat + DiagCallback: func(diagResults []*proto.ActionDiagnosticUnitResult, diagErr error) { + // Let the test finish when this callback finishes + defer func() { + testDone <- struct{}{} + }() + + if diagErr != nil { + t.Errorf("diagnostics failed: %s", diagErr) + return + } + + for _, dr := range diagResults { + if dr.Name != "registry" { + continue + } + + if len(dr.Content) == 0 { + t.Errorf("registry cannot be an empty file") + return + } + + gzipReader, err := gzip.NewReader(bytes.NewReader(dr.Content)) + if err != nil { + t.Errorf("cannot create gzip reader: '%s'", err) + return + } + defer gzipReader.Close() + + tarReader := tar.NewReader(gzipReader) + for { + header, err := tarReader.Next() + if errors.Is(err, io.EOF) { + t.Error("registry log file not found in tar archive") + return + } + + if header.Name != "registry/filebeat/log.json" { + continue + } + + if emtpyRegistyLogFile { + if header.Size != 0 { + t.Errorf("expecting registry log file to be empty, got %d bytes instead", header.Size) + } + return + } + + validateLastRegistryEntry(t, tarReader, logfileOffset, logfile) + return + } + } + t.Error("diagnostics do not contain a valid registry") + }, + } +} From 5c43e5a409ebe92fcb28be85345de7b558399111 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 22 Jan 2025 16:11:43 -0500 Subject: [PATCH 21/21] Reformat code and fix logging --- filebeat/beater/diagnostics.go | 2 +- .../integration/registrydiagnostics_test.go | 19 +++++++++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index 7dddd70084a..9b4e02b4e47 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -85,7 +85,7 @@ func gzipRegistry() []byte { defer func() { if err := os.Remove(f.Name()); err != nil { - logp.L().Named("diagnostics").Warnf("cannot remove temporary registry archive '%s': '%s'", f.Name(), err) + logger.Warnf("cannot remove temporary registry archive '%s': '%s'", f.Name(), err) } }() diff --git a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go index 39d0ef2becf..121999455cf 100644 --- a/x-pack/filebeat/tests/integration/registrydiagnostics_test.go +++ b/x-pack/filebeat/tests/integration/registrydiagnostics_test.go @@ -110,7 +110,16 @@ func TestFilestreamRegistryIsInDiagnostics(t *testing.T) { // Ensure we don't have any diagnostics being requested already. if waitingForDiagnostics.CompareAndSwap(false, true) { // Request the diagnostics asynchronously - go requestDiagnosticsAndVerifyRegistry(t, filebeat, outputGlob, logfile, 100, &waitingForDiagnostics, server, testDone, false) + go requestDiagnosticsAndVerifyRegistry( + t, + filebeat, + outputGlob, + logfile, + 100, + &waitingForDiagnostics, + server, + testDone, + false) } return &checkinExpected } @@ -274,9 +283,11 @@ func requestDiagnosticsAndVerifyRegistry( testDone chan<- struct{}, emtpyRegistyLogFile bool) { - assert.Eventuallyf(t, func() bool { - return filebeat.CountFileLines(outputGlob) == 2 - }, + assert.Eventuallyf( + t, + func() bool { + return filebeat.CountFileLines(outputGlob) == 2 + }, 1*time.Minute, 100*time.Millisecond, "output file '%s' does not contain two events", outputGlob)