Skip to content

Commit

Permalink
Channel reconnect (#25)
Browse files Browse the repository at this point in the history
* set up channel after reconnect
  • Loading branch information
tomasJancar authored Apr 20, 2021
1 parent 6dcd10c commit c8e4cd7
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 27 deletions.
73 changes: 67 additions & 6 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ name: CI
on: [pull_request, workflow_dispatch]

jobs:
build:
phpstan:
name: CI PHPStan
runs-on: ubuntu-latest
strategy:
fail-fast: false

steps:
- name: Checkout
Expand All @@ -14,7 +17,7 @@ jobs:
uses: shivammathur/setup-php@v2
with:
php-version: 7.2
extensions: mbstring
extensions: mbstring, sockets

- name: Get composer cache directory
id: composercache
Expand All @@ -28,14 +31,72 @@ jobs:
restore-keys: ${{ runner.os }}-composer-

- name: Install dependencies
run: composer install --no-progress --prefer-dist --optimize-autoloader

run: |
composer install --no-progress --prefer-dist --optimize-autoloader
- name: Run phpstan
run: composer phpstan

- name: Run code-sniffer
code-sniffer:
name: CI Code sniffer
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2

- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: 7.2
extensions: mbstring, sockets

- name: Get composer cache directory
id: composercache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"

- name: Cache composer dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composercache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.lock') }}
restore-keys: ${{ runner.os }}-composer-

- name: Install dependencies
run: |
composer install --no-progress --prefer-dist --optimize-autoloader
- name: Run phpstan
run: composer phpcs

phpunit:
name: CI Tests
runs-on: ubuntu-latest

strategy:
matrix:
php-version: [ '7.2', '7.4' ]

steps:
- name: Checkout
uses: actions/checkout@v2

- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: 7.2
extensions: mbstring, sockets

- name: Get composer cache directory
id: composercache
run: echo "::set-output name=dir::$(composer config cache-files-dir)"

- name: Cache composer dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composercache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.lock') }}
restore-keys: ${{ runner.os }}-composer-

- name: Install dependencies
run: |
composer install --no-progress --prefer-dist --optimize-autoloader
- name: Run phpunit
run: composer phpunit

4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
"brandembassy/mockery-tools": "^2.0",
"mockery/mockery": "^1.2",
"phpunit/phpunit": "^8.5",
"roave/security-advisories": "dev-master"
"roave/security-advisories": "dev-latest"
},
"scripts": {
"phpcs": "./vendor/bin/phpcs --standard=BrandEmbassyCodingStandard src tests",
"phpcbf": "./vendor/bin/phpcbf --standard=BrandEmbassyCodingStandard src tests",
"phpstan": "./vendor/bin/phpstan analyze -c phpstan.neon src tests",
"phpstan": "./vendor/bin/phpstan analyze -c phpstan.neon src tests --memory-limit=-1",
"phpunit": "./vendor/bin/phpunit tests --no-coverage",
"phpunit-cc": "./vendor/bin/phpunit tests"
}
Expand Down
4 changes: 4 additions & 0 deletions phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@
<log type="coverage-html" target="./var/code-coverage" lowUpperBound="50" highLowerBound="80"/>
<log type="testdox-html" target="./var/code-coverage/index.html"/>
</logging>

<extensions>
<extension class="BrandEmbassy\MockeryTools\PhpUnit\BypassFinalHook"/>
</extensions>
</phpunit>
16 changes: 14 additions & 2 deletions src/Queue/RabbitMQ/RabbitMQQueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ public function __construct(ConnectionFactoryInterface $connectionFactory, Logge
}


private function setUpChannel(int $prefetchCount, string $queueName, bool $noAck, callable $consumer): void
{
$this->getChannel()->basic_qos(0, $prefetchCount, false);
$this->getChannel()->basic_consume($queueName, '', false, $noAck, false, false, $consumer);
}


