Skip to content

Commit

Permalink
Add --log-out-of-order-messages flag
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Aug 7, 2024
1 parent 072344b commit ae745d9
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ messages published with perf-test can be consumed by `omq` or vice versa, and th
--consumer-uri string URI for consuming
-y, --consumers int The number of consumers to start (default 1)
-h, --help help for omq
--log-out-of-order-messages Print a log line when a message is received that is older than the previously received message
-d, --message-durability Mark messages as durable (default true)
--message-priority string Message priority (0-255, default=unset)
--metric-tags strings Prometheus label-value pairs, eg. l1=v1,l2=v2
Expand Down
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, "AMQP-1.0 consumer credits / STOMP prefetch count")
rootCmd.PersistentFlags().DurationVarP(&cfg.ConsumerLatency, "consumer-latency", "L", 0*time.Second, "consumer latency (time to accept message)")
rootCmd.PersistentFlags().StringSliceVar(&metricTags, "metric-tags", []string{}, "Prometheus label-value pairs, eg. l1=v1,l2=v2")
rootCmd.PersistentFlags().
BoolVar(&cfg.LogOutOfOrder, "log-out-of-order-messages", false, "Print a log line when a message is received that is older than the previously received message")

rootCmd.AddCommand(amqp_amqp)
rootCmd.AddCommand(amqp_stomp)
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
timeSent, latency := utils.CalculateEndToEndLatency(&payload)
m.With(prometheus.Labels{"protocol": "amqp-1.0"}).Observe(latency.Seconds())

if timeSent.Before(previousMessageTimeSent) {
if c.Config.LogOutOfOrder && timeSent.Before(previousMessageTimeSent) {
metrics.MessagesConsumedOutOfOrder.With(prometheus.Labels{"protocol": "amqp-1.0", "priority": priority}).Inc()
log.Info("Out of order message received. This message was sent before the previous message", "this messsage", timeSent, "previous message", previousMessageTimeSent)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Config struct {
MqttPublisher MqttOptions
MqttConsumer MqttOptions
MetricTags map[string]string
LogOutOfOrder bool
}

func NewConfig() Config {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mqtt_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (c MqttConsumer) Start(ctx context.Context, subscribed chan bool) {
timeSent, latency := utils.CalculateEndToEndLatency(&payload)
m.Observe(latency.Seconds())

if timeSent.Before(previousMessageTimeSent) {
if c.Config.LogOutOfOrder && timeSent.Before(previousMessageTimeSent) {
metrics.MessagesConsumedOutOfOrder.With(prometheus.Labels{"protocol": "mqtt"}).Inc()
log.Info("Out of order message received. This message was sent before the previous message", "this messsage", timeSent, "previous message", previousMessageTimeSent)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stomp_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c StompConsumer) Start(ctx context.Context, subscribed chan bool) {

priority := msg.Header.Get("priority")

if timeSent.Before(previousMessageTimeSent) {
if c.Config.LogOutOfOrder && timeSent.Before(previousMessageTimeSent) {
metrics.MessagesConsumedOutOfOrder.With(prometheus.Labels{"protocol": "amqp-1.0", "priority": priority}).Inc()
log.Info("Out of order message received. This message was sent before the previous message", "this messsage", timeSent, "previous message", previousMessageTimeSent)
}
Expand Down

0 comments on commit ae745d9

Please sign in to comment.