Skip to content

Commit

Permalink
Add severity on errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jotak committed Jan 24, 2025
1 parent 54539c0 commit 33904f8
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 14 deletions.
4 changes: 2 additions & 2 deletions pkg/exporter/grpc_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (g *GRPCProto) ExportFlows(input <-chan []*model.Record) {
for _, pbRecords := range pbflow.FlowsToPB(inputRecords, g.maxFlowsPerMessage) {
log.Debugf("sending %d records", len(pbRecords.Entries))
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage").Inc()
g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage", metrics.HighSeverity).Inc()
log.WithError(err).Error("couldn't send flow records to collector")
}
g.batchCounter.Inc()
Expand All @@ -66,6 +66,6 @@ func (g *GRPCProto) ExportFlows(input <-chan []*model.Record) {
}
if err := g.clientConn.Close(); err != nil {
log.WithError(err).Warn("couldn't close flow export client")
g.metrics.Errors.WithErrorName(componentGRPC, "CannotCloseClient").Inc()
g.metrics.Errors.WithErrorName(componentGRPC, "CannotCloseClient", metrics.MediumSeverity).Inc()
}
}
4 changes: 2 additions & 2 deletions pkg/exporter/kafka_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ func (kp *KafkaProto) batchAndSubmit(records []*model.Record) {
pbBytes, err := proto.Marshal(pbflow.FlowToPB(record))
if err != nil {
klog.WithError(err).Debug("can't encode protobuf message. Ignoring")
kp.Metrics.Errors.WithErrorName(componentKafka, "CannotEncodeMessage").Inc()
kp.Metrics.Errors.WithErrorName(componentKafka, "CannotEncodeMessage", metrics.HighSeverity).Inc()
continue
}
msgs = append(msgs, kafkago.Message{Value: pbBytes, Key: getFlowKey(record)})
}

