diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 9e164c7..34ee892 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -3,8 +3,11 @@ name: CI
on: [pull_request, workflow_dispatch]
jobs:
- build:
+ phpstan:
+ name: CI PHPStan
runs-on: ubuntu-latest
+ strategy:
+ fail-fast: false
steps:
- name: Checkout
@@ -14,7 +17,7 @@ jobs:
uses: shivammathur/setup-php@v2
with:
php-version: 7.2
- extensions: mbstring
+ extensions: mbstring, sockets
- name: Get composer cache directory
id: composercache
@@ -28,14 +31,72 @@ jobs:
restore-keys: ${{ runner.os }}-composer-
- name: Install dependencies
- run: composer install --no-progress --prefer-dist --optimize-autoloader
-
+ run: |
+ composer install --no-progress --prefer-dist --optimize-autoloader
- name: Run phpstan
run: composer phpstan
- - name: Run code-sniffer
+ code-sniffer:
+ name: CI Code sniffer
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v2
+
+ - name: Setup PHP
+ uses: shivammathur/setup-php@v2
+ with:
+ php-version: 7.2
+ extensions: mbstring, sockets
+
+ - name: Get composer cache directory
+ id: composercache
+ run: echo "::set-output name=dir::$(composer config cache-files-dir)"
+
+ - name: Cache composer dependencies
+ uses: actions/cache@v2
+ with:
+ path: ${{ steps.composercache.outputs.dir }}
+ key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.lock') }}
+ restore-keys: ${{ runner.os }}-composer-
+
+ - name: Install dependencies
+ run: |
+ composer install --no-progress --prefer-dist --optimize-autoloader
+ - name: Run phpstan
run: composer phpcs
+ phpunit:
+ name: CI Tests
+ runs-on: ubuntu-latest
+
+ strategy:
+ matrix:
+ php-version: [ '7.2', '7.4' ]
+
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v2
+
+ - name: Setup PHP
+ uses: shivammathur/setup-php@v2
+ with:
+ php-version: 7.2
+ extensions: mbstring, sockets
+
+ - name: Get composer cache directory
+ id: composercache
+ run: echo "::set-output name=dir::$(composer config cache-files-dir)"
+
+ - name: Cache composer dependencies
+ uses: actions/cache@v2
+ with:
+ path: ${{ steps.composercache.outputs.dir }}
+ key: ${{ runner.os }}-composer-${{ hashFiles('**/composer.lock') }}
+ restore-keys: ${{ runner.os }}-composer-
+
+ - name: Install dependencies
+ run: |
+ composer install --no-progress --prefer-dist --optimize-autoloader
- name: Run phpunit
run: composer phpunit
-
diff --git a/composer.json b/composer.json
index f6a7dcd..c067b6f 100644
--- a/composer.json
+++ b/composer.json
@@ -23,12 +23,12 @@
"brandembassy/mockery-tools": "^2.0",
"mockery/mockery": "^1.2",
"phpunit/phpunit": "^8.5",
- "roave/security-advisories": "dev-master"
+ "roave/security-advisories": "dev-latest"
},
"scripts": {
"phpcs": "./vendor/bin/phpcs --standard=BrandEmbassyCodingStandard src tests",
"phpcbf": "./vendor/bin/phpcbf --standard=BrandEmbassyCodingStandard src tests",
- "phpstan": "./vendor/bin/phpstan analyze -c phpstan.neon src tests",
+ "phpstan": "./vendor/bin/phpstan analyze -c phpstan.neon src tests --memory-limit=-1",
"phpunit": "./vendor/bin/phpunit tests --no-coverage",
"phpunit-cc": "./vendor/bin/phpunit tests"
}
diff --git a/phpunit.xml b/phpunit.xml
index 4ad54dc..ca0cbe9 100644
--- a/phpunit.xml
+++ b/phpunit.xml
@@ -25,4 +25,8 @@
+
+
+
+
diff --git a/src/Queue/RabbitMQ/RabbitMQQueueManager.php b/src/Queue/RabbitMQ/RabbitMQQueueManager.php
index 3567ed4..6d8af2f 100644
--- a/src/Queue/RabbitMQ/RabbitMQQueueManager.php
+++ b/src/Queue/RabbitMQ/RabbitMQQueueManager.php
@@ -55,6 +55,13 @@ public function __construct(ConnectionFactoryInterface $connectionFactory, Logge
}
+ private function setUpChannel(int $prefetchCount, string $queueName, bool $noAck, callable $consumer): void
+ {
+ $this->getChannel()->basic_qos(0, $prefetchCount, false);
+ $this->getChannel()->basic_consume($queueName, '', false, $noAck, false, false, $consumer);
+ }
+
+
/**
* @param mixed[] $parameters
*/
@@ -65,14 +72,19 @@ public function consumeMessages(callable $consumer, string $queueName, array $pa
$this->declareQueueIfNotDeclared($queueName);
- $this->getChannel()->basic_qos(0, $prefetchCount, false);
- $this->getChannel()->basic_consume($queueName, '', false, $noAck, false, false, $consumer);
+ $this->setUpChannel($prefetchCount, $queueName, $noAck, $consumer);
while (count($this->getChannel()->callbacks) > 0) {
try {
$this->getChannel()->wait();
} catch (AMQPRuntimeException $exception) {
+ $this->logger->warning(
+ 'AMQPChannel disconnected: ' . $exception->getMessage(),
+ ['exception' => $exception]
+ );
+
$this->reconnect();
+ $this->setUpChannel($prefetchCount, $queueName, $noAck, $consumer);
}
}
diff --git a/tests/Jobs/JobDefinitions/JobDefinitionsContainerTest.php b/tests/Jobs/JobDefinitions/JobDefinitionsContainerTest.php
index c34ae59..2cf2c2f 100644
--- a/tests/Jobs/JobDefinitions/JobDefinitionsContainerTest.php
+++ b/tests/Jobs/JobDefinitions/JobDefinitionsContainerTest.php
@@ -41,9 +41,9 @@ public function testGetJobDefinition(): void
$simpleJobDefinition = $jobDefinitionContainer->get(self::SIMPLE_JOB_NAME);
- self::assertTrue($jobDefinitionContainer->has(self::SIMPLE_JOB_NAME));
- self::assertFalse($jobDefinitionContainer->has('unknownJobName'));
- self::assertNull($simpleJobDefinition->getMaxAttempts());
+ Assert::assertTrue($jobDefinitionContainer->has(self::SIMPLE_JOB_NAME));
+ Assert::assertFalse($jobDefinitionContainer->has('unknownJobName'));
+ Assert::assertNull($simpleJobDefinition->getMaxAttempts());
Assert::assertSame(ExampleJobDefinition::QUEUE_NAME, $simpleJobDefinition->getQueueName());
Assert::assertSame(ExampleJobDefinition::QUEUE_NAME, $simpleJobDefinition->getQueueName());
Assert::assertSame($exampleJobProcessor, $simpleJobDefinition->getJobProcessor());
diff --git a/tests/Jobs/JobTerminatorTest.php b/tests/Jobs/JobTerminatorTest.php
index fe2a9bb..1b6dad8 100644
--- a/tests/Jobs/JobTerminatorTest.php
+++ b/tests/Jobs/JobTerminatorTest.php
@@ -7,6 +7,7 @@
use Mockery;
use Mockery\Adapter\Phpunit\MockeryPHPUnitIntegration;
use Mockery\MockInterface;
+use PHPUnit\Framework\Assert;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
@@ -17,12 +18,12 @@ final class JobTerminatorTest extends TestCase
private const MINIMUM_ATTEMPTS = 20;
/**
- * @var JobUuidBlacklistInterface|MockInterface
+ * @var JobUuidBlacklistInterface&MockInterface
*/
private $jobUuidBlacklistMock;
/**
- * @var MockInterface|LoggerInterface
+ * @var MockInterface&LoggerInterface
*/
private $loggerMock;
@@ -39,9 +40,9 @@ public function testShouldNotBeTerminated(): void
{
$jobTerminator = $this->createJobTerminator();
- self::assertFalse($jobTerminator->shouldBeTerminated('some-uuid', 20));
- self::assertFalse($jobTerminator->shouldBeTerminated('some-uuid', 19));
- self::assertFalse($jobTerminator->shouldBeTerminated('some-uuid', 1));
+ Assert::assertFalse($jobTerminator->shouldBeTerminated('some-uuid', 20));
+ Assert::assertFalse($jobTerminator->shouldBeTerminated('some-uuid', 19));
+ Assert::assertFalse($jobTerminator->shouldBeTerminated('some-uuid', 1));
}
@@ -54,7 +55,7 @@ public function testShouldBeTerminated(): void
->once()
->andReturnTrue();
- self::assertTrue($jobTerminator->shouldBeTerminated('some-uuid', 21));
+ Assert::assertTrue($jobTerminator->shouldBeTerminated('some-uuid', 21));
}
diff --git a/tests/Queue/RabbitMQ/RabbitMQConsumerTest.php b/tests/Queue/RabbitMQ/RabbitMQConsumerTest.php
index c2094bf..37bdf4f 100644
--- a/tests/Queue/RabbitMQ/RabbitMQConsumerTest.php
+++ b/tests/Queue/RabbitMQ/RabbitMQConsumerTest.php
@@ -27,17 +27,17 @@ final class RabbitMQConsumerTest extends TestCase
private const AMQP_TAG = 'someAmqpTag';
/**
- * @var LoggerInterface|MockInterface
+ * @var LoggerInterface&MockInterface
*/
private $loggerMock;
/**
- * @var JobExecutorInterface|MockInterface
+ * @var JobExecutorInterface&MockInterface
*/
private $jobExecutorMock;
/**
- * @var PushDelayedResolver|MockInterface
+ * @var PushDelayedResolver&MockInterface
*/
private $pushDelayedResolverMock;
diff --git a/tests/Queue/RabbitMQ/RabbitMQQueueManagerTest.php b/tests/Queue/RabbitMQ/RabbitMQQueueManagerTest.php
index 5fac34e..29476da 100644
--- a/tests/Queue/RabbitMQ/RabbitMQQueueManagerTest.php
+++ b/tests/Queue/RabbitMQ/RabbitMQQueueManagerTest.php
@@ -224,19 +224,20 @@ public function testConsumeWithReconnect(): void
$amqpChannelMock->shouldReceive('basic_qos')
->with(0, 2, false)
- ->once();
+ ->twice();
$amqpChannelMock->shouldReceive('basic_consume')
->with(ExampleJobDefinition::QUEUE_NAME, '', false, true, false, false, $expectedCallback)
- ->once();
+ ->twice();
$callbackMock = static function (): void {
};
$amqpChannelMock->callbacks = [$callbackMock];
+ $brokenPipeException = new AMQPRuntimeException('Broken pipe');
$amqpChannelMock->shouldReceive('wait')
->once()
- ->andThrow(new AMQPRuntimeException('Broken pipe'));
+ ->andThrow($brokenPipeException);
$amqpChannelMock->shouldReceive('wait')
->once()
->andReturnUsing(
@@ -253,6 +254,10 @@ static function () use ($amqpChannelMock): void {
->withNoArgs()
->once();
+ $this->loggerMock->shouldReceive('warning')
+ ->with('AMQPChannel disconnected: Broken pipe', ['exception' => $brokenPipeException])
+ ->once();
+
$queueManager = $this->createQueueManager();
$queueManager->consumeMessages(
$expectedCallback,
diff --git a/tests/Queue/RabbitMQ/RabbitMQWorkerTest.php b/tests/Queue/RabbitMQ/RabbitMQWorkerTest.php
index 9a738a6..c71b599 100644
--- a/tests/Queue/RabbitMQ/RabbitMQWorkerTest.php
+++ b/tests/Queue/RabbitMQ/RabbitMQWorkerTest.php
@@ -16,12 +16,12 @@ final class RabbitMQWorkerTest extends TestCase
use MockeryPHPUnitIntegration;
/**
- * @var RabbitMQQueueManager|MockInterface
+ * @var RabbitMQQueueManager&MockInterface
*/
private $rabbitMQQueueManagerMock;
/**
- * @var RabbitMQConsumer|MockInterface
+ * @var RabbitMQConsumer&MockInterface
*/
private $rabbitMQConsumerMock;