Skip to content

Commit

Permalink
Add --message-priority for STOMP and MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Dec 14, 2023
1 parent 4be3e5e commit 333a49f
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 2 deletions.
10 changes: 10 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -150,6 +151,14 @@ func RootCmd() *cobra.Command {
if cfg.Publishers == 0 || cfg.Consumers == 0 {
cfg.UseMillis = true
}

if cfg.MessagePriority != "" {
_, err := strconv.ParseUint(cfg.MessagePriority, 10, 8)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ERROR: invalid message priority: %s\n", cfg.MessagePriority)
os.Exit(1)
}
}
},
}
rootCmd.PersistentFlags().StringVarP(&cfg.PublisherUri, "publisher-uri", "", "", "URI for publishing")
Expand All @@ -174,6 +183,7 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().StringVar(&cfg.Amqp.Subject, "amqp-subject", "", "AMQP 1.0 message subject")
rootCmd.PersistentFlags().
BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable (default=true)")
rootCmd.PersistentFlags().StringVar(&cfg.MessagePriority, "message-priority", "", "Message priority (0-255, default=unset)")
rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "", "Stream consumer offset specification (default=next)")
rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValues, "stream-filter-values", "", "Stream consumer filter")
rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValueSet, "stream-filter-value-set", "", "Stream filter value for publisher")
Expand Down
11 changes: 9 additions & 2 deletions pkg/amqp10_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package amqp10_client
import (
"context"
"math/rand"
"strconv"
"time"

"github.com/rabbitmq/omq/pkg/config"
Expand Down Expand Up @@ -127,8 +128,14 @@ func (p Amqp10Publisher) Send() {
msg.Annotations = amqp.Annotations{"x-stream-filter-value": p.Config.StreamFilterValueSet}
}

msg.Header = &amqp.MessageHeader{
Durable: p.Config.MessageDurability}
msg.Header = &amqp.MessageHeader{}
msg.Header.Durable = p.Config.MessageDurability
if p.Config.MessagePriority != "" {
// already validated in root.go
priority, _ := strconv.ParseUint(p.Config.MessagePriority, 10, 8)
msg.Header.Priority = uint8(priority)
}

timer := prometheus.NewTimer(metrics.PublishingLatency.With(prometheus.Labels{"protocol": "amqp-1.0"}))
err := p.Sender.Send(context.TODO(), msg, nil)
timer.ObserveDuration()
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Config struct {
UseMillis bool
QueueDurability AmqpDurabilityMode
MessageDurability bool
MessagePriority string // to allow for "unset" value and STOMP takes strings anyway
StreamOffset string
StreamFilterValues string
StreamFilterValueSet string
Expand Down
3 changes: 3 additions & 0 deletions pkg/stomp_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func buildHeaders(cfg config.Config) []func(*frame.Frame) error {
msgDurability = "false"
}
headers = append(headers, stomp.SendOpt.Header("persistent", msgDurability))
if cfg.MessagePriority != "" {
headers = append(headers, stomp.SendOpt.Header("priority", cfg.MessagePriority))
}

if cfg.StreamFilterValueSet != "" {
headers = append(headers, stomp.SendOpt.Header("x-stream-filter-value", cfg.StreamFilterValueSet))
Expand Down

0 comments on commit 333a49f

Please sign in to comment.