diff --git a/cmd/cmd_test.go b/cmd/cmd_test.go index 5dec961..990ce3a 100644 --- a/cmd/cmd_test.go +++ b/cmd/cmd_test.go @@ -49,18 +49,16 @@ func TestPublishConsume(t *testing.T) { args := []string{tc.publish + "-" + tc.consume, "-C", "1", "-D", "1", "-t", topic, "-T", topic} rootCmd.SetArgs(args) fmt.Println("Running test: omq", strings.Join(args, " ")) - publishedBefore := testutil.ToFloat64(metrics.MessagesPublished.WithLabelValues(publishProtoLabel)) - consumedBefore := testutil.ToFloat64(metrics.MessagesConsumed.WithLabelValues(consumeProtoLabel)) err := rootCmd.Execute() - assert.Nil(t, err) + assert.Eventually(t, func() bool { - return testutil.ToFloat64(metrics.MessagesPublished.WithLabelValues(publishProtoLabel)) == publishedBefore+1 + return assert.Equal(t, 1.0, testutil.ToFloat64(metrics.MessagesPublished.WithLabelValues(publishProtoLabel))) }, 2*time.Second, 100*time.Millisecond) assert.Eventually(t, func() bool { - return testutil.ToFloat64(metrics.MessagesConsumed.WithLabelValues(consumeProtoLabel)) == consumedBefore+1 + return assert.Equal(t, 1.0, testutil.ToFloat64(metrics.MessagesConsumed.WithLabelValues(consumeProtoLabel))) }, 2*time.Second, 100*time.Millisecond) }) } diff --git a/cmd/root.go b/cmd/root.go index 6122eb6..4908fdb 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -36,6 +36,8 @@ var ( versionCmd = &cobra.Command{} ) +var metricTags []string + func Execute() { rootCmd := RootCmd() err := rootCmd.Execute() @@ -159,6 +161,24 @@ func RootCmd() *cobra.Command { os.Exit(1) } } + + // split metric tags into key-value pairs + cfg.MetricTags = make(map[string]string) + for _, tag := range metricTags { + parts := strings.Split(tag, "=") + if len(parts) != 2 { + _, _ = fmt.Fprintf(os.Stderr, "ERROR: invalid metric tags: %s, use label=value format\n", tag) + os.Exit(1) + } + cfg.MetricTags[parts[0]] = parts[1] + } + if metricTags != nil { + metrics.RegisterMetrics(cfg.MetricTags) + metricTags = nil + } + }, + PersistentPostRun: func(cmd *cobra.Command, args []string) { + metrics.UnregisterMetrics() }, } rootCmd.PersistentFlags().StringVarP(&cfg.PublisherUri, "publisher-uri", "", "", "URI for publishing") @@ -189,6 +209,7 @@ func RootCmd() *cobra.Command { rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValueSet, "stream-filter-value-set", "", "Stream filter value for publisher") rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, "AMQP-1.0 consumer credits / STOMP prefetch count") rootCmd.PersistentFlags().DurationVarP(&cfg.ConsumerLatency, "consumer-latency", "L", 0*time.Second, "consumer latency (time to accept message)") + rootCmd.PersistentFlags().StringSliceVar(&metricTags, "metric-tags", []string{}, "Prometheus label-value pairs, eg. l1=v1,l2=v2") rootCmd.AddCommand(amqp_amqp) rootCmd.AddCommand(amqp_stomp) diff --git a/pkg/config/config.go b/pkg/config/config.go index e17e74a..3fb9609 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -53,6 +53,7 @@ type Config struct { Amqp AmqpOptions MqttPublisher MqttOptions MqttConsumer MqttOptions + MetricTags map[string]string } func NewConfig() Config { diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 5cc1496..e19484d 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -28,39 +28,44 @@ var lock = &sync.Mutex{} var metricsServer *MetricsServer var ( + MessagesPublished *prometheus.CounterVec + MessagesConsumed *prometheus.CounterVec + PublishingLatency *prometheus.SummaryVec + EndToEndLatency *prometheus.SummaryVec +) + +func RegisterMetrics(globalLabels prometheus.Labels) { + fmt.Printf("negistering metrics: %v", globalLabels) MessagesPublished = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "omq_messages_published_total", - Help: "The total number of published messages"}, - []string{ - "protocol", - }, - ) + Name: "omq_messages_published_total", + Help: "The total number of published messages", + ConstLabels: globalLabels, + }, []string{"protocol"}) MessagesConsumed = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "omq_messages_consumed_total", - Help: "The total number of consumed messages"}, - []string{ - "protocol", - }, - ) + Name: "omq_messages_consumed_total", + Help: "The total number of consumed messages", + ConstLabels: globalLabels, + }, []string{"protocol"}) PublishingLatency = promauto.NewSummaryVec(prometheus.SummaryOpts{ - Name: "omq_publishing_latency_seconds", - Help: "Time from sending a message to receiving a confirmation", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, - }, - []string{ - "protocol", - }, - ) + Name: "omq_publishing_latency_seconds", + Help: "Time from sending a message to receiving a confirmation", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + ConstLabels: globalLabels, + }, []string{"protocol"}) EndToEndLatency = promauto.NewSummaryVec(prometheus.SummaryOpts{ - Name: "omq_end_to_end_latency_seconds", - Help: "Time from sending a message to receiving the message", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, - }, - []string{ - "protocol", - }, - ) -) + Name: "omq_end_to_end_latency_seconds", + Help: "Time from sending a message to receiving the message", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + ConstLabels: globalLabels, + }, []string{"protocol"}) +} + +func UnregisterMetrics() { + prometheus.DefaultRegisterer.Unregister(MessagesPublished) + prometheus.DefaultRegisterer.Unregister(MessagesConsumed) + prometheus.DefaultRegisterer.Unregister(PublishingLatency) + prometheus.DefaultRegisterer.Unregister(EndToEndLatency) +} func GetMetricsServer() *MetricsServer { lock.Lock()