if err := kp.Writer.WriteMessages(context.TODO(), msgs...); err != nil {
klog.WithError(err).Error("can't write messages into Kafka")
kp.Metrics.Errors.WithErrorName(componentKafka, "CannotWriteMessage").Inc()
kp.Metrics.Errors.WithErrorName(componentKafka, "CannotWriteMessage", metrics.HighSeverity).Inc()
}
kp.Metrics.EvictionCounter.WithSource(componentKafka).Inc()
kp.Metrics.EvictedFlowsCounter.WithSource(componentKafka).Add(float64(len(records)))
Expand Down
4 changes: 2 additions & 2 deletions pkg/flow/tracer_ringbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ func (m *RingBufTracer) TraceLoop(ctx context.Context) node.StartFunc[*model.Raw
func (m *RingBufTracer) listenAndForwardRingBuffer(debugging bool, forwardCh chan<- *model.RawRecord) error {
event, err := m.ringBuffer.ReadRingBuf()
if err != nil {
m.metrics.Errors.WithErrorName("ringbuffer", "CannotReadRingbuffer").Inc()
m.metrics.Errors.WithErrorName("ringbuffer", "CannotReadRingbuffer", metrics.HighSeverity).Inc()
return fmt.Errorf("reading from ring buffer: %w", err)
}
// Parses the ringbuf event entry into an Event structure.
readFlow, err := model.ReadFrom(bytes.NewBuffer(event.RawSample))
if err != nil {
m.metrics.Errors.WithErrorName("ringbuffer", "CannotParseRingbuffer").Inc()
m.metrics.Errors.WithErrorName("ringbuffer", "CannotParseRingbuffer", metrics.HighSeverity).Inc()
return fmt.Errorf("parsing data received from the ring buffer: %w", err)
}
mapFullError := readFlow.Metrics.Errno == uint8(syscall.E2BIG)
Expand Down
13 changes: 11 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ var (
TypeCounter,
"component",
"error",
"severity",
)
flowEnrichmentCounterCounter = defineMetric(
"flows_enrichment_total",
Expand Down Expand Up @@ -307,6 +308,14 @@ type ErrorCounter struct {
vec *prometheus.CounterVec
}

func (c *ErrorCounter) WithErrorName(component, errName string) prometheus.Counter {
return c.vec.WithLabelValues(component, errName)
// ErrorSeverity classifies how serious a reported error is. Its value is
// exported as the "severity" label on the errors counter metric.
type ErrorSeverity string

// Severity levels accepted by ErrorCounter.WithErrorName.
const (
	HighSeverity   ErrorSeverity = "high"
	MediumSeverity ErrorSeverity = "medium"
	LowSeverity    ErrorSeverity = "low"
)

// WithErrorName returns the counter tracking errors named errName raised by
// the given component, tagged with the given severity. The underlying label
// combination is created on first use.
func (c *ErrorCounter) WithErrorName(component, errName string, severity ErrorSeverity) prometheus.Counter {
	labels := []string{component, errName, string(severity)}
	return c.vec.WithLabelValues(labels...)
}
8 changes: 4 additions & 4 deletions pkg/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[ebpf.BpfFlowI
return m.legacyLookupAndDeleteMap(met)
}
log.WithError(err).WithField("flowId", id).Warnf("couldn't lookup/delete flow entry")
met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc()
met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows", metrics.HighSeverity).Inc()
continue
}
flows[id] = model.NewBpfFlowContent(baseMetrics)
Expand All @@ -911,7 +911,7 @@ func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[ebpf.BpfFlowI
return m.legacyLookupAndDeleteMap(met)
}
log.WithError(err).WithField("flowId", id).Warnf("couldn't lookup/delete additional metrics entry")
met.Errors.WithErrorName("flow-fetcher", "CannotDeleteAdditionalMetric").Inc()
met.Errors.WithErrorName("flow-fetcher", "CannotDeleteAdditionalMetric", metrics.HighSeverity).Inc()
continue
}
flow, found := flows[id]
Expand Down Expand Up @@ -959,7 +959,7 @@ func (m *FlowFetcher) ReadGlobalCounter(met *metrics.Metrics) {
ebpf.BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_GROUPID_MISMATCH: met.NetworkEventsCounter.WithSourceAndReason("network-events", "NetworkEventsErrorsGroupIDMismatch"),
ebpf.BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS: met.NetworkEventsCounter.WithSourceAndReason("network-events", "NetworkEventsErrorsFlowMapUpdate"),
ebpf.BpfGlobalCountersKeyTNETWORK_EVENTS_GOOD: met.NetworkEventsCounter.WithSourceAndReason("network-events", "NetworkEventsGoodEvent"),
ebpf.BpfGlobalCountersKeyTOBSERVED_INTF_MISSED: met.Errors.WithErrorName("flow-fetcher", "MaxObservedInterfacesReached"),
ebpf.BpfGlobalCountersKeyTOBSERVED_INTF_MISSED: met.Errors.WithErrorName("flow-fetcher", "MaxObservedInterfacesReached", metrics.LowSeverity),
}
zeroCounters := make([]uint32, cilium.MustPossibleCPU())
for key := ebpf.BpfGlobalCountersKeyT(0); key < ebpf.BpfGlobalCountersKeyTMAX_COUNTERS; key++ {
Expand Down Expand Up @@ -1723,7 +1723,7 @@ func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte
return p.legacyLookupAndDeleteMap(met)
}
log.WithError(err).WithField("packetID", id).Warnf("couldn't delete entry")
met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteEntry").Inc()
met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteEntry", metrics.HighSeverity).Inc()
}
packets[id] = packet
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/tracer/tracer_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (m *FlowFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[ebpf.Bp
count++
if err := flowMap.Delete(id); err != nil {
log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
met.Errors.WithErrorName("flow-fetcher-legacy", "CannotDeleteFlows").Inc()
met.Errors.WithErrorName("flow-fetcher-legacy", "CannotDeleteFlows", metrics.HighSeverity).Inc()
}
flows[id] = model.NewBpfFlowContent(baseMetrics)
}
Expand All @@ -44,7 +44,7 @@ func (p *PacketFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[int][
for iterator.Next(&id, &packet) {
if err := packetMap.Delete(id); err != nil {
log.WithError(err).WithField("packetID ", id).Warnf("couldn't delete entry")
met.Errors.WithErrorName("pkt-fetcher-legacy", "CannotDeleteEntry").Inc()
met.Errors.WithErrorName("pkt-fetcher-legacy", "CannotDeleteEntry", metrics.HighSeverity).Inc()
}
packets[id] = append(packets[id], packet...)
}
Expand Down

0 comments on commit 33904f8

Please sign in to comment.