diff --git a/_examples/consumer/consumer.go b/_examples/consumer/consumer.go index 63a7b4f..fcf81c4 100644 --- a/_examples/consumer/consumer.go +++ b/_examples/consumer/consumer.go @@ -96,7 +96,7 @@ func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) ( } go func() { - Log.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error))) + Log.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error, 1))) }() Log.Printf("got Connection, getting Channel") diff --git a/channel.go b/channel.go index 96ebd0b..bdd30eb 100644 --- a/channel.go +++ b/channel.go @@ -504,8 +504,14 @@ graceful close, no error will be sent. In case of a non graceful close the error will be notified synchronously by the library so that it will be necessary to consume the Channel from the caller in order to avoid deadlocks + +The chan provided must be a buffered channel of size 1. */ func (ch *Channel) NotifyClose(c chan *Error) chan *Error { + if cap(c) != 1 { + panic("channel.NotifyClose expectes cap=1 buffered channel") + } + ch.notifyM.Lock() defer ch.notifyM.Unlock() diff --git a/client_test.go b/client_test.go index 7d65881..988aec4 100644 --- a/client_test.go +++ b/client_test.go @@ -632,7 +632,41 @@ func TestNotifyClosesReusedPublisherConfirmChan(t *testing.T) { } } -func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) { +func TestConnectionNotifyCloseAcceptsOnlyBufferedChannels(t *testing.T) { + rwc, srv := newSession(t) + t.Cleanup(func() { rwc.Close() }) + + go func() { + srv.connectionOpen() + + srv.recv(0, &connectionClose{}) + srv.send(0, &connectionCloseOk{}) + }() + + c, err := Open(rwc, defaultConfig()) + if err != nil { + t.Fatalf("could not create connection: %v (%s)", c, err) + } + + + if err := c.Close(); err != nil { + t.Fatalf("could not close connection: %v (%s)", c, err) + } + + defer func() { + _ = recover() + }() + + select { + case <-c.NotifyClose(make(chan *Error)): + case <-time.After(time.Millisecond): + t.Errorf("expected to close NotifyClose chan after Connection.Close") + } + + t.Errorf("connection.NotifyClose shouldn't accept unbuffered channels") +} + +func TestChannelNotifyClosesAllChansAfterConnectionClose(t *testing.T) { rwc, srv := newSession(t) go func() { @@ -657,8 +691,10 @@ func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) { t.Fatalf("could not close connection: %v (%s)", c, err) } + defer func() { _ = recover() }() + select { - case <-c.NotifyClose(make(chan *Error)): + case <-c.NotifyClose(make(chan *Error, 1)): case <-time.After(time.Millisecond): t.Errorf("expected to close NotifyClose chan after Connection.Close") } @@ -669,6 +705,46 @@ func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) { t.Errorf("expected to close Connection.NotifyClose chan after Connection.Close") } + t.Errorf("connection.NotifyClose shouldn't accept unbuffered channels") +} + +func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) { + rwc, srv := newSession(t) + + go func() { + srv.connectionOpen() + srv.channelOpen(1) + + srv.recv(0, &connectionClose{}) + srv.send(0, &connectionCloseOk{}) + }() + + c, err := Open(rwc, defaultConfig()) + if err != nil { + t.Fatalf("could not create connection: %v (%s)", c, err) + } + + ch, err := c.Channel() + if err != nil { + t.Fatalf("could not open channel: %v (%s)", ch, err) + } + + if err := c.Close(); err != nil { + t.Fatalf("could not close connection: %v (%s)", c, err) + } + + select { + case <-c.NotifyClose(make(chan *Error, 1)): + case <-time.After(time.Millisecond): + t.Errorf("expected to close NotifyClose chan after Connection.Close") + } + + select { + case <-ch.NotifyClose(make(chan *Error, 1)): + case <-time.After(time.Millisecond): + t.Errorf("expected to close Connection.NotifyClose chan after Connection.Close") + } + select { case <-ch.NotifyFlow(make(chan bool)): case <-time.After(time.Millisecond): diff --git a/connection.go b/connection.go index f072bd8..2a518e3 100644 --- a/connection.go +++ b/connection.go @@ -368,8 +368,14 @@ so that it will be necessary to consume the Channel from the caller in order to To reconnect after a transport or protocol error, register a listener here and re-run your setup process. + +The chan provided must be a buffered channel of size 1. */ func (c *Connection) NotifyClose(receiver chan *Error) chan *Error { + if cap(receiver) != 1 { + panic("channel.NotifyClose expectes cap=1 buffered channel") + } + c.m.Lock() defer c.m.Unlock() diff --git a/doc.go b/doc.go index 8cb0b64..21cfd2e 100644 --- a/doc.go +++ b/doc.go @@ -110,40 +110,11 @@ In order to be notified when a connection or channel gets closed, both structures offer the possibility to register channels using [Channel.NotifyClose] and [Connection.NotifyClose] functions: - notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error)) + notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1)) No errors will be sent in case of a graceful connection close. In case of a non-graceful closure due to e.g. network issue, or forced connection closure -from the Management UI, the error will be notified synchronously by the library. - -The error is sent synchronously to the channel, so that the flow will wait until -the receiver consumes from the channel. To avoid deadlocks in the library, it is -necessary to consume from the channels. This could be done inside a -different goroutine with a select listening on the two channels inside a for -loop like: - - go func() { - for notifyConnClose != nil || notifyChanClose != nil { - select { - case err, ok := <-notifyConnClose: - if !ok { - notifyConnClose = nil - } else { - fmt.Printf("connection closed, error %s", err) - } - case err, ok := <-notifyChanClose: - if !ok { - notifyChanClose = nil - } else { - fmt.Printf("channel closed, error %s", err) - } - } - } - }() - -Another approach is to use buffered channels: - - notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1)) +from the Management UI, the error will be notified by the library. The library sends to notification channels just once. After sending a notification to all channels, the library closes all registered notification channels. After diff --git a/integration_test.go b/integration_test.go index b600d59..095767e 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1644,7 +1644,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) { t.Cleanup(func() { conn.Close() }) go func() { - for err := range conn.NotifyClose(make(chan *Error)) { + for err := range conn.NotifyClose(make(chan *Error, 1)) { t.Log(err.Error()) } }() @@ -1655,7 +1655,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) { } go func() { - for err := range c1.NotifyClose(make(chan *Error)) { + for err := range c1.NotifyClose(make(chan *Error, 1)) { t.Log("Channel1 Close: " + err.Error()) } }() @@ -1666,7 +1666,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) { } go func() { - for err := range c2.NotifyClose(make(chan *Error)) { + for err := range c2.NotifyClose(make(chan *Error, 1)) { t.Log("Channel2 Close: " + err.Error()) } }()