/**
* @param mixed[] $parameters
*/
Expand All @@ -65,14 +72,19 @@ public function consumeMessages(callable $consumer, string $queueName, array $pa

$this->declareQueueIfNotDeclared($queueName);

$this->getChannel()->basic_qos(0, $prefetchCount, false);
$this->getChannel()->basic_consume($queueName, '', false, $noAck, false, false, $consumer);
$this->setUpChannel($prefetchCount, $queueName, $noAck, $consumer);

while (count($this->getChannel()->callbacks) > 0) {
try {
$this->getChannel()->wait();
} catch (AMQPRuntimeException $exception) {
$this->logger->warning(
'AMQPChannel disconnected: ' . $exception->getMessage(),
['exception' => $exception]
);

$this->reconnect();
$this->setUpChannel($prefetchCount, $queueName, $noAck, $consumer);
}
}

Expand Down
6 changes: 3 additions & 3 deletions tests/Jobs/JobDefinitions/JobDefinitionsContainerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public function testGetJobDefinition(): void

$simpleJobDefinition = $jobDefinitionContainer->get(self::SIMPLE_JOB_NAME);

self::assertTrue($jobDefinitionContainer->has(self::SIMPLE_JOB_NAME));
self::assertFalse($jobDefinitionContainer->has('unknownJobName'));
self::assertNull($simpleJobDefinition->getMaxAttempts());
Assert::assertTrue($jobDefinitionContainer->has(self::SIMPLE_JOB_NAME));
Assert::assertFalse($jobDefinitionContainer->has('unknownJobName'));
Assert::assertNull($simpleJobDefinition->getMaxAttempts());
Assert::assertSame(ExampleJobDefinition::QUEUE_NAME, $simpleJobDefinition->getQueueName());
Assert::assertSame(ExampleJobDefinition::QUEUE_NAME, $simpleJobDefinition->getQueueName());
Assert::assertSame($exampleJobProcessor, $simpleJobDefinition->getJobProcessor());
Expand Down
13 changes: 7 additions & 6 deletions tests/Jobs/JobTerminatorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Mockery;
use Mockery\Adapter\Phpunit\MockeryPHPUnitIntegration;
use Mockery\MockInterface;
use PHPUnit\Framework\Assert;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;

Expand All @@ -17,12 +18,12 @@ final class JobTerminatorTest extends TestCase
private const MINIMUM_ATTEMPTS = 20;

/**
* @var JobUuidBlacklistInterface|MockInterface
* @var JobUuidBlacklistInterface&MockInterface
*/
private $jobUuidBlacklistMock;

/**
* @var MockInterface|LoggerInterface
* @var MockInterface&LoggerInterface
*/
private $loggerMock;

Expand All @@ -39,9 +40,9 @@ public function testShouldNotBeTerminated(): void
{
$jobTerminator = $this->createJobTerminator();

self::assertFalse($jobTerminator->shouldBeTerminated('some-uuid', 20));
self::assertFalse($jobTerminator->shouldBeTerminated('some-uuid', 19));
self::assertFalse($jobTerminator->shouldBeTerminated('some-uuid', 1));
Assert::assertFalse($jobTerminator->shouldBeTerminated('some-uuid', 20));
Assert::assertFalse($jobTerminator->shouldBeTerminated('some-uuid', 19));
Assert::assertFalse($jobTerminator->shouldBeTerminated('some-uuid', 1));
}


Expand All @@ -54,7 +55,7 @@ public function testShouldBeTerminated(): void
->once()
->andReturnTrue();

self::assertTrue($jobTerminator->shouldBeTerminated('some-uuid', 21));
Assert::assertTrue($jobTerminator->shouldBeTerminated('some-uuid', 21));
}


Expand Down
6 changes: 3 additions & 3 deletions tests/Queue/RabbitMQ/RabbitMQConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ final class RabbitMQConsumerTest extends TestCase
private const AMQP_TAG = 'someAmqpTag';

/**
* @var LoggerInterface|MockInterface
* @var LoggerInterface&MockInterface
*/
private $loggerMock;

/**
* @var JobExecutorInterface|MockInterface
* @var JobExecutorInterface&MockInterface
*/
private $jobExecutorMock;

/**
* @var PushDelayedResolver|MockInterface
* @var PushDelayedResolver&MockInterface
*/
private $pushDelayedResolverMock;

Expand Down
11 changes: 8 additions & 3 deletions tests/Queue/RabbitMQ/RabbitMQQueueManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,20 @@ public function testConsumeWithReconnect(): void

$amqpChannelMock->shouldReceive('basic_qos')
->with(0, 2, false)
->once();
->twice();

$amqpChannelMock->shouldReceive('basic_consume')
->with(ExampleJobDefinition::QUEUE_NAME, '', false, true, false, false, $expectedCallback)
->once();
->twice();

$callbackMock = static function (): void {
};

$amqpChannelMock->callbacks = [$callbackMock];
$brokenPipeException = new AMQPRuntimeException('Broken pipe');
$amqpChannelMock->shouldReceive('wait')
->once()
->andThrow(new AMQPRuntimeException('Broken pipe'));
->andThrow($brokenPipeException);
$amqpChannelMock->shouldReceive('wait')
->once()
->andReturnUsing(
Expand All @@ -253,6 +254,10 @@ static function () use ($amqpChannelMock): void {
->withNoArgs()
->once();

$this->loggerMock->shouldReceive('warning')
->with('AMQPChannel disconnected: Broken pipe', ['exception' => $brokenPipeException])
->once();

$queueManager = $this->createQueueManager();
$queueManager->consumeMessages(
$expectedCallback,
Expand Down
4 changes: 2 additions & 2 deletions tests/Queue/RabbitMQ/RabbitMQWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ final class RabbitMQWorkerTest extends TestCase
use MockeryPHPUnitIntegration;

/**
* @var RabbitMQQueueManager|MockInterface
* @var RabbitMQQueueManager&MockInterface
*/
private $rabbitMQQueueManagerMock;

/**
* @var RabbitMQConsumer|MockInterface
* @var RabbitMQConsumer&MockInterface
*/
private $rabbitMQConsumerMock;

Expand Down

0 comments on commit c8e4cd7

Please sign in to comment.