Skip to content

Commit

Permalink
Add --uri as a shortcut for publisher&consumer URI
Browse files Browse the repository at this point in the history
Plus, better MQTT publisher handling
  • Loading branch information
mkuratczyk committed Sep 12, 2024
1 parent d5ea932 commit 943fc6c
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 7 deletions.
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,17 @@ An [OCI image](https://hub.docker.com/r/pivotalrabbitmq/omq/tags) is also availa

Both `--publisher-uri` and `--consumer-uri` can be repeated multiple times to set multiple
endpoints. If `omq` can't establish a connection or an existing connection is terminated,
it will try the next URI from the list.
it will try the next URI from the list. If the endpoints are the same for publishers and consumers,
you can use `--uri` instead (but can't mix `--uri` with `--publisher-uri` and `--consumer-uri`).

Example:
For example, here both publishers and consumers will connect to either of the 3 URIs:
```
omq mqtt --consumer-uri mqtt://localhost:1883 --publisher-uri mqtt://localhost:1883 \
--consumer-uri mqtt://localhost:1884 --publisher-uri mqtt://localhost:1884 \
--consumer-uri mqtt://localhost:1885 --publisher-uri mqtt://localhost:1885
omq mqtt --uri mqtt://localhost:1883 --uri mqtt://localhost:1884 --uri mqtt://localhost:1885
```
And in this case, all consumers will connect to port 1883, while publishers to 1884:

```
omq mqtt --consumer-uri mqtt://localhost:1883 --publisher-uri mqtt://localhost:1884
```

### Terminus/Topic/Queue/Routing Key
Expand Down
15 changes: 13 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func RootCmd() *cobra.Command {
}
rootCmd.PersistentFlags().
VarP(enumflag.New(&log.Level, "log-level", log.Levels, enumflag.EnumCaseInsensitive), "log-level", "l", "Log level (debug, info, error)")
rootCmd.PersistentFlags().StringSliceVarP(&cfg.Uri, "uri", "", nil, "URI for both publishers and consumers")
rootCmd.PersistentFlags().StringSliceVarP(&cfg.PublisherUri, "publisher-uri", "", nil, "URI for publishing")
rootCmd.PersistentFlags().StringSliceVarP(&cfg.ConsumerUri, "consumer-uri", "", nil, "URI for consuming")
rootCmd.PersistentFlags().IntVarP(&cfg.Publishers, "publishers", "x", 1, "The number of publishers to start")
Expand Down Expand Up @@ -339,13 +340,23 @@ func startConsumers(ctx context.Context, consumerProto common.Protocol, wg *sync
}

func setUris(cfg *config.Config, command string) {
// --uri is a shortcut to set both --publisher-uri and --consumer-uri
if cfg.Uri != nil {
if cfg.PublisherUri != nil || cfg.ConsumerUri != nil {
log.Error("ERROR: can't specify both --uri and --publisher-uri/--consumer-uri")
os.Exit(1)
}
cfg.PublisherUri = cfg.Uri
cfg.ConsumerUri = cfg.Uri
}

if cfg.PublisherUri == nil {
log.Debug("setting default publisher uri", "uri", defaultUri(strings.Split(command, "-")[0]))
(*cfg).PublisherUri = []string{defaultUri(strings.Split(command, "-")[0])}
cfg.PublisherUri = []string{defaultUri(strings.Split(command, "-")[0])}
}
if cfg.ConsumerUri == nil {
log.Debug("setting default consumer uri", "uri", defaultUri(strings.Split(command, "-")[1]))
(*cfg).ConsumerUri = []string{defaultUri(strings.Split(command, "-")[1])}
cfg.ConsumerUri = []string{defaultUri(strings.Split(command, "-")[1])}
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type MqttOptions struct {
}

type Config struct {
Uri []string
PublisherUri []string
ConsumerUri []string
Publishers int
Expand Down
3 changes: 3 additions & 0 deletions pkg/mqtt_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func NewPublisher(cfg config.Config, id int) *MqttPublisher {
connection := mqtt.NewClient(opts)
token = connection.Connect()
token.Wait()
if token.Error() != nil {
log.Error("publisher connection failed", "id", id, "error", token.Error())
}

topic := topic.CalculateTopic(cfg.PublishTo, id)
// AMQP-1.0 and STOMP allow /exchange/amq.topic/ prefix
Expand Down

0 comments on commit 943fc6c

Please sign in to comment.