Skip to content

Commit

Permalink
feat(telemetry)_: replace telemetry with prometheus metrics
Browse files Browse the repository at this point in the history
Replace telemetry with local metrics using prometheus client.
Add parameters to InitializeApplication for enabling waku metrics
over prometheus and specifying which port to use.
  • Loading branch information
vpavlin authored and adklempner committed Jan 16, 2025
1 parent 4a5a338 commit 07798db
Show file tree
Hide file tree
Showing 14 changed files with 616 additions and 1,312 deletions.
14 changes: 6 additions & 8 deletions cmd/status-cli/util.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"errors"
"fmt"
"os"
Expand All @@ -12,11 +11,11 @@ import (

"github.com/status-im/status-go/api"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/metrics/wakumetrics"
"github.com/status-im/status-go/multiaccounts"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/services/wakuv2ext"
"github.com/status-im/status-go/telemetry"

"github.com/urfave/cli/v2"
)
Expand Down Expand Up @@ -74,14 +73,12 @@ func start(p StartParams, logger *zap.SugaredLogger) (*StatusCLI, error) {
}

if p.TelemetryURL != "" {
telemetryLogger, err := getLogger(true)
waku := backend.StatusNode().WakuV2Service()
telemetryClient, err := wakumetrics.NewClient(wakumetrics.WithPeerID(waku.PeerID().String()))
if err != nil {
return nil, err
}
waku := backend.StatusNode().WakuV2Service()
telemetryClient := telemetry.NewClient(telemetryLogger, p.TelemetryURL, backend.SelectedAccountKeyID(), p.Name, "cli", telemetry.WithPeerID(waku.PeerID().String()))
telemetryClient.Start(context.Background())
backend.StatusNode().WakuV2Service().SetStatusTelemetryClient(telemetryClient)
backend.StatusNode().WakuV2Service().SetMetricsHandler(telemetryClient)
}
wakuAPI := wakuv2ext.NewPublicAPI(wakuService)

Expand Down Expand Up @@ -152,7 +149,8 @@ func createAccountAndLogin(b *api.GethStatusBackend, rootDataDir, password strin
HTTPHost: "127.0.0.1",
HTTPPort: p.Port,
},
TelemetryServerURL: p.TelemetryURL,
TelemetryServerURL: p.TelemetryURL,
WakuV2EnableMissingMessageVerification: true,
}
return b.CreateAccountAndLogin(req,
params.WithFleet(p.Fleet),
Expand Down
18 changes: 15 additions & 3 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ func healthHandler() http.Handler {
func Handler(reg metrics.Registry) http.Handler {
// we disable compression because geth doesn't support it
opts := promhttp.HandlerOpts{DisableCompression: true}
// we are combining handlers to avoid having 2 endpoints
statusMetrics := promhttp.HandlerFor(prom.DefaultGatherer, opts) // our metrics
gethMetrics := gethprom.Handler(reg) // geth metrics
// we are using only our own metrics
statusMetrics := promhttp.HandlerFor(prom.DefaultGatherer, opts)
if reg == nil {
return statusMetrics
}
// if registry is provided, combine handlers
gethMetrics := gethprom.Handler(reg)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
statusMetrics.ServeHTTP(w, r)
gethMetrics.ServeHTTP(w, r)
Expand All @@ -62,3 +66,11 @@ func (p *Server) Listen() {
defer common.LogOnPanic()
logutils.ZapLogger().Info("metrics server stopped", zap.Error(p.server.ListenAndServe()))
}

// Stop gracefully shuts down the metrics server
func (p *Server) Stop() error {
if p.server != nil {
return p.server.Close()
}
return nil
}
182 changes: 182 additions & 0 deletions metrics/wakumetrics/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package wakumetrics

import (
"fmt"
"strconv"

"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/wakuv2"

v1protocol "github.com/status-im/status-go/protocol/v1"
v2common "github.com/status-im/status-go/wakuv2/common"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
)

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Messages []*v1protocol.StatusMessage
}

type Client struct {
peerId string
deviceType string
version string
lastPeerConnFailures map[string]int
}

type TelemetryClientOption func(*Client)

func WithPeerID(peerId string) TelemetryClientOption {
return func(c *Client) {
c.peerId = peerId
}
}

func WithDeviceType(deviceType string) TelemetryClientOption {
return func(c *Client) {
c.deviceType = deviceType
}
}

func WithVersion(version string) TelemetryClientOption {
return func(c *Client) {
c.version = version
}
}

func NewClient(opts ...TelemetryClientOption) (*Client, error) {
client := &Client{
lastPeerConnFailures: make(map[string]int),
}

for _, opt := range opts {
opt(client)
}

return client, nil
}

// RegisterWithRegistry registers all metrics with the provided registry
func (c *Client) RegisterWithRegistry() error {
if err := RegisterMetrics(); err != nil {
return fmt.Errorf("failed to register metrics: %v", err)
}
return nil
}

func (c *Client) SetDeviceType(deviceType string) {
c.deviceType = deviceType
}

func (c *Client) PushReceivedMessages(receivedMessages ReceivedMessages) {
MessagesReceivedTotal.WithLabelValues(
receivedMessages.Filter.PubsubTopic,
receivedMessages.Filter.ContentTopic.String(),
receivedMessages.Filter.ChatID,
).Add(float64(len(receivedMessages.Messages)))
}

func (c *Client) PushSentEnvelope(sentEnvelope wakuv2.SentEnvelope) {
EnvelopeSentTotal.WithLabelValues(
sentEnvelope.Envelope.PubsubTopic(),
sentEnvelope.Envelope.Message().ContentTopic,
sentEnvelope.PublishMethod.String(),
).Inc()
}

func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) {
EnvelopeSentErrors.WithLabelValues(
errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(),
errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic,
).Inc()
}

func (c *Client) PushPeerCount(peerCount int) {
ConnectedPeers.Set(float64(peerCount))
}

func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) {
for peerID, failures := range peerConnFailures {
if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists {
if failures == lastFailures {
continue
}
}
c.lastPeerConnFailures[peerID] = failures
PeerConnectionFailures.Add(float64(failures))
}
}

