Skip to content

Commit

Permalink
Support for --rate 0
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed May 16, 2024
1 parent cdc08d9 commit 05f1cd9
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 6 deletions.
17 changes: 15 additions & 2 deletions pkg/amqp10_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ func (p Amqp10Publisher) Start(ctx context.Context) {

p.msg = utils.MessageBody(p.Config.Size)

if p.Config.Rate == -1 {
switch p.Config.Rate {
case -1:
p.StartFullSpeed(ctx)
} else {
case 0:
p.StartIdle(ctx)
default:
p.StartRateLimited(ctx)
}

log.Debug("publisher completed", "protocol", "amqp-1.0", "publisherId", p.Id)
}

Expand All @@ -101,6 +105,15 @@ func (p Amqp10Publisher) StartFullSpeed(ctx context.Context) {
}
}

func (p Amqp10Publisher) StartIdle(ctx context.Context) {
log.Info("publisher started", "protocol", "AMQP-1.0", "publisherId", p.Id, "rate", "-", "destination", p.Topic)

select {
case <-ctx.Done():
return
}
}

func (p Amqp10Publisher) StartRateLimited(ctx context.Context) {
log.Info("publisher started", "protocol", "AMQP-1.0", "publisherId", p.Id, "rate", p.Config.Rate, "destination", p.Topic)
ticker := time.NewTicker(time.Duration(1000/float64(p.Config.Rate)) * time.Millisecond)
Expand Down
16 changes: 14 additions & 2 deletions pkg/mqtt_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ func (p MqttPublisher) Start(ctx context.Context) {

p.msg = utils.MessageBody(p.Config.Size)

if p.Config.Rate == -1 {
switch p.Config.Rate {
case -1:
p.StartFullSpeed(ctx)
} else {
case 0:
p.StartIdle(ctx)
default:
p.StartRateLimited(ctx)
}
log.Debug("publisher stopped", "protocol", "MQTT", "publisherId", p.Id)
Expand All @@ -92,6 +95,15 @@ func (p MqttPublisher) StartFullSpeed(ctx context.Context) {
}
}

func (p MqttPublisher) StartIdle(ctx context.Context) {
log.Info("publisher started", "protocol", "MQTT", "publisherId", p.Id, "rate", "-", "destination", p.Topic)

select {
case <-ctx.Done():
return
}
}

func (p MqttPublisher) StartRateLimited(ctx context.Context) {
log.Info("publisher started", "protocol", "MQTT", "publisherId", p.Id, "rate", p.Config.Rate, "destination", p.Topic)
ticker := time.NewTicker(time.Duration(1000/float64(p.Config.Rate)) * time.Millisecond)
Expand Down
16 changes: 14 additions & 2 deletions pkg/stomp_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ func (p StompPublisher) Start(ctx context.Context) {

p.msg = utils.MessageBody(p.Config.Size)

if p.Config.Rate == -1 {
switch p.Config.Rate {
case -1:
p.StartFullSpeed(ctx)
} else {
case 0:
p.StartIdle(ctx)
default:
p.StartRateLimited(ctx)
}
}
Expand All @@ -76,6 +79,15 @@ func (p StompPublisher) StartFullSpeed(ctx context.Context) {
log.Debug("publisher completed", "protocol", "stomp", "publisherId", p.Id)
}

func (p StompPublisher) StartIdle(ctx context.Context) {
log.Info("publisher started", "protocol", "STOMP", "publisherId", p.Id, "rate", "-", "destination", p.Topic)

select {
case <-ctx.Done():
return
}
}

func (p StompPublisher) StartRateLimited(ctx context.Context) {
log.Info("publisher started", "protocol", "STOMP", "publisherId", p.Id, "rate", p.Config.Rate, "destination", p.Topic)
ticker := time.NewTicker(time.Duration(1000/float64(p.Config.Rate)) * time.Millisecond)
Expand Down

0 comments on commit 05f1cd9

Please sign in to comment.