Skip to content

Commit

Permalink
Merge pull request #1402 from kube-logging/syslog-ng-output-config
Browse files Browse the repository at this point in the history
syslog-ng aggregator fixes
  • Loading branch information
pepov authored Jul 27, 2023
2 parents b25cfef + 6d15842 commit 4443476
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/resources/fluentbit/configsecret.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er

if r.Logging.Spec.SyslogNGSpec != nil {
input.SyslogNGOutput = &syslogNGOutputConfig{}
input.SyslogNGOutput.Host = fmt.Sprintf("%s.%s.svc.cluster.local", r.Logging.QualifiedName(syslogng.ServiceName), r.Logging.Spec.ControlNamespace)
input.SyslogNGOutput.Host = fmt.Sprintf("%s.%s.svc%s", r.Logging.QualifiedName(syslogng.ServiceName), r.Logging.Spec.ControlNamespace, r.Logging.ClusterDomainAsSuffix())
input.SyslogNGOutput.Port = syslogng.ServicePort
input.SyslogNGOutput.JSONDateKey = "ts"
input.SyslogNGOutput.JSONDateFormat = "iso8601"
Expand Down
23 changes: 19 additions & 4 deletions pkg/sdk/logging/model/syslogng/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package config

import (
"errors"
"io"
"reflect"

"emperror.dev/errors"
"github.com/cisco-open/operator-tools/pkg/secret"
"github.com/siliconbrain/go-seqs/seqs"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/render"
Expand Down Expand Up @@ -62,6 +64,8 @@ func configRenderer(in Input) (render.Renderer, error) {
return nil, errors.New("missing syslog-ng spec")
}

var errs error

// TODO: this should happen at the spec level, in something like `SyslogNGSpec.FinalGlobalOptions() GlobalOptions`
if in.Logging.Spec.SyslogNGSpec.Metrics != nil {
setDefault(&in.Logging.Spec.SyslogNGSpec.GlobalOptions, &v1beta1.GlobalOptions{})
Expand All @@ -79,7 +83,12 @@ func configRenderer(in Input) (render.Renderer, error) {
globalOptions := renderAny(in.Logging.Spec.SyslogNGSpec.GlobalOptions, in.SecretLoaderFactory.SecretLoaderForNamespace(in.Logging.Namespace))

destinationDefs := make([]render.Renderer, 0, len(in.ClusterOutputs)+len(in.Outputs))
clusterOutputRefs := make(map[string]types.NamespacedName, len(in.ClusterOutputs))
for _, co := range in.ClusterOutputs {
clusterOutputRefs[co.Name] = types.NamespacedName{
Namespace: co.Namespace,
Name: co.Name,
}
destinationDefs = append(destinationDefs, renderClusterOutput(co, in.SecretLoaderFactory))
}
for _, o := range in.Outputs {
Expand All @@ -88,10 +97,16 @@ func configRenderer(in Input) (render.Renderer, error) {

logDefs := make([]render.Renderer, 0, len(in.ClusterFlows)+len(in.Flows))
for _, cf := range in.ClusterFlows {
logDefs = append(logDefs, renderClusterFlow(sourceName, cf, in.SecretLoaderFactory))
if err := validateClusterOutputs(clusterOutputRefs, client.ObjectKeyFromObject(&cf).String(), cf.Spec.GlobalOutputRefs); err != nil {
errs = errors.Append(errs, err)
}
logDefs = append(logDefs, renderClusterFlow(clusterOutputRefs, sourceName, cf, in.SecretLoaderFactory))
}
for _, f := range in.Flows {
logDefs = append(logDefs, renderFlow(in.Logging.Spec.ControlNamespace, sourceName, keyDelim(in.Logging.Spec.SyslogNGSpec.JSONKeyDelimiter), f, in.SecretLoaderFactory))
if err := validateClusterOutputs(clusterOutputRefs, client.ObjectKeyFromObject(&f).String(), f.Spec.GlobalOutputRefs); err != nil {
errs = errors.Append(errs, err)
}
logDefs = append(logDefs, renderFlow(clusterOutputRefs, sourceName, keyDelim(in.Logging.Spec.SyslogNGSpec.JSONKeyDelimiter), f, in.SecretLoaderFactory))
}

if in.Logging.Spec.SyslogNGSpec.JSONKeyPrefix == "" {
Expand Down Expand Up @@ -132,7 +147,7 @@ func configRenderer(in Input) (render.Renderer, error) {
func(rnd render.Renderer) bool { return rnd != nil },
),
render.Line(render.Empty),
)), nil
)), errs
}

func versionStmt(version string) render.Renderer {
Expand Down
131 changes: 131 additions & 0 deletions pkg/sdk/logging/model/syslogng/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,137 @@ destination "output_default_my-output" {
};
`,
},
"clusteroutput with flow ref": {
input: Input{
Logging: v1beta1.Logging{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: v1beta1.LoggingSpec{
SyslogNGSpec: &v1beta1.SyslogNGSpec{},
ControlNamespace: "logging",
},
},
Flows: []v1beta1.SyslogNGFlow{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-flow",
},
Spec: v1beta1.SyslogNGFlowSpec{
GlobalOutputRefs: []string{
"clusterout",
},
},
},
},
ClusterOutputs: []v1beta1.SyslogNGClusterOutput{
{
ObjectMeta: metav1.ObjectMeta{
Name: "clusterout",
Namespace: "logging",
},
Spec: v1beta1.SyslogNGClusterOutputSpec{
SyslogNGOutputSpec: v1beta1.SyslogNGOutputSpec{
Syslog: &output.SyslogOutput{
Host: "127.0.0.1",
},
},
},
},
},
SecretLoaderFactory: &TestSecretLoaderFactory{},
SourcePort: 601,
},
wantOut: `@version: current
@include "scl.conf"
source "main_input" {
channel {
source {
network(flags("no-parse") port(601) transport("tcp"));
};
parser {
json-parser(prefix("json."));
};
};
};
destination "clusteroutput_logging_clusterout" {
syslog("127.0.0.1" persist_name("clusteroutput_logging_clusterout"));
};
filter "flow_default_test-flow_ns_filter" {
match("default" value("json.kubernetes.namespace_name") type("string"));
};
log {
source("main_input");
filter("flow_default_test-flow_ns_filter");
destination("clusteroutput_logging_clusterout");
};
`,
},
"flow referencing non-existent cluster output": {
input: Input{
Logging: v1beta1.Logging{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: v1beta1.LoggingSpec{
SyslogNGSpec: &v1beta1.SyslogNGSpec{},
ControlNamespace: "logging",
},
},
Flows: []v1beta1.SyslogNGFlow{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-flow",
},
Spec: v1beta1.SyslogNGFlowSpec{
GlobalOutputRefs: []string{
"clusterout",
},
},
},
},
ClusterOutputs: nil,
SecretLoaderFactory: &TestSecretLoaderFactory{},
SourcePort: 601,
},
wantErr: true,
},
"clusterFlow referencing non-existent cluster output": {
input: Input{
Logging: v1beta1.Logging{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: v1beta1.LoggingSpec{
SyslogNGSpec: &v1beta1.SyslogNGSpec{},
ControlNamespace: "logging",
},
},
ClusterFlows: []v1beta1.SyslogNGClusterFlow{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-flow",
},
Spec: v1beta1.SyslogNGClusterFlowSpec{
GlobalOutputRefs: []string{
"clusterout",
},
},
},
},
ClusterOutputs: nil,
SecretLoaderFactory: &TestSecretLoaderFactory{},
SourcePort: 601,
},
wantErr: true,
},
"parser": {
input: Input{
Logging: v1beta1.Logging{
Expand Down
28 changes: 22 additions & 6 deletions pkg/sdk/logging/model/syslogng/config/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,28 @@ import (
"strconv"
"strings"

"emperror.dev/errors"
"github.com/cisco-open/operator-tools/pkg/secret"
"github.com/siliconbrain/go-seqs/seqs"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/model"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/render"
filter "github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/filter"
"github.com/siliconbrain/go-seqs/seqs"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func renderClusterFlow(sourceName string, f v1beta1.SyslogNGClusterFlow, secretLoaderFactory SecretLoaderFactory) render.Renderer {
func validateClusterOutputs(clusterOutputRefs map[string]types.NamespacedName, flow string, globalOutputRefs []string) error {
return seqs.Reduce(seqs.FromSlice(globalOutputRefs), nil, func(err error, ref string) error {
if _, ok := clusterOutputRefs[ref]; !ok {
return errors.Append(err, errors.Errorf("cluster output reference %s for flow %s cannot be found", ref, flow))
}
return err
})
}

func renderClusterFlow(clusterOutputRefs map[string]types.NamespacedName, sourceName string, f v1beta1.SyslogNGClusterFlow, secretLoaderFactory SecretLoaderFactory) render.Renderer {
baseName := fmt.Sprintf("clusterflow_%s_%s", f.Namespace, f.Name)
matchName := fmt.Sprintf("%s_match", baseName)
filterDefs := seqs.MapWithIndex(seqs.FromSlice(f.Spec.Filters), func(idx int, flt v1beta1.SyslogNGFilter) render.Renderer {
Expand All @@ -48,12 +60,14 @@ func renderClusterFlow(sourceName string, f v1beta1.SyslogNGClusterFlow, secretL
return parenDefStmt(filterKind(flt), render.Literal(filterID(flt, idx, baseName)))
}),
)),
seqs.ToSlice(seqs.Map(seqs.FromSlice(f.Spec.GlobalOutputRefs), func(ref string) string { return clusterOutputDestName(f.Namespace, ref) })),
seqs.ToSlice(seqs.Map(seqs.FromSlice(f.Spec.GlobalOutputRefs), func(ref string) string {
return clusterOutputDestName(clusterOutputRefs[ref].Namespace, ref)
})),
),
)
}

func renderFlow(controlNS string, sourceName string, keyDelim string, f v1beta1.SyslogNGFlow, secretLoaderFactory SecretLoaderFactory) render.Renderer {
func renderFlow(clusterOutputRefs map[string]types.NamespacedName, sourceName string, keyDelim string, f v1beta1.SyslogNGFlow, secretLoaderFactory SecretLoaderFactory) render.Renderer {
baseName := fmt.Sprintf("flow_%s_%s", f.Namespace, f.Name)
matchName := fmt.Sprintf("%s_match", baseName)
nsFilterName := fmt.Sprintf("%s_ns_filter", baseName)
Expand All @@ -80,7 +94,9 @@ func renderFlow(controlNS string, sourceName string, keyDelim string, f v1beta1.
}),
)),
seqs.ToSlice(seqs.Concat(
seqs.Map(seqs.FromSlice(f.Spec.GlobalOutputRefs), func(ref string) string { return clusterOutputDestName(f.Namespace, ref) }),
seqs.Map(seqs.FromSlice(f.Spec.GlobalOutputRefs), func(ref string) string {
return clusterOutputDestName(clusterOutputRefs[ref].Namespace, ref)
}),
seqs.Map(seqs.FromSlice(f.Spec.LocalOutputRefs), func(ref string) string { return outputDestName(f.Namespace, ref) }),
)),
),
Expand Down
7 changes: 4 additions & 3 deletions pkg/sdk/logging/model/syslogng/config/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"strings"
"testing"

"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/render"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/render"
)

func TestRenderClusterFlow(t *testing.T) {
Expand Down Expand Up @@ -65,7 +66,7 @@ source("test_input");
testCase := testCase
t.Run(name, func(t *testing.T) {
out := strings.Builder{}
require.NoError(t, renderClusterFlow("test_input", testCase.clusterFlow, nil)(render.RenderContext{
require.NoError(t, renderClusterFlow(nil, "test_input", testCase.clusterFlow, nil)(render.RenderContext{
Out: &out,
}))
assert.Equal(t, testCase.expected, out.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/sdk/logging/model/syslogng/config/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/c
go 1.20

require (
emperror.dev/errors v0.8.1
github.com/cisco-open/operator-tools v0.30.0
github.com/kube-logging/logging-operator/pkg/sdk v0.9.1
github.com/siliconbrain/go-seqs v0.5.0
Expand All @@ -14,7 +15,6 @@ require (
)

require (
emperror.dev/errors v0.8.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down

0 comments on commit 4443476

Please sign in to comment.