Skip to content

Commit

Permalink
Add --metric-tags option (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk authored Feb 12, 2024
1 parent c43ae66 commit ab50fe7
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 34 deletions.
8 changes: 3 additions & 5 deletions cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,16 @@ func TestPublishConsume(t *testing.T) {
args := []string{tc.publish + "-" + tc.consume, "-C", "1", "-D", "1", "-t", topic, "-T", topic}
rootCmd.SetArgs(args)
fmt.Println("Running test: omq", strings.Join(args, " "))
publishedBefore := testutil.ToFloat64(metrics.MessagesPublished.WithLabelValues(publishProtoLabel))
consumedBefore := testutil.ToFloat64(metrics.MessagesConsumed.WithLabelValues(consumeProtoLabel))

err := rootCmd.Execute()

assert.Nil(t, err)

assert.Eventually(t, func() bool {
return testutil.ToFloat64(metrics.MessagesPublished.WithLabelValues(publishProtoLabel)) == publishedBefore+1
return assert.Equal(t, 1.0, testutil.ToFloat64(metrics.MessagesPublished.WithLabelValues(publishProtoLabel)))

}, 2*time.Second, 100*time.Millisecond)
assert.Eventually(t, func() bool {
return testutil.ToFloat64(metrics.MessagesConsumed.WithLabelValues(consumeProtoLabel)) == consumedBefore+1
return assert.Equal(t, 1.0, testutil.ToFloat64(metrics.MessagesConsumed.WithLabelValues(consumeProtoLabel)))
}, 2*time.Second, 100*time.Millisecond)
})
}
Expand Down
21 changes: 21 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var (
versionCmd = &cobra.Command{}
)

var metricTags []string

func Execute() {
rootCmd := RootCmd()
err := rootCmd.Execute()
Expand Down Expand Up @@ -159,6 +161,24 @@ func RootCmd() *cobra.Command {
os.Exit(1)
}
}

// split metric tags into key-value pairs
cfg.MetricTags = make(map[string]string)
for _, tag := range metricTags {
parts := strings.Split(tag, "=")
if len(parts) != 2 {
_, _ = fmt.Fprintf(os.Stderr, "ERROR: invalid metric tags: %s, use label=value format\n", tag)
os.Exit(1)
}
cfg.MetricTags[parts[0]] = parts[1]
}
if metricTags != nil {
metrics.RegisterMetrics(cfg.MetricTags)
metricTags = nil
}
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {
metrics.UnregisterMetrics()
},
}
rootCmd.PersistentFlags().StringVarP(&cfg.PublisherUri, "publisher-uri", "", "", "URI for publishing")
Expand Down Expand Up @@ -189,6 +209,7 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValueSet, "stream-filter-value-set", "", "Stream filter value for publisher")
rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, "AMQP-1.0 consumer credits / STOMP prefetch count")
rootCmd.PersistentFlags().DurationVarP(&cfg.ConsumerLatency, "consumer-latency", "L", 0*time.Second, "consumer latency (time to accept message)")
rootCmd.PersistentFlags().StringSliceVar(&metricTags, "metric-tags", []string{}, "Prometheus label-value pairs, eg. l1=v1,l2=v2")

rootCmd.AddCommand(amqp_amqp)
rootCmd.AddCommand(amqp_stomp)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Config struct {
Amqp AmqpOptions
MqttPublisher MqttOptions
MqttConsumer MqttOptions
MetricTags map[string]string
}

func NewConfig() Config {
Expand Down
63 changes: 34 additions & 29 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,39 +28,44 @@ var lock = &sync.Mutex{}
var metricsServer *MetricsServer

var (
MessagesPublished *prometheus.CounterVec
MessagesConsumed *prometheus.CounterVec
PublishingLatency *prometheus.SummaryVec
EndToEndLatency *prometheus.SummaryVec
)

func RegisterMetrics(globalLabels prometheus.Labels) {
fmt.Printf("negistering metrics: %v", globalLabels)
MessagesPublished = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "omq_messages_published_total",
Help: "The total number of published messages"},
[]string{
"protocol",
},
)
Name: "omq_messages_published_total",
Help: "The total number of published messages",
ConstLabels: globalLabels,
}, []string{"protocol"})
MessagesConsumed = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "omq_messages_consumed_total",
Help: "The total number of consumed messages"},
[]string{
"protocol",
},
)
Name: "omq_messages_consumed_total",
Help: "The total number of consumed messages",
ConstLabels: globalLabels,
}, []string{"protocol"})
PublishingLatency = promauto.NewSummaryVec(prometheus.SummaryOpts{
Name: "omq_publishing_latency_seconds",
Help: "Time from sending a message to receiving a confirmation",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
},
[]string{
"protocol",
},
)
Name: "omq_publishing_latency_seconds",
Help: "Time from sending a message to receiving a confirmation",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
ConstLabels: globalLabels,
}, []string{"protocol"})
EndToEndLatency = promauto.NewSummaryVec(prometheus.SummaryOpts{
Name: "omq_end_to_end_latency_seconds",
Help: "Time from sending a message to receiving the message",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
},
[]string{
"protocol",
},
)
)
Name: "omq_end_to_end_latency_seconds",
Help: "Time from sending a message to receiving the message",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
ConstLabels: globalLabels,
}, []string{"protocol"})
}

func UnregisterMetrics() {
prometheus.DefaultRegisterer.Unregister(MessagesPublished)
prometheus.DefaultRegisterer.Unregister(MessagesConsumed)
prometheus.DefaultRegisterer.Unregister(PublishingLatency)
prometheus.DefaultRegisterer.Unregister(EndToEndLatency)
}

func GetMetricsServer() *MetricsServer {
lock.Lock()
Expand Down

0 comments on commit ab50fe7

Please sign in to comment.