From 86379eb41cddc69ac9ef5adc44332294d8327680 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Thu, 16 Nov 2023 14:58:26 +0100 Subject: [PATCH 1/5] Allow to start FLP directly from the flow logs producer - New "InProcess" ingest stage - New entry point to start the whole FLP in-process: pipeline.StartFLPInProcess - Add some tests --- cmd/flowlogs-pipeline/main.go | 26 +---- pkg/config/pipeline_builder.go | 11 +++ pkg/pipeline/encode/encode_prom.go | 15 +-- pkg/pipeline/ingest/ingest_inprocess.go | 42 ++++++++ pkg/pipeline/inprocess.go | 32 ++++++ pkg/pipeline/inprocess_test.go | 125 ++++++++++++++++++++++++ pkg/pipeline/pipeline.go | 23 +++-- pkg/pipeline/pipeline_builder.go | 21 +++- pkg/pipeline/utils/prom_server.go | 52 ---------- pkg/prometheus/prom_server.go | 91 +++++++++++++++++ 10 files changed, 339 insertions(+), 99 deletions(-) create mode 100644 pkg/pipeline/ingest/ingest_inprocess.go create mode 100644 pkg/pipeline/inprocess.go create mode 100644 pkg/pipeline/inprocess_test.go delete mode 100644 pkg/pipeline/utils/prom_server.go create mode 100644 pkg/prometheus/prom_server.go diff --git a/cmd/flowlogs-pipeline/main.go b/cmd/flowlogs-pipeline/main.go index eb618bb63..4e5382cc0 100644 --- a/cmd/flowlogs-pipeline/main.go +++ b/cmd/flowlogs-pipeline/main.go @@ -19,7 +19,6 @@ package main import ( "context" - "crypto/tls" "encoding/json" "fmt" "net/http" @@ -35,7 +34,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" - "github.com/prometheus/client_golang/prometheus" + "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -182,27 +181,8 @@ func run() { // Setup (threads) exit manager utils.SetupElegantExit() - - // set up private prometheus registry - if cfg.MetricsSettings.SuppressGoMetrics { - reg := prometheus.NewRegistry() - prometheus.DefaultRegisterer = reg - prometheus.DefaultGatherer = reg - } - - // create prometheus server for operational metrics - // if value of address is empty, then by default it will take 0.0.0.0 - addr := fmt.Sprintf("%s:%v", cfg.MetricsSettings.Address, cfg.MetricsSettings.Port) - log.Infof("startServer: addr = %s", addr) - promServer := &http.Server{ - Addr: addr, - // TLS clients must use TLS 1.2 or higher - TLSConfig: &tls.Config{ - MinVersion: tls.VersionTLS12, - }, - } - tlsConfig := cfg.MetricsSettings.TLS - go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic, prometheus.DefaultGatherer.(*prometheus.Registry)) + prometheus.SetGlobalMetricsSettings(&cfg.MetricsSettings) + promServer := prometheus.StartServerAsync(&cfg.MetricsSettings.PromConnectionInfo, nil) // Create new flows pipeline mainPipeline, err = pipeline.NewPipeline(&cfg) diff --git a/pkg/config/pipeline_builder.go b/pkg/config/pipeline_builder.go index f52dbc95e..1ed13b104 100644 --- a/pkg/config/pipeline_builder.go +++ b/pkg/config/pipeline_builder.go @@ -48,6 +48,8 @@ type PipelineBuilderStage struct { pipeline *pipeline } +const PresetIngesterStage = "preset-ingester" + // NewPipeline creates a new pipeline from an existing ingest func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) { if ingest.Collector != nil { @@ -89,6 +91,15 @@ func NewKafkaPipeline(name string, ingest api.IngestKafka) PipelineBuilderStage return PipelineBuilderStage{pipeline: &p, lastStage: name} } +// NewPresetIngesterPipeline creates a new partial pipeline without ingest stage +func NewPresetIngesterPipeline() PipelineBuilderStage { + p := pipeline{ + stages: []Stage{}, + config: []StageParam{}, + } + return PipelineBuilderStage{pipeline: &p, lastStage: PresetIngesterStage} +} + func (b *PipelineBuilderStage) next(name string, param StageParam) PipelineBuilderStage { b.pipeline.stages = append(b.pipeline.stages, Stage{Name: name, Follows: b.lastStage}) b.pipeline.config = append(b.pipeline.config, param) diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index f4812140b..a33886633 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -18,9 +18,7 @@ package encode import ( - "crypto/tls" "fmt" - "net/http" "strings" "time" @@ -28,6 +26,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" "github.com/netobserv/flowlogs-pipeline/pkg/utils" "github.com/prometheus/client_golang/prometheus" @@ -279,17 +278,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En if cfg.PromConnectionInfo != nil { registry := prometheus.NewRegistry() registerer = registry - addr := fmt.Sprintf("%s:%v", cfg.PromConnectionInfo.Address, cfg.PromConnectionInfo.Port) - log.Infof("startServer: addr = %s", addr) - promServer := &http.Server{ - Addr: addr, - // TLS clients must use TLS 1.2 or higher - TLSConfig: &tls.Config{ - MinVersion: tls.VersionTLS12, - }, - } - tlsConfig := cfg.PromConnectionInfo.TLS - go putils.StartPromServer(tlsConfig, promServer, true, registry) + promserver.StartServerAsync(cfg.PromConnectionInfo, nil) } else { registerer = prometheus.DefaultRegisterer } diff --git a/pkg/pipeline/ingest/ingest_inprocess.go b/pkg/pipeline/ingest/ingest_inprocess.go new file mode 100644 index 000000000..44d15652a --- /dev/null +++ b/pkg/pipeline/ingest/ingest_inprocess.go @@ -0,0 +1,42 @@ +package ingest + +import ( + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" + + "github.com/netobserv/netobserv-ebpf-agent/pkg/decode" + "github.com/sirupsen/logrus" +) + +var ilog = logrus.WithField("component", "ingest.InProcess") + +// InProcess ingester, meant to be imported and used from another program via +type InProcess struct { + flowPackets chan *pbflow.Records +} + +func NewInProcess(flowPackets chan *pbflow.Records) *InProcess { + return &InProcess{flowPackets: flowPackets} +} + +func (d *InProcess) Ingest(out chan<- config.GenericMap) { + go func() { + <-utils.ExitChannel() + d.Close() + }() + for fp := range d.flowPackets { + ilog.Debugf("Ingested %v records", len(fp.Entries)) + for _, entry := range fp.Entries { + out <- decode.PBFlowToMap(entry) + } + } +} + +func (d *InProcess) Write(record *pbflow.Records) { + d.flowPackets <- record +} + +func (d *InProcess) Close() { + close(d.flowPackets) +} diff --git a/pkg/pipeline/inprocess.go b/pkg/pipeline/inprocess.go new file mode 100644 index 000000000..aec30462c --- /dev/null +++ b/pkg/pipeline/inprocess.go @@ -0,0 +1,32 @@ +package pipeline + +import ( + "context" + "fmt" + + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" + "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" +) + +// StartFLPInProcess is an entry point to start the whole FLP / pipeline processing from imported code +func StartFLPInProcess(cfg config.ConfigFileStruct) (*ingest.InProcess, error) { + prometheus.SetGlobalMetricsSettings(&cfg.MetricsSettings) + promServer := prometheus.StartServerAsync(&cfg.MetricsSettings.PromConnectionInfo, nil) + + // Create new flows pipeline + ingester := ingest.NewInProcess(make(chan *pbflow.Records, 100)) + flp, err := newPipelineFromIngester(&cfg, ingester) + if err != nil { + return nil, fmt.Errorf("failed to initialize pipeline %w", err) + } + + // Starts the flows pipeline; blocking call + go func() { + flp.Run() + _ = promServer.Shutdown(context.Background()) + }() + + return ingester, nil +} diff --git a/pkg/pipeline/inprocess_test.go b/pkg/pipeline/inprocess_test.go new file mode 100644 index 000000000..69603ae4f --- /dev/null +++ b/pkg/pipeline/inprocess_test.go @@ -0,0 +1,125 @@ +package pipeline + +import ( + "bufio" + "encoding/json" + "os" + "testing" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestInProcessFLP(t *testing.T) { + pipeline := config.NewPresetIngesterPipeline() + pipeline = pipeline.WriteStdout("writer", api.WriteStdout{Format: "json"}) + cfs := config.ConfigFileStruct{ + Pipeline: pipeline.GetStages(), + Parameters: pipeline.GetStageParams(), + } + ingester, err := StartFLPInProcess(cfs) + require.NoError(t, err) + defer ingester.Close() + + capturedOut, w, _ := os.Pipe() + old := os.Stdout + os.Stdout = w + defer func() { + os.Stdout = old + }() + + // yield thread to allow pipe services correctly start + time.Sleep(10 * time.Millisecond) + + startTime := time.Now() + endTime := startTime.Add(7 * time.Second) + someDuration := endTime.Sub(startTime) + + ingester.Write(&pbflow.Records{ + Entries: []*pbflow.Record{{ + Interface: "eth0", + EthProtocol: 2048, + Bytes: 456, + Packets: 123, + Direction: pbflow.Direction_EGRESS, + TimeFlowStart: timestamppb.New(startTime), + TimeFlowEnd: timestamppb.New(endTime), + Network: &pbflow.Network{ + SrcAddr: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304}, + }, + DstAddr: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708}, + }, + Dscp: 1, + }, + DataLink: &pbflow.DataLink{ + DstMac: 0x112233445566, + SrcMac: 0x010203040506, + }, + Transport: &pbflow.Transport{ + Protocol: 17, + SrcPort: 23000, + DstPort: 443, + }, + AgentIp: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x0a0b0c0d}, + }, + PktDropBytes: 100, + PktDropPackets: 10, + PktDropLatestFlags: 1, + PktDropLatestState: 1, + PktDropLatestDropCause: 8, + DnsLatency: durationpb.New(someDuration), + DnsId: 1, + DnsFlags: 0x80, + DnsErrno: 0, + TimeFlowRtt: durationpb.New(someDuration), + }}, + }) + + scanner := bufio.NewScanner(capturedOut) + require.True(t, scanner.Scan()) + capturedRecord := map[string]interface{}{} + bytes := scanner.Bytes() + require.NoError(t, json.Unmarshal(bytes, &capturedRecord), string(bytes)) + + assert.NotZero(t, capturedRecord["TimeReceived"]) + delete(capturedRecord, "TimeReceived") + assert.EqualValues(t, map[string]interface{}{ + "FlowDirection": float64(1), + "Bytes": float64(456), + "SrcAddr": "1.2.3.4", + "DstAddr": "5.6.7.8", + "Dscp": float64(1), + "DstMac": "11:22:33:44:55:66", + "SrcMac": "01:02:03:04:05:06", + "SrcPort": float64(23000), + "DstPort": float64(443), + "Duplicate": false, + "Etype": float64(2048), + "Packets": float64(123), + "Proto": float64(17), + "TimeFlowStartMs": float64(startTime.UnixMilli()), + "TimeFlowEndMs": float64(endTime.UnixMilli()), + "Interface": "eth0", + "AgentIP": "10.11.12.13", + "PktDropBytes": float64(100), + "PktDropPackets": float64(10), + "PktDropLatestFlags": float64(1), + "PktDropLatestState": "TCP_ESTABLISHED", + "PktDropLatestDropCause": "SKB_DROP_REASON_NETFILTER_DROP", + "DnsLatencyMs": float64(someDuration.Milliseconds()), + "DnsId": float64(1), + "DnsFlags": float64(0x80), + "DnsErrno": float64(0), + "DnsFlagsResponseCode": "NoError", + "TimeFlowRttNs": float64(someDuration.Nanoseconds()), + }, capturedRecord) +} diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index e9e923116..12cadfce5 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -22,6 +22,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" "github.com/netobserv/gopipes/pkg/node" log "github.com/sirupsen/logrus" ) @@ -48,18 +49,24 @@ type Pipeline struct { // NewPipeline defines the pipeline elements func NewPipeline(cfg *config.ConfigFileStruct) (*Pipeline, error) { - log.Debugf("entering NewPipeline") + return newPipelineFromIngester(cfg, nil) +} + +// newPipelineFromIngester defines the pipeline elements from a preset ingester (e.g. for in-process receiver) +func newPipelineFromIngester(cfg *config.ConfigFileStruct, ing ingest.Ingester) (*Pipeline, error) { + log.Debugf("entering newPipelineFromIngester") - stages := cfg.Pipeline - log.Debugf("stages = %v ", stages) - configParams := cfg.Parameters - log.Debugf("configParams = %v ", configParams) + log.Debugf("stages = %v ", cfg.Pipeline) + log.Debugf("configParams = %v ", cfg.Parameters) - build := newBuilder(cfg) - if err := build.readStages(); err != nil { + builder := newBuilder(cfg) + if ing != nil { + builder.presetIngester(ing) + } + if err := builder.readStages(); err != nil { return nil, err } - return build.build() + return builder.build() } func (p *Pipeline) Run() { diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index ac752e645..a3141a6f2 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -98,6 +98,17 @@ func newBuilder(cfg *config.ConfigFileStruct) *builder { } } +// use a preset ingester +func (b *builder) presetIngester(ing ingest.Ingester) { + name := config.PresetIngesterStage + log.Debugf("stage = %v", name) + b.appendEntry(pipelineEntry{ + stageName: name, + stageType: StageIngest, + Ingester: ing, + }) +} + // read the configuration stages definition and instantiate the corresponding native Go objects func (b *builder) readStages() error { for _, param := range b.configParams { @@ -124,14 +135,18 @@ func (b *builder) readStages() error { if err != nil { return err } - b.pipelineEntryMap[param.Name] = &pEntry - b.pipelineStages = append(b.pipelineStages, &pEntry) - log.Debugf("pipeline = %v", b.pipelineStages) + b.appendEntry(pEntry) } log.Debugf("pipeline = %v", b.pipelineStages) return nil } +func (b *builder) appendEntry(pEntry pipelineEntry) { + b.pipelineEntryMap[pEntry.stageName] = &pEntry + b.pipelineStages = append(b.pipelineStages, &pEntry) + log.Debugf("pipeline = %v", b.pipelineStages) +} + // reads the configured Go stages and connects between them // readStages must be invoked before this func (b *builder) build() (*Pipeline, error) { diff --git a/pkg/pipeline/utils/prom_server.go b/pkg/pipeline/utils/prom_server.go deleted file mode 100644 index 17d0e8c7d..000000000 --- a/pkg/pipeline/utils/prom_server.go +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (C) 2023 IBM, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package utils - -import ( - "net/http" - "os" - - "github.com/netobserv/flowlogs-pipeline/pkg/api" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/sirupsen/logrus" -) - -// StartPromServer listens for prometheus resource usage requests -func StartPromServer(tlsConfig *api.PromTLSConf, server *http.Server, panicOnError bool, reg *prometheus.Registry) { - logrus.Debugf("entering StartPromServer") - - // The Handler function provides a default handler to expose metrics - // via an HTTP server. "/metrics" is the usual endpoint for that. - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) - server.Handler = mux - - var err error - if tlsConfig != nil { - err = server.ListenAndServeTLS(tlsConfig.CertPath, tlsConfig.KeyPath) - } else { - err = server.ListenAndServe() - } - if err != nil && err != http.ErrServerClosed { - logrus.Errorf("error in http.ListenAndServe: %v", err) - if panicOnError { - os.Exit(1) - } - } -} diff --git a/pkg/prometheus/prom_server.go b/pkg/prometheus/prom_server.go new file mode 100644 index 000000000..9bde92b89 --- /dev/null +++ b/pkg/prometheus/prom_server.go @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2023 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package prometheus + +import ( + "crypto/tls" + "fmt" + "net/http" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +var ( + plog = logrus.WithField("component", "prometheus") + maybePanic = plog.Fatalf +) + +// SetGlobalMetricsSettings initializes some global settings used later when starting server(s) +func SetGlobalMetricsSettings(settings *config.MetricsSettings) { + if settings.SuppressGoMetrics { + // set up private prometheus registry + r := prom.NewRegistry() + prom.DefaultRegisterer = r + prom.DefaultGatherer = r + } + if settings.NoPanic { + maybePanic = plog.Errorf + } +} + +// StartServerAsync listens for prometheus resource usage requests +func StartServerAsync(conn *api.PromConnectionInfo, registry *prom.Registry) *http.Server { + // create prometheus server for operational metrics + // if value of address is empty, then by default it will take 0.0.0.0 + port := conn.Port + if port == 0 { + port = 9090 + } + addr := fmt.Sprintf("%s:%v", conn.Address, port) + plog.Infof("StartServerAsync: addr = %s", addr) + + httpServer := http.Server{ + Addr: addr, + // TLS clients must use TLS 1.2 or higher + TLSConfig: &tls.Config{ + MinVersion: tls.VersionTLS12, + }, + } + // The Handler function provides a default handler to expose metrics + // via an HTTP server. "/metrics" is the usual endpoint for that. + mux := http.NewServeMux() + if registry == nil { + mux.Handle("/metrics", promhttp.Handler()) + } else { + mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) + } + httpServer.Handler = mux + + go func() { + var err error + if conn.TLS != nil { + err = httpServer.ListenAndServeTLS(conn.TLS.CertPath, conn.TLS.KeyPath) + } else { + err = httpServer.ListenAndServe() + } + if err != nil && err != http.ErrServerClosed { + maybePanic("error in http.ListenAndServe: %v", err) + } + }() + + return &httpServer +} From 2cd36250a953a3937bc6e1f5bad2df2b56d955ab Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Thu, 16 Nov 2023 15:18:16 +0100 Subject: [PATCH 2/5] Add builder functions ToConfig / IntoConfig --- pkg/confgen/flowlogs2metrics_config.go | 8 +++----- pkg/config/pipeline_builder.go | 12 ++++++++++++ pkg/pipeline/ingest/ingest_inprocess.go | 3 ++- pkg/pipeline/inprocess.go | 4 ++-- pkg/pipeline/inprocess_test.go | 6 +----- 5 files changed, 20 insertions(+), 13 deletions(-) diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index 8582a04e3..c31635cda 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -63,15 +63,13 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct { if cg.config.Write.Loki != nil { forkedNode.WriteLoki("write_loki", *cg.config.Write.Loki) } - return &config.ConfigFileStruct{ - LogLevel: "error", - Pipeline: pipeline.GetStages(), - Parameters: pipeline.GetStageParams(), + return pipeline.IntoConfigFileStruct(&config.ConfigFileStruct{ + LogLevel: "error", MetricsSettings: config.MetricsSettings{ PromConnectionInfo: api.PromConnectionInfo{Port: 9102}, Prefix: "flp_op_", }, - } + }) } func (cg *ConfGen) GenerateTruncatedConfig() []config.StageParam { diff --git a/pkg/config/pipeline_builder.go b/pkg/config/pipeline_builder.go index 1ed13b104..796da0631 100644 --- a/pkg/config/pipeline_builder.go +++ b/pkg/config/pipeline_builder.go @@ -175,3 +175,15 @@ func (b *PipelineBuilderStage) GetStages() []Stage { func (b *PipelineBuilderStage) GetStageParams() []StageParam { return b.pipeline.config } + +// IntoConfigFileStruct injects the current pipeline and params in the provided ConfigFileStruct object. +func (b *PipelineBuilderStage) IntoConfigFileStruct(cfs *ConfigFileStruct) *ConfigFileStruct { + cfs.Pipeline = b.GetStages() + cfs.Parameters = b.GetStageParams() + return cfs +} + +// ToConfigFileStruct returns the current pipeline and params as a new ConfigFileStruct object. +func (b *PipelineBuilderStage) ToConfigFileStruct() *ConfigFileStruct { + return b.IntoConfigFileStruct(&ConfigFileStruct{}) +} diff --git a/pkg/pipeline/ingest/ingest_inprocess.go b/pkg/pipeline/ingest/ingest_inprocess.go index 44d15652a..3ece6b173 100644 --- a/pkg/pipeline/ingest/ingest_inprocess.go +++ b/pkg/pipeline/ingest/ingest_inprocess.go @@ -11,7 +11,8 @@ import ( var ilog = logrus.WithField("component", "ingest.InProcess") -// InProcess ingester, meant to be imported and used from another program via +// InProcess ingester is meant to be imported and used from another program +// via pipeline.StartFLPInProcess type InProcess struct { flowPackets chan *pbflow.Records } diff --git a/pkg/pipeline/inprocess.go b/pkg/pipeline/inprocess.go index aec30462c..1df783c9f 100644 --- a/pkg/pipeline/inprocess.go +++ b/pkg/pipeline/inprocess.go @@ -11,13 +11,13 @@ import ( ) // StartFLPInProcess is an entry point to start the whole FLP / pipeline processing from imported code -func StartFLPInProcess(cfg config.ConfigFileStruct) (*ingest.InProcess, error) { +func StartFLPInProcess(cfg *config.ConfigFileStruct) (*ingest.InProcess, error) { prometheus.SetGlobalMetricsSettings(&cfg.MetricsSettings) promServer := prometheus.StartServerAsync(&cfg.MetricsSettings.PromConnectionInfo, nil) // Create new flows pipeline ingester := ingest.NewInProcess(make(chan *pbflow.Records, 100)) - flp, err := newPipelineFromIngester(&cfg, ingester) + flp, err := newPipelineFromIngester(cfg, ingester) if err != nil { return nil, fmt.Errorf("failed to initialize pipeline %w", err) } diff --git a/pkg/pipeline/inprocess_test.go b/pkg/pipeline/inprocess_test.go index 69603ae4f..428e68902 100644 --- a/pkg/pipeline/inprocess_test.go +++ b/pkg/pipeline/inprocess_test.go @@ -19,11 +19,7 @@ import ( func TestInProcessFLP(t *testing.T) { pipeline := config.NewPresetIngesterPipeline() pipeline = pipeline.WriteStdout("writer", api.WriteStdout{Format: "json"}) - cfs := config.ConfigFileStruct{ - Pipeline: pipeline.GetStages(), - Parameters: pipeline.GetStageParams(), - } - ingester, err := StartFLPInProcess(cfs) + ingester, err := StartFLPInProcess(pipeline.ToConfigFileStruct()) require.NoError(t, err) defer ingester.Close() From dbe2422c3e9a42f549313856c680fcf37a2d3fc3 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Mon, 20 Nov 2023 15:08:11 +0100 Subject: [PATCH 3/5] Allow disabling operational metrics --- cmd/flowlogs-pipeline/main.go | 7 ++++--- pkg/config/config.go | 7 ++++--- pkg/pipeline/inprocess.go | 7 ++++--- pkg/prometheus/prom_server.go | 14 +++++++++----- 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/cmd/flowlogs-pipeline/main.go b/cmd/flowlogs-pipeline/main.go index 4e5382cc0..a2fa5246e 100644 --- a/cmd/flowlogs-pipeline/main.go +++ b/cmd/flowlogs-pipeline/main.go @@ -181,8 +181,7 @@ func run() { // Setup (threads) exit manager utils.SetupElegantExit() - prometheus.SetGlobalMetricsSettings(&cfg.MetricsSettings) - promServer := prometheus.StartServerAsync(&cfg.MetricsSettings.PromConnectionInfo, nil) + promServer := prometheus.InitializePrometheus(&cfg.MetricsSettings) // Create new flows pipeline mainPipeline, err = pipeline.NewPipeline(&cfg) @@ -205,7 +204,9 @@ func run() { // Starts the flows pipeline mainPipeline.Run() - _ = promServer.Shutdown(context.Background()) + if promServer != nil { + _ = promServer.Shutdown(context.Background()) + } // Give all threads a chance to exit and then exit the process time.Sleep(time.Second) diff --git a/pkg/config/config.go b/pkg/config/config.go index bbd18b850..d2c0242cc 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -56,9 +56,10 @@ type Profile struct { // will help configuring common setting for all PromEncode stages - PromEncode settings would then act as overrides. type MetricsSettings struct { api.PromConnectionInfo - Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"` - NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"` - SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"` + DisableGlobalServer bool `yaml:"disableGlobalServer,omitempty" json:"disableGlobalServer,omitempty" doc:"disabling the global metrics server makes operational metrics unavailable. If prometheus-encoding stages are defined, they need to contain their own metrics server parameters."` + Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"` + NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"` + SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"` } // PerfSettings allows setting some internal configuration parameters diff --git a/pkg/pipeline/inprocess.go b/pkg/pipeline/inprocess.go index 1df783c9f..ef53b8c58 100644 --- a/pkg/pipeline/inprocess.go +++ b/pkg/pipeline/inprocess.go @@ -12,8 +12,7 @@ import ( // StartFLPInProcess is an entry point to start the whole FLP / pipeline processing from imported code func StartFLPInProcess(cfg *config.ConfigFileStruct) (*ingest.InProcess, error) { - prometheus.SetGlobalMetricsSettings(&cfg.MetricsSettings) - promServer := prometheus.StartServerAsync(&cfg.MetricsSettings.PromConnectionInfo, nil) + promServer := prometheus.InitializePrometheus(&cfg.MetricsSettings) // Create new flows pipeline ingester := ingest.NewInProcess(make(chan *pbflow.Records, 100)) @@ -25,7 +24,9 @@ func StartFLPInProcess(cfg *config.ConfigFileStruct) (*ingest.InProcess, error) // Starts the flows pipeline; blocking call go func() { flp.Run() - _ = promServer.Shutdown(context.Background()) + if promServer != nil { + _ = promServer.Shutdown(context.Background()) + } }() return ingester, nil diff --git a/pkg/prometheus/prom_server.go b/pkg/prometheus/prom_server.go index 9bde92b89..f0a261d86 100644 --- a/pkg/prometheus/prom_server.go +++ b/pkg/prometheus/prom_server.go @@ -34,17 +34,21 @@ var ( maybePanic = plog.Fatalf ) -// SetGlobalMetricsSettings initializes some global settings used later when starting server(s) -func SetGlobalMetricsSettings(settings *config.MetricsSettings) { +// InitializePrometheus starts the global Prometheus server, used for operational metrics and prom-encode stages if they don't override the server settings +func InitializePrometheus(settings *config.MetricsSettings) *http.Server { + if settings.NoPanic { + maybePanic = plog.Errorf + } + if settings.DisableGlobalServer { + return nil + } if settings.SuppressGoMetrics { // set up private prometheus registry r := prom.NewRegistry() prom.DefaultRegisterer = r prom.DefaultGatherer = r } - if settings.NoPanic { - maybePanic = plog.Errorf - } + return StartServerAsync(&settings.PromConnectionInfo, nil) } // StartServerAsync listens for prometheus resource usage requests From 7f50a9321191adf4d20840196512f168389778d0 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Mon, 20 Nov 2023 16:20:13 +0100 Subject: [PATCH 4/5] Add log when prom disabled --- pkg/prometheus/prom_server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/prometheus/prom_server.go b/pkg/prometheus/prom_server.go index f0a261d86..d40000bc0 100644 --- a/pkg/prometheus/prom_server.go +++ b/pkg/prometheus/prom_server.go @@ -40,6 +40,7 @@ func InitializePrometheus(settings *config.MetricsSettings) *http.Server { maybePanic = plog.Errorf } if settings.DisableGlobalServer { + plog.Info("Disabled global Prometheus server - no operational metrics will be available") return nil } if settings.SuppressGoMetrics { From 0ca3b67ee8bdbcbeaeb102769d80032bc3dd2a7a Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Tue, 21 Nov 2023 12:43:48 +0100 Subject: [PATCH 5/5] InProcess ingest now takes in channel as argument, directly with the GenericMap --- pkg/pipeline/ingest/ingest_inprocess.go | 24 ++----- pkg/pipeline/inprocess.go | 9 ++- pkg/pipeline/inprocess_test.go | 96 ++++--------------------- 3 files changed, 24 insertions(+), 105 deletions(-) diff --git a/pkg/pipeline/ingest/ingest_inprocess.go b/pkg/pipeline/ingest/ingest_inprocess.go index 3ece6b173..266b26247 100644 --- a/pkg/pipeline/ingest/ingest_inprocess.go +++ b/pkg/pipeline/ingest/ingest_inprocess.go @@ -3,22 +3,16 @@ package ingest import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" - "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" - - "github.com/netobserv/netobserv-ebpf-agent/pkg/decode" - "github.com/sirupsen/logrus" ) -var ilog = logrus.WithField("component", "ingest.InProcess") - // InProcess ingester is meant to be imported and used from another program // via pipeline.StartFLPInProcess type InProcess struct { - flowPackets chan *pbflow.Records + in chan config.GenericMap } -func NewInProcess(flowPackets chan *pbflow.Records) *InProcess { - return &InProcess{flowPackets: flowPackets} +func NewInProcess(in chan config.GenericMap) *InProcess { + return &InProcess{in: in} } func (d *InProcess) Ingest(out chan<- config.GenericMap) { @@ -26,18 +20,10 @@ func (d *InProcess) Ingest(out chan<- config.GenericMap) { <-utils.ExitChannel() d.Close() }() - for fp := range d.flowPackets { - ilog.Debugf("Ingested %v records", len(fp.Entries)) - for _, entry := range fp.Entries { - out <- decode.PBFlowToMap(entry) - } + for rec := range d.in { + out <- rec } } -func (d *InProcess) Write(record *pbflow.Records) { - d.flowPackets <- record -} - func (d *InProcess) Close() { - close(d.flowPackets) } diff --git a/pkg/pipeline/inprocess.go b/pkg/pipeline/inprocess.go index ef53b8c58..9b578610b 100644 --- a/pkg/pipeline/inprocess.go +++ b/pkg/pipeline/inprocess.go @@ -7,18 +7,17 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" - "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" ) // StartFLPInProcess is an entry point to start the whole FLP / pipeline processing from imported code -func StartFLPInProcess(cfg *config.ConfigFileStruct) (*ingest.InProcess, error) { +func StartFLPInProcess(cfg *config.ConfigFileStruct, in chan config.GenericMap) error { promServer := prometheus.InitializePrometheus(&cfg.MetricsSettings) // Create new flows pipeline - ingester := ingest.NewInProcess(make(chan *pbflow.Records, 100)) + ingester := ingest.NewInProcess(in) flp, err := newPipelineFromIngester(cfg, ingester) if err != nil { - return nil, fmt.Errorf("failed to initialize pipeline %w", err) + return fmt.Errorf("failed to initialize pipeline %w", err) } // Starts the flows pipeline; blocking call @@ -29,5 +28,5 @@ func StartFLPInProcess(cfg *config.ConfigFileStruct) (*ingest.InProcess, error) } }() - return ingester, nil + return nil } diff --git a/pkg/pipeline/inprocess_test.go b/pkg/pipeline/inprocess_test.go index 428e68902..5bed5f3df 100644 --- a/pkg/pipeline/inprocess_test.go +++ b/pkg/pipeline/inprocess_test.go @@ -9,19 +9,17 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" - "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" ) func TestInProcessFLP(t *testing.T) { pipeline := config.NewPresetIngesterPipeline() pipeline = pipeline.WriteStdout("writer", api.WriteStdout{Format: "json"}) - ingester, err := StartFLPInProcess(pipeline.ToConfigFileStruct()) + in := make(chan config.GenericMap, 100) + defer close(in) + err := StartFLPInProcess(pipeline.ToConfigFileStruct(), in) require.NoError(t, err) - defer ingester.Close() capturedOut, w, _ := os.Pipe() old := os.Stdout @@ -33,52 +31,13 @@ func TestInProcessFLP(t *testing.T) { // yield thread to allow pipe services correctly start time.Sleep(10 * time.Millisecond) - startTime := time.Now() - endTime := startTime.Add(7 * time.Second) - someDuration := endTime.Sub(startTime) - - ingester.Write(&pbflow.Records{ - Entries: []*pbflow.Record{{ - Interface: "eth0", - EthProtocol: 2048, - Bytes: 456, - Packets: 123, - Direction: pbflow.Direction_EGRESS, - TimeFlowStart: timestamppb.New(startTime), - TimeFlowEnd: timestamppb.New(endTime), - Network: &pbflow.Network{ - SrcAddr: &pbflow.IP{ - IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304}, - }, - DstAddr: &pbflow.IP{ - IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708}, - }, - Dscp: 1, - }, - DataLink: &pbflow.DataLink{ - DstMac: 0x112233445566, - SrcMac: 0x010203040506, - }, - Transport: &pbflow.Transport{ - Protocol: 17, - SrcPort: 23000, - DstPort: 443, - }, - AgentIp: &pbflow.IP{ - IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x0a0b0c0d}, - }, - PktDropBytes: 100, - PktDropPackets: 10, - PktDropLatestFlags: 1, - PktDropLatestState: 1, - PktDropLatestDropCause: 8, - DnsLatency: durationpb.New(someDuration), - DnsId: 1, - DnsFlags: 0x80, - DnsErrno: 0, - TimeFlowRtt: durationpb.New(someDuration), - }}, - }) + in <- config.GenericMap{ + "SrcAddr": "1.2.3.4", + "DstAddr": "5.6.7.8", + "Dscp": float64(1), + "DstMac": "11:22:33:44:55:66", + "SrcMac": "01:02:03:04:05:06", + } scanner := bufio.NewScanner(capturedOut) require.True(t, scanner.Scan()) @@ -86,36 +45,11 @@ func TestInProcessFLP(t *testing.T) { bytes := scanner.Bytes() require.NoError(t, json.Unmarshal(bytes, &capturedRecord), string(bytes)) - assert.NotZero(t, capturedRecord["TimeReceived"]) - delete(capturedRecord, "TimeReceived") assert.EqualValues(t, map[string]interface{}{ - "FlowDirection": float64(1), - "Bytes": float64(456), - "SrcAddr": "1.2.3.4", - "DstAddr": "5.6.7.8", - "Dscp": float64(1), - "DstMac": "11:22:33:44:55:66", - "SrcMac": "01:02:03:04:05:06", - "SrcPort": float64(23000), - "DstPort": float64(443), - "Duplicate": false, - "Etype": float64(2048), - "Packets": float64(123), - "Proto": float64(17), - "TimeFlowStartMs": float64(startTime.UnixMilli()), - "TimeFlowEndMs": float64(endTime.UnixMilli()), - "Interface": "eth0", - "AgentIP": "10.11.12.13", - "PktDropBytes": float64(100), - "PktDropPackets": float64(10), - "PktDropLatestFlags": float64(1), - "PktDropLatestState": "TCP_ESTABLISHED", - "PktDropLatestDropCause": "SKB_DROP_REASON_NETFILTER_DROP", - "DnsLatencyMs": float64(someDuration.Milliseconds()), - "DnsId": float64(1), - "DnsFlags": float64(0x80), - "DnsErrno": float64(0), - "DnsFlagsResponseCode": "NoError", - "TimeFlowRttNs": float64(someDuration.Nanoseconds()), + "SrcAddr": "1.2.3.4", + "DstAddr": "5.6.7.8", + "Dscp": float64(1), + "DstMac": "11:22:33:44:55:66", + "SrcMac": "01:02:03:04:05:06", }, capturedRecord) }