Skip to content

Commit

Permalink
chore: Enhance to support metrics prefix for OpEx metric name (#42)
Browse files Browse the repository at this point in the history
Signed-off-by: sarabala1979 <[email protected]>
  • Loading branch information
sarabala1979 authored Aug 6, 2024
1 parent 7b68e65 commit 0e8e3a2
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 9 deletions.
7 changes: 6 additions & 1 deletion prometheus-pusher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
METRICS_LABELS = "METRICS_LABELS"
METRICS_NAME = "METRICS_NAME"
EXCLUDE_METRIC_LABELS = "EXCLUDE_METRICS_LABELS"
OPEX_METRIC_PREFIX = "OPEX_METRIC_PREFIX"
)

type prometheusSink struct {
Expand Down Expand Up @@ -198,10 +199,14 @@ func main() {
skipFailedStr := os.Getenv(SKIP_VALIDATION_FAILED)
labels := parseStringToMap(os.Getenv(METRICS_LABELS))
excludeLabels := parseStringToSlice(os.Getenv(EXCLUDE_METRIC_LABELS))
opexMetricsPrefix := os.Getenv(os.Getenv(OPEX_METRIC_PREFIX))
metricName := os.Getenv(METRICS_NAME)
if metricName == "" {
metricName = "namespace_app_rollouts_unified_anomaly"
}
if opexMetricsPrefix == "" {
opexMetricsPrefix = "numaflow_prom_sink"
}
var metricPort int
var ignoreMetricsTs, enableMsgTransformer bool
meticslabels := numaflag.MapFlag{}
Expand All @@ -224,7 +229,7 @@ func main() {
ps := prometheusSink{logger: logger, skipFailed: skipFailed, labels: labels, excludeLabels: excludeLabels,
ignoreMetricsTs: ignoreMetricsTs, metricsName: metricName, enableMsgTransformer: enableMsgTransformer}

ps.metrics = NewMetricsServer(labels)
ps.metrics = NewMetricsServer(labels, opexMetricsPrefix)
go ps.metrics.startMetricServer(metricPort)
ps.logger.Infof("Metrics publisher initialized with port=%d", metricPort)
err = sinksdk.NewServer(&ps).Start(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion prometheus-pusher/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestPusher(t *testing.T) {

pgwOK := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
dec := expfmt.NewDecoder(r.Body, expfmt.FmtProtoDelim)
dec := expfmt.NewDecoder(r.Body, expfmt.ResponseFormat(r.Header))

var mf io_prometheus_client.MetricFamily
dec.Decode(&mf)
Expand Down
14 changes: 8 additions & 6 deletions prometheus-pusher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,33 @@ type MetricsPublisher struct {
metricsTotalSkipped prometheus.Counter
metricsAnomalyGenerated *prometheus.CounterVec
labels map[string]string
opexMetricPrefix string
}

func (mp *MetricsPublisher) registerMertics() {
mp.metricsTotalPushed = promauto.NewCounter(prometheus.CounterOpts{
Name: "total_metrics_pushed",
Name: mp.opexMetricPrefix + "_" + "total_metrics_pushed",
Help: "The total number of metrics pushed",
ConstLabels: mp.labels,
})
mp.metricsTotalSuccess = promauto.NewCounter(prometheus.CounterOpts{
Name: "total_metrics_success",
Name: mp.opexMetricPrefix + "_" + "total_metrics_success",
Help: "The total number of metrics successfully pushed",
ConstLabels: mp.labels,
})
mp.metricsTotalFailed = promauto.NewCounter(prometheus.CounterOpts{
Name: "total_metrics_failed",
Name: mp.opexMetricPrefix + "_" + "total_metrics_failed",
Help: "The total number of metrics failed push",
ConstLabels: mp.labels,
})
mp.metricsTotalSkipped = promauto.NewCounter(prometheus.CounterOpts{
Name: "total_metrics_skipped",
Name: mp.opexMetricPrefix + "_" + "total_metrics_skipped",
Help: "The total number of metrics skipped",
ConstLabels: mp.labels,
})

mp.metricsAnomalyGenerated = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "total_anomaly_generated",
Name: mp.opexMetricPrefix + "_" + "total_anomaly_generated",
Help: "The total count of anomaly score generator",
ConstLabels: mp.labels,
}, []string{"namespace", "app", "metrics"})
Expand All @@ -64,9 +65,10 @@ func (mp *MetricsPublisher) IncreaseAnomalyGenerated(namespace, app, metricName
mp.metricsAnomalyGenerated.WithLabelValues(namespace, app, metricName).Inc()
}

func NewMetricsServer(labels map[string]string) *MetricsPublisher {
func NewMetricsServer(labels map[string]string, prefix string) *MetricsPublisher {
metricsPublisher := &MetricsPublisher{}
metricsPublisher.labels = labels
metricsPublisher.opexMetricPrefix = prefix
metricsPublisher.registerMertics()
return metricsPublisher

Expand Down
2 changes: 1 addition & 1 deletion prometheus-pusher/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestMetricsPublisher(t *testing.T) {
mp := NewMetricsServer(map[string]string{"label": "val1", "label2": "val2"})
mp := NewMetricsServer(map[string]string{"label": "val1", "label2": "val2"}, "prefix")
mp.IncreaseTotalPushed()
mp.IncreaseTotalSuccess()
mp.IncreaseTotalSkipped()
Expand Down

0 comments on commit 0e8e3a2

Please sign in to comment.