Skip to content

Commit

Permalink
Handle context cancellation gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Aug 19, 2024
1 parent 1be7aaa commit ac9e79c
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
8 changes: 7 additions & 1 deletion pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func (c *Amqp10Consumer) CreateReceiver(ctx context.Context) {
for c.Receiver == nil {
receiver, err := c.Session.NewReceiver(ctx, c.Topic, &amqp.ReceiverOptions{SourceDurability: durability, Credit: int32(c.Config.ConsumerCredits), Properties: buildLinkProperties(c.Config), Filters: buildLinkFilters(c.Config)})
if err != nil {
if err == context.Canceled {
return
}
log.Error("consumer failed to create a receiver", "consumerId", c.Id, "error", err.Error())
time.Sleep(1 * time.Second)
} else {
Expand Down Expand Up @@ -142,7 +145,10 @@ func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
default:
msg, err := c.Receiver.Receive(ctx, nil)
if err != nil {
log.Error("failed to receive a message", "consumerId", c.Id, "terminus", c.Topic)
if err == context.Canceled {
return
}
log.Error("failed to receive a message", "consumerId", c.Id, "terminus", c.Topic, "error", err.Error())
c.Connect(ctx)
continue
}
Expand Down
1 change: 0 additions & 1 deletion pkg/stomp_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,5 @@ func buildSubscribeOpts(cfg config.Config) []func(*frame.Frame) error {
subscribeOpts = append(subscribeOpts,
stomp.SubscribeOpt.Header("x-stream-filter", cfg.StreamFilterValues))
}
log.Info("subscribe options", "filter", cfg.StreamFilterValues)
return subscribeOpts
}

0 comments on commit ac9e79c

Please sign in to comment.