Skip to content

Commit

Permalink
add log for reconnection, max limit for MAX_RECONNECTS (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasJancar authored Sep 15, 2021
1 parent 706e176 commit 3c1aebe
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
17 changes: 13 additions & 4 deletions src/Queue/RabbitMQ/RabbitMQQueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Psr\Log\LoggerInterface;
use Throwable;
use function count;
use function sprintf;

Expand All @@ -20,7 +21,7 @@ class RabbitMQQueueManager implements QueueManagerInterface
public const PREFETCH_COUNT = 'prefetchCount';
public const NO_ACK = 'noAck';
private const QUEUES_EXCHANGE_SUFFIX = '.sync';
private const MAX_RECONNECTS = 3;
private const MAX_RECONNECTS = 15;

/**
* @var ConnectionFactoryInterface
Expand Down Expand Up @@ -92,7 +93,7 @@ public function consumeMessages(callable $consumer, string $queueName, array $pa
['exception' => $exception]
);

$this->reconnect();
$this->reconnect($exception, $queueName);
$this->setUpChannel($prefetchCount, $queueName, $noAck, $consumer);
}
}
Expand Down Expand Up @@ -210,7 +211,7 @@ private function publishMessage(string $message, string $queueName, array $prope
try {
$this->getChannel()->basic_publish($amqpMessage, $this->getQueueExchangeName($queueName));
} catch (AMQPRuntimeException $exception) {
$this->reconnect();
$this->reconnect($exception, $queueName);

$this->getChannel()->basic_publish($amqpMessage, $this->getQueueExchangeName($queueName));
}
Expand All @@ -227,7 +228,7 @@ public function closeConnection(): void
/**
* @throws ConnectionException
*/
private function reconnect(): void
private function reconnect(Throwable $exception, string $queueName): void
{
if ($this->reconnectCounter >= self::MAX_RECONNECTS) {
throw ConnectionException::createMaximumReconnectLimitReached(self::MAX_RECONNECTS);
Expand All @@ -236,6 +237,14 @@ private function reconnect(): void
$this->connection = $this->createConnection();
$this->channel = $this->createChannel();
$this->reconnectCounter++;

$this->logger->warning(
'Reconnecting: ' . $exception->getMessage(),
[
'queueName' => $queueName,
'exception' => $exception->getTraceAsString(),
]
);
}


Expand Down
27 changes: 21 additions & 6 deletions tests/Queue/RabbitMQ/RabbitMQQueueManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ static function (AMQPMessage $message) use ($exampleJob): bool {
)
->once();

$this->loggerMock->shouldReceive('warning')
->with(
'Reconnecting: Broken pipe',
Mockery::hasKey('queueName')
)
->once();

$queueManager = $this->createQueueManager();
$queueManager->push($exampleJob);
}
Expand Down Expand Up @@ -259,6 +266,10 @@ static function () use ($amqpChannelMock): void {
->with('AMQPChannel disconnected: Broken pipe', ['exception' => $brokenPipeException])
->once();

$this->loggerMock->shouldReceive('warning')
->with('Reconnecting: Broken pipe', Mockery::hasKey('queueName'))
->once();

$queueManager = $this->createQueueManager();
$queueManager->consumeMessages(
$expectedCallback,
Expand All @@ -273,7 +284,7 @@ static function () use ($amqpChannelMock): void {

public function testConsumeWithMaximumReconnectLimitReached(): void
{
$this->expectSetUpConnection(4, 4);
$this->expectSetUpConnection(16, 16);

$expectedCallback = static function (AMQPMessage $message): void {
};
Expand All @@ -282,29 +293,33 @@ public function testConsumeWithMaximumReconnectLimitReached(): void

$amqpChannelMock->shouldReceive('basic_qos')
->with(0, 2, false)
->times(4);
->times(16);

$amqpChannelMock->shouldReceive('basic_consume')
->with(ExampleJobDefinition::QUEUE_NAME, '', false, true, false, false, $expectedCallback)
->times(4);
->times(16);

$callbackMock = static function (): void {
};

$amqpChannelMock->callbacks = [$callbackMock];
$brokenPipeException = new AMQPRuntimeException('Broken pipe');
$amqpChannelMock->shouldReceive('wait')
->times(4)
->times(16)
->andThrow($brokenPipeException);

$this->loggerMock->shouldReceive('warning')
->with('AMQPChannel disconnected: Broken pipe', ['exception' => $brokenPipeException])
->times(4);
->times(16);

$this->loggerMock->shouldReceive('warning')
->with('Reconnecting: Broken pipe', Mockery::hasKey('queueName'))
->times(15);

$queueManager = $this->createQueueManager();

$this->expectException(ConnectionException::class);
$this->expectExceptionMessage('Maximum reconnects limit (3) reached');
$this->expectExceptionMessage('Maximum reconnects limit (15) reached');

$queueManager->consumeMessages(
$expectedCallback,
Expand Down

0 comments on commit 3c1aebe

Please sign in to comment.