Skip to content

Commit

Permalink
WIP: AMQP 0.9.1 publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Dec 17, 2024
1 parent 6e02254 commit dc1be16
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 10 deletions.
67 changes: 57 additions & 10 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,20 @@ import (

var cfg config.Config
var (
amqp_amqp = &cobra.Command{}
amqp_stomp = &cobra.Command{}
amqp_mqtt = &cobra.Command{}
stomp_stomp = &cobra.Command{}
stomp_amqp = &cobra.Command{}
stomp_mqtt = &cobra.Command{}
mqtt_mqtt = &cobra.Command{}
mqtt_amqp = &cobra.Command{}
mqtt_stomp = &cobra.Command{}
versionCmd = &cobra.Command{}
amqp_amqp = &cobra.Command{}
amqp_stomp = &cobra.Command{}
amqp_mqtt = &cobra.Command{}
stomp_stomp = &cobra.Command{}
stomp_amqp = &cobra.Command{}
stomp_mqtt = &cobra.Command{}
mqtt_mqtt = &cobra.Command{}
mqtt_amqp = &cobra.Command{}
mqtt_stomp = &cobra.Command{}
amqp091_amqp091 = &cobra.Command{}
amqp091_amqp = &cobra.Command{}
amqp091_mqtt = &cobra.Command{}
amqp091_stomp = &cobra.Command{}
versionCmd = &cobra.Command{}
)

var (
Expand Down Expand Up @@ -204,6 +208,43 @@ func RootCmd() *cobra.Command {
}
mqtt_stomp.Flags().AddFlagSet(mqttPublisherFlags)

amqp091_amqp091 = &cobra.Command{
Use: "amqp091-amqp091",
Aliases: []string{"amqp091"},
Run: func(cmd *cobra.Command, args []string) {
cfg.PublisherProto = config.AMQP091
cfg.ConsumerProto = config.AMQP091
start(cfg)
},
}

amqp091_amqp = &cobra.Command{
Use: "amqp091-amqp",
Run: func(cmd *cobra.Command, args []string) {
cfg.PublisherProto = config.AMQP091
cfg.ConsumerProto = config.AMQP
start(cfg)
},
}

amqp091_mqtt = &cobra.Command{
Use: "amqp091-mqtt",
Run: func(cmd *cobra.Command, args []string) {
cfg.PublisherProto = config.AMQP091
cfg.ConsumerProto = config.MQTT
start(cfg)
},
}

amqp091_stomp = &cobra.Command{
Use: "amqp091-stomp",
Run: func(cmd *cobra.Command, args []string) {
cfg.PublisherProto = config.AMQP091
cfg.ConsumerProto = config.STOMP
start(cfg)
},
}

versionCmd = &cobra.Command{
Use: "version",
Run: func(cmd *cobra.Command, args []string) {
Expand Down Expand Up @@ -318,6 +359,10 @@ func RootCmd() *cobra.Command {
rootCmd.AddCommand(mqtt_mqtt)
rootCmd.AddCommand(mqtt_amqp)
rootCmd.AddCommand(mqtt_stomp)
rootCmd.AddCommand(amqp091_amqp091)
rootCmd.AddCommand(amqp091_amqp)
rootCmd.AddCommand(amqp091_mqtt)
rootCmd.AddCommand(amqp091_stomp)
rootCmd.AddCommand(versionCmd)

return rootCmd
Expand Down Expand Up @@ -549,6 +594,8 @@ func defaultUri(proto string) string {
switch proto {
case "amqp":
uri = "amqp://localhost/"
case "amqp091":
uri = "amqp://localhost/"
case "stomp":
uri = "stomp://localhost:61613"
case "mqtt":
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ require (
github.com/muesli/termenv v0.15.2 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8b
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241121093408-1a6679a20123 h1:vwJUkrY81ekNGCh2xN+ii16ZSRfsmClJkJhHGimiocA=
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241121093408-1a6679a20123/go.mod h1:Km231GyOZAw9I3SZIqkfB9VVzCsu8jvFWYdghmnwueM=
github.com/relvacode/iso8601 v1.6.0 h1:eFXUhMJN3Gz8Rcq82f9DTMW0svjtAVuIEULglM7QHTU=
Expand Down
187 changes: 187 additions & 0 deletions pkg/amqp091/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// TODO
// context
// wait for confirms before closing the connection
// handle errors
package amqp091

import (
"context"
"math/rand"
"sync"
"sync/atomic"
"time"

amqp091 "github.com/rabbitmq/amqp091-go"
"github.com/rabbitmq/omq/pkg/config"
"github.com/rabbitmq/omq/pkg/log"
"github.com/rabbitmq/omq/pkg/metrics"
"github.com/rabbitmq/omq/pkg/utils"
)

type Amqp091Publisher struct {
Id int
Connection *amqp091.Connection
Channel *amqp091.Channel
confirms chan amqp091.Confirmation
publishTimes sync.Map
Terminus string
Config config.Config
msg []byte
whichUri int
ctx context.Context
}

func NewPublisher(ctx context.Context, cfg config.Config, id int) *Amqp091Publisher {
publisher := &Amqp091Publisher{
Id: id,
Connection: nil,
Config: cfg,
Terminus: utils.InjectId(cfg.PublishTo, id),
whichUri: 0,
ctx: ctx,
}

if cfg.SpreadConnections {
publisher.whichUri = (id - 1) % len(cfg.PublisherUri)
}

publisher.Connect()

return publisher
}

func (p *Amqp091Publisher) Connect() {
var conn *amqp091.Connection
var err error

if p.Connection != nil {
_ = p.Connection.Close()
}
p.Connection = nil

for p.Connection == nil {
if p.whichUri >= len(p.Config.PublisherUri) {
p.whichUri = 0
}
uri := p.Config.PublisherUri[p.whichUri]
p.whichUri++
// hostname, vhost := hostAndVHost(uri)
conn, err = amqp091.Dial(uri)
if err != nil {
log.Error("connection failed", "id", p.Id, "error", err.Error())
select {
case <-time.After(1 * time.Second):
continue
case <-p.ctx.Done():
return
}
} else {
log.Debug("publisher connected", "id", p.Id, "uri", uri)
p.Connection = conn
}
}
p.Channel, err = p.Connection.Channel() // TODO
if err != nil {
log.Error("channel creation failed", "id", p.Id, "error", err.Error())
}
p.confirms = make(chan amqp091.Confirmation) // p.Config.MaxInFlight ?
_ = p.Channel.Confirm(false)
p.Channel.NotifyPublish(p.confirms)
}

func (p *Amqp091Publisher) Start(ctx context.Context, publisherReady chan bool, startPublishing chan bool) {
p.msg = utils.MessageBody(p.Config.Size)

close(publisherReady)

select {
case <-ctx.Done():
return
case <-startPublishing:
// short random delay to avoid all publishers publishing at the same time
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}

log.Info("publisher started", "id", p.Id, "rate", utils.Rate(p.Config.Rate), "destination", p.Terminus)
var farewell string
if p.Config.Rate == 0 {
// idle connection
<-ctx.Done()
farewell = "context cancelled"
} else {
farewell = p.StartPublishing(ctx)
}
p.Stop(farewell)
}

func (p *Amqp091Publisher) StartPublishing(ctx context.Context) string {
limiter := utils.RateLimiter(p.Config.Rate)

go p.handleConfirms()

var msgSent atomic.Int64
for {
select {
case <-ctx.Done():
return "context cancelled"
default:
n := uint64(msgSent.Add(1))
if n > uint64(p.Config.PublishCount) {
return "--pmessages value reached"
}
if p.Config.Rate > 0 {
_ = limiter.Wait(ctx)
}
err := p.SendAsync(ctx, n)
if err != nil {
p.Connect()
} else {
log.Debug("message sent", "id", p.Id, "deliveryTag", n)
}
}
}
}

func (p *Amqp091Publisher) SendAsync(ctx context.Context, n uint64) error {
msg := p.prepareMessage()

p.publishTimes.Store(n, time.Now())
err := p.Channel.Publish("", p.Config.PublishTo, false, false, msg)
return err
}

func (p *Amqp091Publisher) handleConfirms() {
for confirm := range p.confirms {
if confirm.Ack {
pubTime, _ := p.publishTimes.LoadAndDelete(confirm.DeliveryTag)
latency := time.Since(pubTime.(time.Time))
metrics.PublishingLatency.Update(latency.Seconds())
metrics.MessagesPublished.Inc()
log.Debug("message confirmed", "id", p.Id, "delivery_tag", confirm.DeliveryTag, "latency", latency)
} else {
p.publishTimes.Delete(confirm.DeliveryTag)
log.Debug("message not confirmed by the broker", "id", p.Id)
}
}
}

func (p *Amqp091Publisher) Stop(reason string) {
// for p.publishTimes. > 0 {
// time.Sleep(100 * time.Millisecond)
// }
log.Debug("closing publisher connection", "id", p.Id, "reason", reason)
if p.Channel != nil {
_ = p.Channel.Close()
}
if p.Connection != nil {
_ = p.Connection.Close()
}
}

func (p *Amqp091Publisher) prepareMessage() amqp091.Publishing {
utils.UpdatePayload(p.Config.UseMillis, &p.msg)
return amqp091.Publishing{
DeliveryMode: amqp091.Persistent,
Body: p.msg,
}
}
7 changes: 7 additions & 0 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/rabbitmq/omq/pkg/amqp091"
"github.com/rabbitmq/omq/pkg/amqp10"
"github.com/rabbitmq/omq/pkg/config"
"github.com/rabbitmq/omq/pkg/mqtt"
Expand All @@ -26,6 +27,12 @@ func NewPublisher(ctx context.Context, cfg config.Config, id int) (Publisher, er
return nil, fmt.Errorf("failed to create an AMQP-1.0 publisher")
}
return p, nil
case config.AMQP091:
p := amqp091.NewPublisher(ctx, cfg, id)
if p == nil {
return nil, fmt.Errorf("failed to create an AMQP-0.9.1 publisher")
}
return p, nil
case config.STOMP:
p := stomp.NewPublisher(ctx, cfg, id)
if p == nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Protocol int

const (
AMQP Protocol = iota
AMQP091
STOMP
MQTT
MQTT5
Expand Down

0 comments on commit dc1be16

Please sign in to comment.