Skip to content

Commit

Permalink
Fix the deadlock in the dispatcher #1369 (#1370)
Browse files Browse the repository at this point in the history
* Fix the deadlock in the dispatcher

It occurs when the rabbitmq server is down while the dispatcher is
waiting for the response from the subscriber.

* Add comments
  • Loading branch information
ckyoog authored Apr 9, 2024
1 parent b59b8fd commit 5e97d06
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (a *Adapter) PollForMessages(stopCh <-chan struct{}) error {
false,
amqp.Table{},
); err == nil {
connNotifyChannel, chNotifyChannel := a.rmqHelper.GetConnection().NotifyClose(make(chan *amqp.Error)), channel.NotifyClose(make(chan *amqp.Error))
connNotifyChannel, chNotifyChannel := a.rmqHelper.GetConnection().NotifyClose(make(chan *amqp.Error, 1)), channel.NotifyClose(make(chan *amqp.Error, 1))
if msgs, err = a.ConsumeMessages(&queue, channel, logger); err == nil {
loop:
for {
Expand Down
18 changes: 13 additions & 5 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,27 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC
}()
}
// get connections notify channel to watch for any unexpected disconnection
connNotifyChannel, chNotifyChannel := conn.NotifyClose(make(chan *amqp.Error)), channel.NotifyClose(make(chan *amqp.Error))
connNotifyChannel, chNotifyChannel := conn.NotifyClose(make(chan *amqp.Error, 1)), channel.NotifyClose(make(chan *amqp.Error, 1))
for {
select {
case <-ctx.Done():
logging.FromContext(ctx).Info("context done, stopping message consumers")
finishConsuming(wg, workerQueue)
return ctx.Err()
case <-connNotifyChannel:
case err = <-connNotifyChannel:
finishConsuming(wg, workerQueue)
return amqp.ErrClosed
case <-chNotifyChannel:
// No error will be available in case of a graceful connection close
if err == nil {
err = amqp.ErrClosed
}
return err
case err = <-chNotifyChannel:
finishConsuming(wg, workerQueue)
return amqp.ErrClosed
// No error will be available in case of a graceful connection close
if err == nil {
err = amqp.ErrClosed
}
return err
case msg, ok := <-msgs:
if !ok {
finishConsuming(wg, workerQueue)
Expand Down
4 changes: 2 additions & 2 deletions pkg/rabbit/connections_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ func (r *RabbitMQConnectionHandler) watchRabbitMQConnections(
r.logger.Info("stopped watching for rabbitmq connections")
r.closeRabbitMQConnections()
return
case <-r.GetConnection().NotifyClose(make(chan *amqp091.Error)):
case <-r.GetChannel().NotifyClose(make(chan *amqp091.Error)):
case <-r.GetConnection().NotifyClose(make(chan *amqp091.Error, 1)):
case <-r.GetChannel().NotifyClose(make(chan *amqp091.Error, 1)):
}
r.closeRabbitMQConnections()
if err := r.createConnectionAndChannel(ctx, rabbitMQURL, configFunction, dialFunc); err != nil {
Expand Down

0 comments on commit 5e97d06

Please sign in to comment.