From b86110fe110a428490059e7a8669304a08bf5784 Mon Sep 17 00:00:00 2001 From: Peter Wilcsinszky Date: Wed, 26 Jul 2023 14:01:48 +0200 Subject: [PATCH 1/2] fix: lookup actual cluster output namespace, validate and bail out if invalid Signed-off-by: Peter Wilcsinszky --- .../logging/model/syslogng/config/config.go | 23 ++- .../model/syslogng/config/config_test.go | 131 ++++++++++++++++++ pkg/sdk/logging/model/syslogng/config/flow.go | 28 +++- .../model/syslogng/config/flow_test.go | 7 +- pkg/sdk/logging/model/syslogng/config/go.mod | 2 +- 5 files changed, 177 insertions(+), 14 deletions(-) diff --git a/pkg/sdk/logging/model/syslogng/config/config.go b/pkg/sdk/logging/model/syslogng/config/config.go index 9482ee5f9..a78fba89b 100644 --- a/pkg/sdk/logging/model/syslogng/config/config.go +++ b/pkg/sdk/logging/model/syslogng/config/config.go @@ -15,12 +15,14 @@ package config import ( - "errors" "io" "reflect" + "emperror.dev/errors" "github.com/cisco-open/operator-tools/pkg/secret" "github.com/siliconbrain/go-seqs/seqs" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1" "github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/render" @@ -62,6 +64,8 @@ func configRenderer(in Input) (render.Renderer, error) { return nil, errors.New("missing syslog-ng spec") } + var errs error + // TODO: this should happen at the spec level, in something like `SyslogNGSpec.FinalGlobalOptions() GlobalOptions` if in.Logging.Spec.SyslogNGSpec.Metrics != nil { setDefault(&in.Logging.Spec.SyslogNGSpec.GlobalOptions, &v1beta1.GlobalOptions{}) @@ -79,7 +83,12 @@ func configRenderer(in Input) (render.Renderer, error) { globalOptions := renderAny(in.Logging.Spec.SyslogNGSpec.GlobalOptions, in.SecretLoaderFactory.SecretLoaderForNamespace(in.Logging.Namespace)) destinationDefs := make([]render.Renderer, 0, len(in.ClusterOutputs)+len(in.Outputs)) + clusterOutputRefs := make(map[string]types.NamespacedName, len(in.ClusterOutputs)) for _, co := range in.ClusterOutputs { + clusterOutputRefs[co.Name] = types.NamespacedName{ + Namespace: co.Namespace, + Name: co.Name, + } destinationDefs = append(destinationDefs, renderClusterOutput(co, in.SecretLoaderFactory)) } for _, o := range in.Outputs { @@ -88,10 +97,16 @@ func configRenderer(in Input) (render.Renderer, error) { logDefs := make([]render.Renderer, 0, len(in.ClusterFlows)+len(in.Flows)) for _, cf := range in.ClusterFlows { - logDefs = append(logDefs, renderClusterFlow(sourceName, cf, in.SecretLoaderFactory)) + if err := validateClusterOutputs(clusterOutputRefs, client.ObjectKeyFromObject(&cf).String(), cf.Spec.GlobalOutputRefs); err != nil { + errs = errors.Append(errs, err) + } + logDefs = append(logDefs, renderClusterFlow(clusterOutputRefs, sourceName, cf, in.SecretLoaderFactory)) } for _, f := range in.Flows { - logDefs = append(logDefs, renderFlow(in.Logging.Spec.ControlNamespace, sourceName, keyDelim(in.Logging.Spec.SyslogNGSpec.JSONKeyDelimiter), f, in.SecretLoaderFactory)) + if err := validateClusterOutputs(clusterOutputRefs, client.ObjectKeyFromObject(&f).String(), f.Spec.GlobalOutputRefs); err != nil { + errs = errors.Append(errs, err) + } + logDefs = append(logDefs, renderFlow(clusterOutputRefs, sourceName, keyDelim(in.Logging.Spec.SyslogNGSpec.JSONKeyDelimiter), f, in.SecretLoaderFactory)) } if in.Logging.Spec.SyslogNGSpec.JSONKeyPrefix == "" { @@ -132,7 +147,7 @@ func configRenderer(in Input) (render.Renderer, error) { func(rnd render.Renderer) bool { return rnd != nil }, ), render.Line(render.Empty), - )), nil + )), errs } func versionStmt(version string) render.Renderer { diff --git a/pkg/sdk/logging/model/syslogng/config/config_test.go b/pkg/sdk/logging/model/syslogng/config/config_test.go index 5c36fe067..aa6a31b61 100644 --- a/pkg/sdk/logging/model/syslogng/config/config_test.go +++ b/pkg/sdk/logging/model/syslogng/config/config_test.go @@ -395,6 +395,137 @@ destination "output_default_my-output" { }; `, }, + "clusteroutput with flow ref": { + input: Input{ + Logging: v1beta1.Logging{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: v1beta1.LoggingSpec{ + SyslogNGSpec: &v1beta1.SyslogNGSpec{}, + ControlNamespace: "logging", + }, + }, + Flows: []v1beta1.SyslogNGFlow{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-flow", + }, + Spec: v1beta1.SyslogNGFlowSpec{ + GlobalOutputRefs: []string{ + "clusterout", + }, + }, + }, + }, + ClusterOutputs: []v1beta1.SyslogNGClusterOutput{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "clusterout", + Namespace: "logging", + }, + Spec: v1beta1.SyslogNGClusterOutputSpec{ + SyslogNGOutputSpec: v1beta1.SyslogNGOutputSpec{ + Syslog: &output.SyslogOutput{ + Host: "127.0.0.1", + }, + }, + }, + }, + }, + SecretLoaderFactory: &TestSecretLoaderFactory{}, + SourcePort: 601, + }, + wantOut: `@version: current + +@include "scl.conf" + +source "main_input" { + channel { + source { + network(flags("no-parse") port(601) transport("tcp")); + }; + parser { + json-parser(prefix("json.")); + }; + }; +}; + +destination "clusteroutput_logging_clusterout" { + syslog("127.0.0.1" persist_name("clusteroutput_logging_clusterout")); +}; + +filter "flow_default_test-flow_ns_filter" { + match("default" value("json.kubernetes.namespace_name") type("string")); +}; +log { + source("main_input"); + filter("flow_default_test-flow_ns_filter"); + destination("clusteroutput_logging_clusterout"); +}; +`, + }, + "flow referencing non-existent cluster output": { + input: Input{ + Logging: v1beta1.Logging{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: v1beta1.LoggingSpec{ + SyslogNGSpec: &v1beta1.SyslogNGSpec{}, + ControlNamespace: "logging", + }, + }, + Flows: []v1beta1.SyslogNGFlow{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-flow", + }, + Spec: v1beta1.SyslogNGFlowSpec{ + GlobalOutputRefs: []string{ + "clusterout", + }, + }, + }, + }, + ClusterOutputs: nil, + SecretLoaderFactory: &TestSecretLoaderFactory{}, + SourcePort: 601, + }, + wantErr: true, + }, + "clusterFlow referencing non-existent cluster output": { + input: Input{ + Logging: v1beta1.Logging{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: v1beta1.LoggingSpec{ + SyslogNGSpec: &v1beta1.SyslogNGSpec{}, + ControlNamespace: "logging", + }, + }, + ClusterFlows: []v1beta1.SyslogNGClusterFlow{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-flow", + }, + Spec: v1beta1.SyslogNGClusterFlowSpec{ + GlobalOutputRefs: []string{ + "clusterout", + }, + }, + }, + }, + ClusterOutputs: nil, + SecretLoaderFactory: &TestSecretLoaderFactory{}, + SourcePort: 601, + }, + wantErr: true, + }, "parser": { input: Input{ Logging: v1beta1.Logging{ diff --git a/pkg/sdk/logging/model/syslogng/config/flow.go b/pkg/sdk/logging/model/syslogng/config/flow.go index 35b329a90..c37beaa7a 100644 --- a/pkg/sdk/logging/model/syslogng/config/flow.go +++ b/pkg/sdk/logging/model/syslogng/config/flow.go @@ -20,16 +20,28 @@ import ( "strconv" "strings" + "emperror.dev/errors" "github.com/cisco-open/operator-tools/pkg/secret" + "github.com/siliconbrain/go-seqs/seqs" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1" "github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/model" "github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/render" filter "github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/filter" - "github.com/siliconbrain/go-seqs/seqs" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func renderClusterFlow(sourceName string, f v1beta1.SyslogNGClusterFlow, secretLoaderFactory SecretLoaderFactory) render.Renderer { +func validateClusterOutputs(clusterOutputRefs map[string]types.NamespacedName, flow string, globalOutputRefs []string) error { + return seqs.Reduce(seqs.FromSlice(globalOutputRefs), nil, func(err error, ref string) error { + if _, ok := clusterOutputRefs[ref]; !ok { + return errors.Append(err, errors.Errorf("cluster output reference %s for flow %s cannot be found", ref, flow)) + } + return err + }) +} + +func renderClusterFlow(clusterOutputRefs map[string]types.NamespacedName, sourceName string, f v1beta1.SyslogNGClusterFlow, secretLoaderFactory SecretLoaderFactory) render.Renderer { baseName := fmt.Sprintf("clusterflow_%s_%s", f.Namespace, f.Name) matchName := fmt.Sprintf("%s_match", baseName) filterDefs := seqs.MapWithIndex(seqs.FromSlice(f.Spec.Filters), func(idx int, flt v1beta1.SyslogNGFilter) render.Renderer { @@ -48,12 +60,14 @@ func renderClusterFlow(sourceName string, f v1beta1.SyslogNGClusterFlow, secretL return parenDefStmt(filterKind(flt), render.Literal(filterID(flt, idx, baseName))) }), )), - seqs.ToSlice(seqs.Map(seqs.FromSlice(f.Spec.GlobalOutputRefs), func(ref string) string { return clusterOutputDestName(f.Namespace, ref) })), + seqs.ToSlice(seqs.Map(seqs.FromSlice(f.Spec.GlobalOutputRefs), func(ref string) string { + return clusterOutputDestName(clusterOutputRefs[ref].Namespace, ref) + })), ), ) } -func renderFlow(controlNS string, sourceName string, keyDelim string, f v1beta1.SyslogNGFlow, secretLoaderFactory SecretLoaderFactory) render.Renderer { +func renderFlow(clusterOutputRefs map[string]types.NamespacedName, sourceName string, keyDelim string, f v1beta1.SyslogNGFlow, secretLoaderFactory SecretLoaderFactory) render.Renderer { baseName := fmt.Sprintf("flow_%s_%s", f.Namespace, f.Name) matchName := fmt.Sprintf("%s_match", baseName) nsFilterName := fmt.Sprintf("%s_ns_filter", baseName) @@ -80,7 +94,9 @@ func renderFlow(controlNS string, sourceName string, keyDelim string, f v1beta1. }), )), seqs.ToSlice(seqs.Concat( - seqs.Map(seqs.FromSlice(f.Spec.GlobalOutputRefs), func(ref string) string { return clusterOutputDestName(f.Namespace, ref) }), + seqs.Map(seqs.FromSlice(f.Spec.GlobalOutputRefs), func(ref string) string { + return clusterOutputDestName(clusterOutputRefs[ref].Namespace, ref) + }), seqs.Map(seqs.FromSlice(f.Spec.LocalOutputRefs), func(ref string) string { return outputDestName(f.Namespace, ref) }), )), ), diff --git a/pkg/sdk/logging/model/syslogng/config/flow_test.go b/pkg/sdk/logging/model/syslogng/config/flow_test.go index d6ef5d7da..25f49588e 100644 --- a/pkg/sdk/logging/model/syslogng/config/flow_test.go +++ b/pkg/sdk/logging/model/syslogng/config/flow_test.go @@ -18,11 +18,12 @@ import ( "strings" "testing" - "github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1" - "github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/render" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1" + "github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/render" ) func TestRenderClusterFlow(t *testing.T) { @@ -65,7 +66,7 @@ source("test_input"); testCase := testCase t.Run(name, func(t *testing.T) { out := strings.Builder{} - require.NoError(t, renderClusterFlow("test_input", testCase.clusterFlow, nil)(render.RenderContext{ + require.NoError(t, renderClusterFlow(nil, "test_input", testCase.clusterFlow, nil)(render.RenderContext{ Out: &out, })) assert.Equal(t, testCase.expected, out.String()) diff --git a/pkg/sdk/logging/model/syslogng/config/go.mod b/pkg/sdk/logging/model/syslogng/config/go.mod index ff0446c4e..fefcc9d1f 100644 --- a/pkg/sdk/logging/model/syslogng/config/go.mod +++ b/pkg/sdk/logging/model/syslogng/config/go.mod @@ -3,6 +3,7 @@ module github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/c go 1.20 require ( + emperror.dev/errors v0.8.1 github.com/cisco-open/operator-tools v0.30.0 github.com/kube-logging/logging-operator/pkg/sdk v0.9.1 github.com/siliconbrain/go-seqs v0.5.0 @@ -14,7 +15,6 @@ require ( ) require ( - emperror.dev/errors v0.8.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect From 6d158429de2ef9b2b3e26ab054cda50312edbcff Mon Sep 17 00:00:00 2001 From: Peter Wilcsinszky Date: Wed, 26 Jul 2023 23:09:58 +0200 Subject: [PATCH 2/2] fix: make cluster.local domain configurable for fluentbit in syslog-ng aggregator mode as well Signed-off-by: Peter Wilcsinszky --- pkg/resources/fluentbit/configsecret.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/resources/fluentbit/configsecret.go b/pkg/resources/fluentbit/configsecret.go index 97d9237a6..7a7d58899 100644 --- a/pkg/resources/fluentbit/configsecret.go +++ b/pkg/resources/fluentbit/configsecret.go @@ -337,7 +337,7 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er if r.Logging.Spec.SyslogNGSpec != nil { input.SyslogNGOutput = &syslogNGOutputConfig{} - input.SyslogNGOutput.Host = fmt.Sprintf("%s.%s.svc.cluster.local", r.Logging.QualifiedName(syslogng.ServiceName), r.Logging.Spec.ControlNamespace) + input.SyslogNGOutput.Host = fmt.Sprintf("%s.%s.svc%s", r.Logging.QualifiedName(syslogng.ServiceName), r.Logging.Spec.ControlNamespace, r.Logging.ClusterDomainAsSuffix()) input.SyslogNGOutput.Port = syslogng.ServicePort input.SyslogNGOutput.JSONDateKey = "ts" input.SyslogNGOutput.JSONDateFormat = "iso8601"