diff --git a/pkg/amqp10_client/consumer.go b/pkg/amqp10_client/consumer.go index 03c65ef..ac8c351 100644 --- a/pkg/amqp10_client/consumer.go +++ b/pkg/amqp10_client/consumer.go @@ -113,6 +113,9 @@ func (c *Amqp10Consumer) CreateReceiver(ctx context.Context) { for c.Receiver == nil { receiver, err := c.Session.NewReceiver(ctx, c.Topic, &amqp.ReceiverOptions{SourceDurability: durability, Credit: int32(c.Config.ConsumerCredits), Properties: buildLinkProperties(c.Config), Filters: buildLinkFilters(c.Config)}) if err != nil { + if err == context.Canceled { + return + } log.Error("consumer failed to create a receiver", "consumerId", c.Id, "error", err.Error()) time.Sleep(1 * time.Second) } else { @@ -142,7 +145,10 @@ func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) { default: msg, err := c.Receiver.Receive(ctx, nil) if err != nil { - log.Error("failed to receive a message", "consumerId", c.Id, "terminus", c.Topic) + if err == context.Canceled { + return + } + log.Error("failed to receive a message", "consumerId", c.Id, "terminus", c.Topic, "error", err.Error()) c.Connect(ctx) continue } diff --git a/pkg/stomp_client/consumer.go b/pkg/stomp_client/consumer.go index 8425391..93c3d2c 100644 --- a/pkg/stomp_client/consumer.go +++ b/pkg/stomp_client/consumer.go @@ -186,6 +186,5 @@ func buildSubscribeOpts(cfg config.Config) []func(*frame.Frame) error { subscribeOpts = append(subscribeOpts, stomp.SubscribeOpt.Header("x-stream-filter", cfg.StreamFilterValues)) } - log.Info("subscribe options", "filter", cfg.StreamFilterValues) return subscribeOpts }