From e79bf2579c93f7d3fe2b0597d3e95171095670fc Mon Sep 17 00:00:00 2001 From: Tetsuya Morimoto Date: Mon, 15 May 2023 13:34:30 +0900 Subject: [PATCH 1/3] Add Channel.ConsumeWithContext to be able to cancel delivering --- channel.go | 73 +++++++++++++++++++++++++++++++++++++++++++++ integration_test.go | 44 +++++++++++++++++++++++++++ 2 files changed, 117 insertions(+) diff --git a/channel.go b/channel.go index ae6f2d1..ca28f7d 100644 --- a/channel.go +++ b/channel.go @@ -1089,8 +1089,70 @@ be dropped. When the consumer tag is cancelled, all inflight messages will be delivered until the returned chan is closed. + +Deprecated: Use ConsumeWithContext instead. */ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) { + return ch.ConsumeWithContext(context.Background(), queue, consumer, autoAck, exclusive, noLocal, noWait, args) +} + +/* +ConsumeWithContext immediately starts delivering queued messages. + +Begin receiving on the returned chan Delivery before any other operation on the +Connection or Channel. + +Continues deliveries to the returned chan Delivery until Channel.Cancel, +Connection.Close, Channel.Close, or an AMQP exception occurs. Consumers must +range over the chan to ensure all deliveries are received. Unreceived +deliveries will block all methods on the same connection. + +All deliveries in AMQP must be acknowledged. It is expected of the consumer to +call Delivery.Ack after it has successfully processed the delivery. If the +consumer is cancelled or the channel or connection is closed any unacknowledged +deliveries will be requeued at the end of the same queue. + +The consumer is identified by a string that is unique and scoped for all +consumers on this channel. If you wish to eventually cancel the consumer, use +the same non-empty identifier in Channel.Cancel. An empty string will cause +the library to generate a unique identity. The consumer identity will be +included in every Delivery in the ConsumerTag field + +When autoAck (also known as noAck) is true, the server will acknowledge +deliveries to this consumer prior to writing the delivery to the network. When +autoAck is true, the consumer should not call Delivery.Ack. Automatically +acknowledging deliveries means that some deliveries may get lost if the +consumer is unable to process them after the server delivers them. +See http://www.rabbitmq.com/confirms.html for more details. + +When exclusive is true, the server will ensure that this is the sole consumer +from this queue. When exclusive is false, the server will fairly distribute +deliveries across multiple consumers. + +The noLocal flag is not supported by RabbitMQ. + +It's advisable to use separate connections for +Channel.Publish and Channel.Consume so not to have TCP pushback on publishing +affect the ability to consume messages, so this parameter is here mostly for +completeness. + +When noWait is true, do not wait for the server to confirm the request and +immediately begin deliveries. If it is not possible to consume, a channel +exception will be raised and the channel will be closed. + +Optional arguments can be provided that have specific semantics for the queue +or server. + +Inflight messages, limited by Channel.Qos will be buffered until received from +the returned chan. + +When the Channel or Connection is closed, all buffered and inflight messages will +be dropped. + +When the consumer tag is cancelled, all inflight messages will be delivered until +the returned chan is closed. +*/ +func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) { // When we return from ch.call, there may be a delivery already for the // consumer that hasn't been added to the consumer hash yet. Because of // this, we never rely on the server picking a consumer tag for us. @@ -1123,6 +1185,17 @@ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, return nil, err } + go func() { + select { + case <-ch.consumers.closed: + return + case <-ctx.Done(): + if ch != nil { + ch.Cancel(consumer, false) + } + } + }() + return deliveries, nil } diff --git a/integration_test.go b/integration_test.go index d9ec51f..0c17e7e 100644 --- a/integration_test.go +++ b/integration_test.go @@ -10,6 +10,7 @@ package amqp091 import ( "bytes" + "context" devrand "crypto/rand" "encoding/binary" "fmt" @@ -819,6 +820,49 @@ func TestIntegrationConsumeCancel(t *testing.T) { } } +func TestIntegrationConsumeCancelWithContext(t *testing.T) { + queue := "test.integration.consume-cancel-with-context" + + c := integrationConnection(t, "pub") + + if c != nil { + defer c.Close() + + ch, _ := c.Channel() + + if _, e := ch.QueueDeclare(queue, false, true, false, false, nil); e != nil { + t.Fatalf("error declaring queue %s: %v", queue, e) + } + + defer integrationQueueDelete(t, ch, queue) + + ctx, cancel := context.WithCancel(context.Background()) + messages, _ := ch.ConsumeWithContext(ctx, queue, "integration-tag-with-context", false, false, false, false, nil) + + if e := ch.Publish("", queue, false, false, Publishing{Body: []byte("1")}); e != nil { + t.Fatalf("error publishing: %v", e) + } + + assertConsumeBody(t, messages, []byte("1")) + + cancel() + <-time.After(100 * time.Millisecond) // wait to call cancel asynchronously + + if e := ch.Publish("", queue, false, false, Publishing{Body: []byte("2")}); e != nil { + t.Fatalf("error publishing: %v", e) + } + + select { + case <-time.After(100 * time.Millisecond): + t.Fatalf("Timeout on Close") + case _, ok := <-messages: + if ok { + t.Fatalf("Extra message on consumer when consumer should have been closed") + } + } + } +} + func (c *Connection) Generate(_ *rand.Rand, _ int) reflect.Value { urlStr := amqpURL From 1a875e1491157b1f77339794171c38406418538e Mon Sep 17 00:00:00 2001 From: Tetsuya Morimoto Date: Tue, 16 May 2023 09:41:33 +0900 Subject: [PATCH 2/3] Fix trivial link check --- channel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/channel.go b/channel.go index ca28f7d..7a628f2 100644 --- a/channel.go +++ b/channel.go @@ -1191,7 +1191,7 @@ func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer strin return case <-ctx.Done(): if ch != nil { - ch.Cancel(consumer, false) + _ = ch.Cancel(consumer, false) } } }() From d02f59062b43c5f826e6c7ed794f64e939889661 Mon Sep 17 00:00:00 2001 From: Tetsuya Morimoto Date: Mon, 12 Jun 2023 18:16:03 +0900 Subject: [PATCH 3/3] Update ConsumeWithContext() description and drop deprecating Consume() --- channel.go | 57 +++--------------------------------------------------- 1 file changed, 3 insertions(+), 54 deletions(-) diff --git a/channel.go b/channel.go index 7a628f2..a7ed8da 100644 --- a/channel.go +++ b/channel.go @@ -1089,8 +1089,6 @@ be dropped. When the consumer tag is cancelled, all inflight messages will be delivered until the returned chan is closed. - -Deprecated: Use ConsumeWithContext instead. */ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) { return ch.ConsumeWithContext(context.Background(), queue, consumer, autoAck, exclusive, noLocal, noWait, args) @@ -1099,58 +1097,9 @@ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, /* ConsumeWithContext immediately starts delivering queued messages. -Begin receiving on the returned chan Delivery before any other operation on the -Connection or Channel. - -Continues deliveries to the returned chan Delivery until Channel.Cancel, -Connection.Close, Channel.Close, or an AMQP exception occurs. Consumers must -range over the chan to ensure all deliveries are received. Unreceived -deliveries will block all methods on the same connection. - -All deliveries in AMQP must be acknowledged. It is expected of the consumer to -call Delivery.Ack after it has successfully processed the delivery. If the -consumer is cancelled or the channel or connection is closed any unacknowledged -deliveries will be requeued at the end of the same queue. - -The consumer is identified by a string that is unique and scoped for all -consumers on this channel. If you wish to eventually cancel the consumer, use -the same non-empty identifier in Channel.Cancel. An empty string will cause -the library to generate a unique identity. The consumer identity will be -included in every Delivery in the ConsumerTag field - -When autoAck (also known as noAck) is true, the server will acknowledge -deliveries to this consumer prior to writing the delivery to the network. When -autoAck is true, the consumer should not call Delivery.Ack. Automatically -acknowledging deliveries means that some deliveries may get lost if the -consumer is unable to process them after the server delivers them. -See http://www.rabbitmq.com/confirms.html for more details. - -When exclusive is true, the server will ensure that this is the sole consumer -from this queue. When exclusive is false, the server will fairly distribute -deliveries across multiple consumers. - -The noLocal flag is not supported by RabbitMQ. - -It's advisable to use separate connections for -Channel.Publish and Channel.Consume so not to have TCP pushback on publishing -affect the ability to consume messages, so this parameter is here mostly for -completeness. - -When noWait is true, do not wait for the server to confirm the request and -immediately begin deliveries. If it is not possible to consume, a channel -exception will be raised and the channel will be closed. - -Optional arguments can be provided that have specific semantics for the queue -or server. - -Inflight messages, limited by Channel.Qos will be buffered until received from -the returned chan. - -When the Channel or Connection is closed, all buffered and inflight messages will -be dropped. - -When the consumer tag is cancelled, all inflight messages will be delivered until -the returned chan is closed. +This is similar to Consume() function but has different semantics. +The caller can cancel via the given context, then call ch.Cancel() and stop +receiving messages. */ func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) { // When we return from ch.call, there may be a delivery already for the