Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syslog-ng aggregator fixes #1402

Merged
merged 2 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/resources/fluentbit/configsecret.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er

if r.Logging.Spec.SyslogNGSpec != nil {
input.SyslogNGOutput = &syslogNGOutputConfig{}
input.SyslogNGOutput.Host = fmt.Sprintf("%s.%s.svc.cluster.local", r.Logging.QualifiedName(syslogng.ServiceName), r.Logging.Spec.ControlNamespace)
input.SyslogNGOutput.Host = fmt.Sprintf("%s.%s.svc%s", r.Logging.QualifiedName(syslogng.ServiceName), r.Logging.Spec.ControlNamespace, r.Logging.ClusterDomainAsSuffix())
input.SyslogNGOutput.Port = syslogng.ServicePort
input.SyslogNGOutput.JSONDateKey = "ts"
input.SyslogNGOutput.JSONDateFormat = "iso8601"
Expand Down
23 changes: 19 additions & 4 deletions pkg/sdk/logging/model/syslogng/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package config

import (
"errors"
"io"
"reflect"

"emperror.dev/errors"
"github.com/cisco-open/operator-tools/pkg/secret"
"github.com/siliconbrain/go-seqs/seqs"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/render"
Expand Down Expand Up @@ -62,6 +64,8 @@ func configRenderer(in Input) (render.Renderer, error) {
return nil, errors.New("missing syslog-ng spec")
}

var errs error

// TODO: this should happen at the spec level, in something like `SyslogNGSpec.FinalGlobalOptions() GlobalOptions`
if in.Logging.Spec.SyslogNGSpec.Metrics != nil {
setDefault(&in.Logging.Spec.SyslogNGSpec.GlobalOptions, &v1beta1.GlobalOptions{})
Expand All @@ -79,7 +83,12 @@ func configRenderer(in Input) (render.Renderer, error) {
globalOptions := renderAny(in.Logging.Spec.SyslogNGSpec.GlobalOptions, in.SecretLoaderFactory.SecretLoaderForNamespace(in.Logging.Namespace))

destinationDefs := make([]render.Renderer, 0, len(in.ClusterOutputs)+len(in.Outputs))
clusterOutputRefs := make(map[string]types.NamespacedName, len(in.ClusterOutputs))
for _, co := range in.ClusterOutputs {
clusterOutputRefs[co.Name] = types.NamespacedName{
Namespace: co.Namespace,
Name: co.Name,
}
destinationDefs = append(destinationDefs, renderClusterOutput(co, in.SecretLoaderFactory))
}
for _, o := range in.Outputs {
Expand All @@ -88,10 +97,16 @@ func configRenderer(in Input) (render.Renderer, error) {

logDefs := make([]render.Renderer, 0, len(in.ClusterFlows)+len(in.Flows))
for _, cf := range in.ClusterFlows {
logDefs = append(logDefs, renderClusterFlow(sourceName, cf, in.SecretLoaderFactory))
if err := validateClusterOutputs(clusterOutputRefs, client.ObjectKeyFromObject(&cf).String(), cf.Spec.GlobalOutputRefs); err != nil {
errs = errors.Append(errs, err)
}
logDefs = append(logDefs, renderClusterFlow(clusterOutputRefs, sourceName, cf, in.SecretLoaderFactory))
}
for _, f := range in.Flows {
logDefs = append(logDefs, renderFlow(in.Logging.Spec.ControlNamespace, sourceName, keyDelim(in.Logging.Spec.SyslogNGSpec.JSONKeyDelimiter), f, in.SecretLoaderFactory))
if err := validateClusterOutputs(clusterOutputRefs, client.ObjectKeyFromObject(&f).String(), f.Spec.GlobalOutputRefs); err != nil {
errs = errors.Append(errs, err)
}
logDefs = append(logDefs, renderFlow(clusterOutputRefs, sourceName, keyDelim(in.Logging.Spec.SyslogNGSpec.JSONKeyDelimiter), f, in.SecretLoaderFactory))
}

if in.Logging.Spec.SyslogNGSpec.JSONKeyPrefix == "" {
Expand Down Expand Up @@ -132,7 +147,7 @@ func configRenderer(in Input) (render.Renderer, error) {
func(rnd render.Renderer) bool { return rnd != nil },
),
render.Line(render.Empty),
)), nil
)), errs
}

func versionStmt(version string) render.Renderer {
Expand Down
131 changes: 131 additions & 0 deletions pkg/sdk/logging/model/syslogng/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,137 @@ destination "output_default_my-output" {
};
`,
},
"clusteroutput with flow ref": {
input: Input{
Logging: v1beta1.Logging{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: v1beta1.LoggingSpec{
SyslogNGSpec: &v1beta1.SyslogNGSpec{},
ControlNamespace: "logging",
},
},
Flows: []v1beta1.SyslogNGFlow{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-flow",
},
Spec: v1beta1.SyslogNGFlowSpec{
GlobalOutputRefs: []string{
"clusterout",
},
},
},
},
ClusterOutputs: []v1beta1.SyslogNGClusterOutput{
{
ObjectMeta: metav1.ObjectMeta{
Name: "clusterout",
Namespace: "logging",
},
Spec: v1beta1.SyslogNGClusterOutputSpec{
SyslogNGOutputSpec: v1beta1.SyslogNGOutputSpec{
Syslog: &output.SyslogOutput{
Host: "127.0.0.1",
},
},
},
},
},
SecretLoaderFactory: &TestSecretLoaderFactory{},
SourcePort: 601,
},
wantOut: `@version: current

@include "scl.conf"

source "main_input" {
channel {
source {
network(flags("no-parse") port(601) transport("tcp"));
};
parser {
json-parser(prefix("json."));
};
};
};

destination "clusteroutput_logging_clusterout" {
syslog("127.0.0.1" persist_name("clusteroutput_logging_clusterout"));
};

filter "flow_default_test-flow_ns_filter" {
match("default" value("json.kubernetes.namespace_name") type("string"));
};
log {
source("main_input");
filter("flow_default_test-flow_ns_filter");
destination("clusteroutput_logging_clusterout");
};
`,
},
"flow referencing non-existent cluster output": {
input: Input{
Logging: v1beta1.Logging{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: v1beta1.LoggingSpec{
SyslogNGSpec: &v1beta1.SyslogNGSpec{},
ControlNamespace: "logging",
},
},
Flows: []v1beta1.SyslogNGFlow{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-flow",
},
Spec: v1beta1.SyslogNGFlowSpec{
GlobalOutputRefs: []string{
"clusterout",
},
},
},
},
ClusterOutputs: nil,
SecretLoaderFactory: &TestSecretLoaderFactory{},
SourcePort: 601,
},
wantErr: true,
},
"clusterFlow referencing non-existent cluster output": {
input: Input{
Logging: v1beta1.Logging{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: v1beta1.LoggingSpec{
SyslogNGSpec: &v1beta1.SyslogNGSpec{},
ControlNamespace: "logging",
},
},
ClusterFlows: []v1beta1.SyslogNGClusterFlow{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-flow",
},
Spec: v1beta1.SyslogNGClusterFlowSpec{
GlobalOutputRefs: []string{
"clusterout",
},
},
},
},
ClusterOutputs: nil,
SecretLoaderFactory: &TestSecretLoaderFactory{},
SourcePort: 601,
},
wantErr: true,
},
"parser": {
input: Input{
Logging: v1beta1.Logging{
Expand Down
28 changes: 22 additions & 6 deletions pkg/sdk/logging/model/syslogng/config/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,28 @@ import (
"strconv"
"strings"

"emperror.dev/errors"
"github.com/cisco-open/operator-tools/pkg/secret"
"github.com/siliconbrain/go-seqs/seqs"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/model"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/render"
filter "github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/filter"
"github.com/siliconbrain/go-seqs/seqs"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func renderClusterFlow(sourceName string, f v1beta1.SyslogNGClusterFlow, secretLoaderFactory SecretLoaderFactory) render.Renderer {
func validateClusterOutputs(clusterOutputRefs map[string]types.NamespacedName, flow string, globalOutputRefs []string) error {
return seqs.Reduce(seqs.FromSlice(globalOutputRefs), nil, func(err error, ref string) error {
if _, ok := clusterOutputRefs[ref]; !ok {
return errors.Append(err, errors.Errorf("cluster output reference %s for flow %s cannot be found", ref, flow))
}
return err
})
}

func renderClusterFlow(clusterOutputRefs map[string]types.NamespacedName, sourceName string, f v1beta1.SyslogNGClusterFlow, secretLoaderFactory SecretLoaderFactory) render.Renderer {
baseName := fmt.Sprintf("clusterflow_%s_%s", f.Namespace, f.Name)
matchName := fmt.Sprintf("%s_match", baseName)
filterDefs := seqs.MapWithIndex(seqs.FromSlice(f.Spec.Filters), func(idx int, flt v1beta1.SyslogNGFilter) render.Renderer {
Expand All @@ -48,12 +60,14 @@ func renderClusterFlow(sourceName string, f v1beta1.SyslogNGClusterFlow, secretL
return parenDefStmt(filterKind(flt), render.Literal(filterID(flt, idx, baseName)))
}),
)),
seqs.ToSlice(seqs.Map(seqs.FromSlice(f.Spec.GlobalOutputRefs), func(ref string) string { return clusterOutputDestName(f.Namespace, ref) })),
seqs.ToSlice(seqs.Map(seqs.FromSlice(f.Spec.GlobalOutputRefs), func(ref string) string {
return clusterOutputDestName(clusterOutputRefs[ref].Namespace, ref)
})),
),
)
}

func renderFlow(controlNS string, sourceName string, keyDelim string, f v1beta1.SyslogNGFlow, secretLoaderFactory SecretLoaderFactory) render.Renderer {
func renderFlow(clusterOutputRefs map[string]types.NamespacedName, sourceName string, keyDelim string, f v1beta1.SyslogNGFlow, secretLoaderFactory SecretLoaderFactory) render.Renderer {
baseName := fmt.Sprintf("flow_%s_%s", f.Namespace, f.Name)
matchName := fmt.Sprintf("%s_match", baseName)
nsFilterName := fmt.Sprintf("%s_ns_filter", baseName)
Expand All @@ -80,7 +94,9 @@ func renderFlow(controlNS string, sourceName string, keyDelim string, f v1beta1.
}),
)),
seqs.ToSlice(seqs.Concat(
seqs.Map(seqs.FromSlice(f.Spec.GlobalOutputRefs), func(ref string) string { return clusterOutputDestName(f.Namespace, ref) }),
seqs.Map(seqs.FromSlice(f.Spec.GlobalOutputRefs), func(ref string) string {
return clusterOutputDestName(clusterOutputRefs[ref].Namespace, ref)
}),
seqs.Map(seqs.FromSlice(f.Spec.LocalOutputRefs), func(ref string) string { return outputDestName(f.Namespace, ref) }),
)),
),
Expand Down
7 changes: 4 additions & 3 deletions pkg/sdk/logging/model/syslogng/config/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"strings"
"testing"

"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/render"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
"github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/config/render"
)

func TestRenderClusterFlow(t *testing.T) {
Expand Down Expand Up @@ -65,7 +66,7 @@ source("test_input");
testCase := testCase
t.Run(name, func(t *testing.T) {
out := strings.Builder{}
require.NoError(t, renderClusterFlow("test_input", testCase.clusterFlow, nil)(render.RenderContext{
require.NoError(t, renderClusterFlow(nil, "test_input", testCase.clusterFlow, nil)(render.RenderContext{
Out: &out,
}))
assert.Equal(t, testCase.expected, out.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/sdk/logging/model/syslogng/config/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kube-logging/logging-operator/pkg/sdk/logging/model/syslogng/c
go 1.20

require (
emperror.dev/errors v0.8.1
github.com/cisco-open/operator-tools v0.30.0
github.com/kube-logging/logging-operator/pkg/sdk v0.9.1
github.com/siliconbrain/go-seqs v0.5.0
Expand All @@ -14,7 +15,6 @@ require (
)

require (
emperror.dev/errors v0.8.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down