diff --git a/src/Queue/RabbitMQ/RabbitMQQueueManager.php b/src/Queue/RabbitMQ/RabbitMQQueueManager.php index 2c5e68f..30affb3 100644 --- a/src/Queue/RabbitMQ/RabbitMQQueueManager.php +++ b/src/Queue/RabbitMQ/RabbitMQQueueManager.php @@ -12,6 +12,7 @@ use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; use Psr\Log\LoggerInterface; +use Throwable; use function count; use function sprintf; @@ -20,7 +21,7 @@ class RabbitMQQueueManager implements QueueManagerInterface public const PREFETCH_COUNT = 'prefetchCount'; public const NO_ACK = 'noAck'; private const QUEUES_EXCHANGE_SUFFIX = '.sync'; - private const MAX_RECONNECTS = 3; + private const MAX_RECONNECTS = 15; /** * @var ConnectionFactoryInterface @@ -92,7 +93,7 @@ public function consumeMessages(callable $consumer, string $queueName, array $pa ['exception' => $exception] ); - $this->reconnect(); + $this->reconnect($exception, $queueName); $this->setUpChannel($prefetchCount, $queueName, $noAck, $consumer); } } @@ -210,7 +211,7 @@ private function publishMessage(string $message, string $queueName, array $prope try { $this->getChannel()->basic_publish($amqpMessage, $this->getQueueExchangeName($queueName)); } catch (AMQPRuntimeException $exception) { - $this->reconnect(); + $this->reconnect($exception, $queueName); $this->getChannel()->basic_publish($amqpMessage, $this->getQueueExchangeName($queueName)); } @@ -227,7 +228,7 @@ public function closeConnection(): void /** * @throws ConnectionException */ - private function reconnect(): void + private function reconnect(Throwable $exception, string $queueName): void { if ($this->reconnectCounter >= self::MAX_RECONNECTS) { throw ConnectionException::createMaximumReconnectLimitReached(self::MAX_RECONNECTS); @@ -236,6 +237,14 @@ private function reconnect(): void $this->connection = $this->createConnection(); $this->channel = $this->createChannel(); $this->reconnectCounter++; + + $this->logger->warning( + 'Reconnecting: ' . $exception->getMessage(), + [ + 'queueName' => $queueName, + 'exception' => $exception->getTraceAsString(), + ] + ); } diff --git a/tests/Queue/RabbitMQ/RabbitMQQueueManagerTest.php b/tests/Queue/RabbitMQ/RabbitMQQueueManagerTest.php index 23758ef..24d30d9 100644 --- a/tests/Queue/RabbitMQ/RabbitMQQueueManagerTest.php +++ b/tests/Queue/RabbitMQ/RabbitMQQueueManagerTest.php @@ -174,6 +174,13 @@ static function (AMQPMessage $message) use ($exampleJob): bool { ) ->once(); + $this->loggerMock->shouldReceive('warning') + ->with( + 'Reconnecting: Broken pipe', + Mockery::hasKey('queueName') + ) + ->once(); + $queueManager = $this->createQueueManager(); $queueManager->push($exampleJob); } @@ -259,6 +266,10 @@ static function () use ($amqpChannelMock): void { ->with('AMQPChannel disconnected: Broken pipe', ['exception' => $brokenPipeException]) ->once(); + $this->loggerMock->shouldReceive('warning') + ->with('Reconnecting: Broken pipe', Mockery::hasKey('queueName')) + ->once(); + $queueManager = $this->createQueueManager(); $queueManager->consumeMessages( $expectedCallback, @@ -273,7 +284,7 @@ static function () use ($amqpChannelMock): void { public function testConsumeWithMaximumReconnectLimitReached(): void { - $this->expectSetUpConnection(4, 4); + $this->expectSetUpConnection(16, 16); $expectedCallback = static function (AMQPMessage $message): void { }; @@ -282,11 +293,11 @@ public function testConsumeWithMaximumReconnectLimitReached(): void $amqpChannelMock->shouldReceive('basic_qos') ->with(0, 2, false) - ->times(4); + ->times(16); $amqpChannelMock->shouldReceive('basic_consume') ->with(ExampleJobDefinition::QUEUE_NAME, '', false, true, false, false, $expectedCallback) - ->times(4); + ->times(16); $callbackMock = static function (): void { }; @@ -294,17 +305,21 @@ public function testConsumeWithMaximumReconnectLimitReached(): void $amqpChannelMock->callbacks = [$callbackMock]; $brokenPipeException = new AMQPRuntimeException('Broken pipe'); $amqpChannelMock->shouldReceive('wait') - ->times(4) + ->times(16) ->andThrow($brokenPipeException); $this->loggerMock->shouldReceive('warning') ->with('AMQPChannel disconnected: Broken pipe', ['exception' => $brokenPipeException]) - ->times(4); + ->times(16); + + $this->loggerMock->shouldReceive('warning') + ->with('Reconnecting: Broken pipe', Mockery::hasKey('queueName')) + ->times(15); $queueManager = $this->createQueueManager(); $this->expectException(ConnectionException::class); - $this->expectExceptionMessage('Maximum reconnects limit (3) reached'); + $this->expectExceptionMessage('Maximum reconnects limit (15) reached'); $queueManager->consumeMessages( $expectedCallback,