Skip to content

Commit

Permalink
Refactor metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Dec 4, 2024
1 parent 537973a commit 6b02ac1
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 88 deletions.
34 changes: 9 additions & 25 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ var (
amqpPropertyFilters []string
)

var rmqMgmt *mgmt.Mgmt
var (
rmqMgmt *mgmt.Mgmt
metricsServer *metrics.MetricsServer
)

func Execute() {
rootCmd := RootCmd()
Expand Down Expand Up @@ -201,6 +204,7 @@ func RootCmd() *cobra.Command {
Use: "version",
Run: func(cmd *cobra.Command, args []string) {
version.Print()
os.Exit(0)
},
}

Expand All @@ -223,9 +227,6 @@ func RootCmd() *cobra.Command {
}
}
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {
shutdown(cfg.PrintAllMetrics)
},
}

rootCmd.PersistentFlags().StringSliceVarP(&cfg.Uri, "uri", "", nil,
Expand Down Expand Up @@ -336,13 +337,15 @@ func start(cfg config.Config) {
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// handle ^C
handleInterupt(ctx, cancel)

startMetrics(cfg)
metricsServer = metrics.Start(ctx, cfg)
defer metricsServer.Stop()

rmqMgmt = mgmt.Start(ctx, cfg.ManagementUri, cfg.CleanupQueues)
defer rmqMgmt.Stop()

var wg sync.WaitGroup

Expand All @@ -361,8 +364,6 @@ func start(cfg config.Config) {
}

// every second, print the current values of the metrics
m := metrics.GetMetricsServer()
m.PrintMessageRates(ctx)
wg.Wait()
}

Expand Down Expand Up @@ -437,13 +438,6 @@ func joinCluster(expectedInstance int, serviceName string) {
}()
}

func startMetrics(cfg config.Config) {
metrics.RegisterMetrics(cfg.MetricTags)
metrics.RegisterCommandLineMetric(cfg, cfg.MetricTags)
metricsServer := metrics.GetMetricsServer()
metricsServer.Start()
}

