Skip to content

Commit

Permalink
Refactor AMQP release/reject
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Sep 4, 2024
1 parent cfa0722 commit 6324774
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
time.Sleep(c.Config.ConsumerLatency)
}

outcome, err := c.outcome(ctx, msg, c.Config.Amqp.ReleaseRate, c.Config.Amqp.RejectRate)
outcome, err := c.outcome(ctx, msg)
if err != nil {
if err == context.Canceled {
return
Expand All @@ -187,16 +187,16 @@ func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
log.Debug("consumer finished", "id", c.Id)
}

func (c *Amqp10Consumer) outcome(ctx context.Context, msg *amqp.Message, releaseRate int, rejectRate int) (string, error) {
func (c *Amqp10Consumer) outcome(ctx context.Context, msg *amqp.Message) (string, error) {
// don't generate random numbers if not necessary
if releaseRate == 0 && rejectRate == 0 {
if c.Config.Amqp.ReleaseRate == 0 && c.Config.Amqp.RejectRate == 0 {
return "accept", c.Receiver.AcceptMessage(ctx, msg)
}

n := rand.Intn(100)
if n < releaseRate {
if n < c.Config.Amqp.ReleaseRate {
return "release", c.Receiver.ReleaseMessage(ctx, msg)
} else if n < releaseRate+rejectRate {
} else if n < c.Config.Amqp.ReleaseRate+c.Config.Amqp.RejectRate {
return "reject", c.Receiver.RejectMessage(ctx, msg, nil)
}
return "accept", c.Receiver.AcceptMessage(ctx, msg)
Expand Down

0 comments on commit 6324774

Please sign in to comment.