func (c *Client) PushMessageCheckSuccess() {
StoreQuerySuccesses.Inc()
}

func (c *Client) PushMessageCheckFailure() {
StoreQueryFailures.Inc()
}

func (c *Client) PushPeerCountByShard(peerCountByShard map[uint16]uint) {
for shard, count := range peerCountByShard {
PeersByShard.WithLabelValues(strconv.FormatUint(uint64(shard), 10)).Set(float64(count))
}
}

func (c *Client) PushPeerCountByOrigin(peerCountByOrigin map[wps.Origin]uint) {
for origin, count := range peerCountByOrigin {
PeersByOrigin.WithLabelValues(getOriginString(origin)).Set(float64(count))
}
}

func (c *Client) PushDialFailure(dialFailure v2common.DialError) {
PeerDialFailures.WithLabelValues(
dialFailure.ErrType.String(),
dialFailure.Protocols,
).Inc()
}

func (c *Client) PushMissedMessage(envelope *v2protocol.Envelope) {
MissedMessages.WithLabelValues(
envelope.PubsubTopic(),
envelope.Message().ContentTopic,
).Inc()
}

func (c *Client) PushMissedRelevantMessage(receivedMessage *v2common.ReceivedMessage) {
MissedMessages.WithLabelValues(
receivedMessage.PubsubTopic,
receivedMessage.ContentTopic.String(),
).Inc()
}

func (c *Client) PushMessageDeliveryConfirmed() {
MessageDeliveryConfirmations.Inc()
}

func (c *Client) PushSentMessageTotal(messageSize uint32, publishMethod string) {
WakuMessagesSizeBytes.WithLabelValues(publishMethod).Add(float64(messageSize))
MessagesSentTotal.WithLabelValues(publishMethod).Inc()
}

func getOriginString(origin wps.Origin) string {
switch origin {
case wps.Unknown:
return "unknown"
case wps.Discv5:
return "discv5"
case wps.Static:
return "static"
case wps.PeerExchange:
return "peer_exchange"
case wps.DNSDiscovery:
return "dns_discovery"
case wps.Rendezvous:
return "rendezvous"
case wps.PeerManager:
return "peer_manager"
default:
return "unknown"
}
}
Loading

0 comments on commit 07798db

Please sign in to comment.