func startConsumers(ctx context.Context, wg *sync.WaitGroup) {
for i := 1; i <= cfg.Consumers; i++ {
select {
Expand Down Expand Up @@ -548,16 +542,6 @@ func defaultUri(proto string) string {
return uri
}

func shutdown(printAllMetrics bool) {
rmqMgmt.Stop()
metricsServer := metrics.GetMetricsServer()
metricsServer.PrintSummary()

if printAllMetrics {
metricsServer.PrintAll()
}
}

func sanitizeConfig(cfg *config.Config) error {
if cfg.Size < 12 {
return fmt.Errorf("size can't be less than 12 bytes")
Expand Down
117 changes: 54 additions & 63 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
)

type MetricsServer struct {
httpServer *http.Server
running bool
started time.Time
httpServer *http.Server
running bool
started time.Time
printAllOnStop bool
}

var lock = &sync.Mutex{}
var once sync.Once

var metricsServer *MetricsServer

Expand All @@ -42,7 +43,47 @@ var (
EndToEndLatency *vmetrics.Summary
)

func RegisterMetrics(globalLabels map[string]string) {
func startServer() {
once.Do(func() {
http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
vmetrics.WritePrometheus(w, true)
})

metricsServer = &MetricsServer{
httpServer: &http.Server{
Addr: get_metrics_ip() + ":8080",
},
}
})
}

func Start(ctx context.Context, cfg config.Config) *MetricsServer {
startServer()
metricsServer.printAllOnStop = cfg.PrintAllMetrics
registerMetrics(cfg.MetricTags)
registerCommandLineMetric(cfg, cfg.MetricTags)
metricsServer.printMessageRates(ctx)

go func() {
for {
metricsServer.httpServer.RegisterOnShutdown(func() {
metricsServer.running = false
})
metricsServer.started = time.Now()
metricsServer.running = true
log.Debug("starting Prometheus metrics server", "address", metricsServer.httpServer.Addr)
err := metricsServer.httpServer.ListenAndServe()
if errors.Is(err, syscall.EADDRINUSE) {
port, _ := strconv.Atoi(strings.Split(metricsServer.httpServer.Addr, ":")[1])
metricsServer.httpServer.Addr = get_metrics_ip() + ":" + fmt.Sprint(port+1)
log.Info("prometheus metrics: port already in use, trying the next one", "port", metricsServer.httpServer.Addr)
}
}
}()
return metricsServer
}

func registerMetrics(globalLabels map[string]string) {
normal := map[string]string{"priority": "normal"}
maps.Copy(normal, globalLabels)
normalPriorityLabels := labelsToString(normal)
Expand All @@ -59,7 +100,7 @@ func RegisterMetrics(globalLabels map[string]string) {
EndToEndLatency = vmetrics.GetOrCreateSummaryExt(`omq_end_to_end_latency_seconds`+labelsToString(globalLabels), 1*time.Second, []float64{0.5, 0.9, 0.95, 0.99})
}

func RegisterCommandLineMetric(cfg config.Config, globalLabels map[string]string) {
func registerCommandLineMetric(cfg config.Config, globalLabels map[string]string) {
var args []string
// some of the command line args are not useful in the metric
ignoredArgs := []string{
Expand Down Expand Up @@ -103,63 +144,12 @@ func MessagesConsumedOutOfOrderMetric(priority int) *vmetrics.Counter {
return MessagesConsumedOutOfOrderNormalPriority
}

func Reset() {
MessagesPublished.Set(0)
MessagesConsumedNormalPriority.Set(0)
MessagesConsumedHighPriority.Set(0)
MessagesConsumedOutOfOrderNormalPriority.Set(0)
MessagesConsumedOutOfOrderHighPriority.Set(0)
// PublishingLatency.Reset()
// EndToEndLatency.Reset()
}

func GetMetricsServer() *MetricsServer {
lock.Lock()
defer lock.Unlock()
if metricsServer == nil {
http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
vmetrics.WritePrometheus(w, true)
})

metricsServer = &MetricsServer{
httpServer: &http.Server{
Addr: get_metrics_ip() + ":8080",
},
}
}

return metricsServer
}

func (m *MetricsServer) Start() {
if m.running {
return
}

go func() {
for {
m.httpServer.RegisterOnShutdown(func() {
m.running = false
})
m.started = time.Now()
m.running = true
log.Debug("starting Prometheus metrics server", "address", m.httpServer.Addr)
err := m.httpServer.ListenAndServe()
if errors.Is(err, syscall.EADDRINUSE) {
port, _ := strconv.Atoi(strings.Split(m.httpServer.Addr, ":")[1])
m.httpServer.Addr = get_metrics_ip() + ":" + fmt.Sprint(port+1)
log.Info("prometheus metrics: port already in use, trying the next one", "port", m.httpServer.Addr)
}
}
}()
}

var (
previouslyPublished uint64
previouslyConsumed uint64
)

func (m *MetricsServer) PrintMessageRates(ctx context.Context) {
func (m *MetricsServer) printMessageRates(ctx context.Context) {
go func() {
for {
select {
Expand All @@ -182,13 +172,14 @@ func (m *MetricsServer) PrintMessageRates(ctx context.Context) {
}()
}

func (m *MetricsServer) PrintSummary() {
// this might ve called before the metrics were registered
// eg. by `omq --help`
if MessagesPublished == nil {
return
func (m *MetricsServer) Stop() {
m.PrintSummary()
if m.printAllOnStop {
m.PrintAll()
}
}

func (m *MetricsServer) PrintSummary() {
log.Print("TOTAL PUBLISHED",
"messages", MessagesPublished.Get(),
"rate", fmt.Sprintf("%.2f/s", float64(MessagesPublished.Get())/time.Since(m.started).Seconds()))
Expand Down

0 comments on commit 6b02ac1

Please sign in to comment.