diff --git a/composer.json b/composer.json index 967323a..172d09b 100644 --- a/composer.json +++ b/composer.json @@ -26,6 +26,7 @@ "nunomaduro/larastan": "^2.2", "orchestra/testbench": "^7.13 || ^9.0", "php-parallel-lint/php-parallel-lint": "^1.3", + "phpunit/phpunit": "^11.1", "squizlabs/php_codesniffer": "^3.7" }, "minimum-stability": "dev", diff --git a/src/Queue/SqsJob.php b/src/Queue/SqsJob.php index 6a740bd..6c5b166 100644 --- a/src/Queue/SqsJob.php +++ b/src/Queue/SqsJob.php @@ -3,6 +3,7 @@ namespace Bref\LaravelBridge\Queue; use Illuminate\Queue\Jobs\SqsJob as LaravelSqsJob; +use Illuminate\Support\Str; class SqsJob extends LaravelSqsJob { @@ -21,11 +22,19 @@ public function release($delay = 0) 'ReceiptHandle' => $this->job['ReceiptHandle'], ]); - $this->sqs->sendMessage([ + $sqsMessage = [ 'QueueUrl' => $this->queue, 'MessageBody' => json_encode($payload), - 'DelaySeconds' => $this->secondsUntil($delay), - ]); + 'DelaySeconds' => $this->secondsUntil($delay) + ]; + + if (Str::endsWith($this->queue, '.fifo')) { + $sqsMessage['MessageGroupId'] = $this->job['Attributes']['MessageGroupId']; + $sqsMessage['MessageDeduplicationId'] = $this->parseDeduplicationId($payload['attempts']); + unset($sqsMessage["DelaySeconds"]); + } + + $this->sqs->sendMessage($sqsMessage); } /** @@ -35,4 +44,15 @@ public function attempts() { return ($this->payload()['attempts'] ?? 0) + 1; } + + /** + * Create new MessageDeduplicationId + * appending attempt at the end so the message will not be ignored + * + * https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html#API_SendMessage_RequestSyntax + */ + private function parseDeduplicationId($attempts) + { + return $this->job['Attributes']['MessageDeduplicationId'] . '-' . $attempts; + } } diff --git a/tests/Queue/SqsJobTest.php b/tests/Queue/SqsJobTest.php new file mode 100644 index 0000000..fe5c48b --- /dev/null +++ b/tests/Queue/SqsJobTest.php @@ -0,0 +1,136 @@ +account = '1234567891011'; + $this->queueName = 'emails'; + $this->baseUrl = 'https://sqs.someregion.amazonaws.com'; + $this->releaseDelay = 0; + + // This is how the modified getQueue builds the queueUrl + $this->queueUrl = $this->baseUrl.'/'.$this->account.'/'.$this->queueName; + + // Get a mock of the SqsClient + $this->mockedSqsClient = m::mock(SqsClient::class); + + // Use Mockery to mock the IoC Container + $this->mockedContainer = m::mock(Container::class); + + $this->mockedJob = 'foo'; + $this->mockedData = ['data']; + $this->mockedPayload = json_encode(['job' => $this->mockedJob, 'data' => $this->mockedData, 'attempts' => 1]); + $this->mockedMessageId = 'e3cd03ee-59a3-4ad8-b0aa-ee2e3808ac81'; + $this->mockedReceiptHandle = '0NNAq8PwvXuWv5gMtS9DJ8qEdyiUwbAjpp45w2m6M4SJ1Y+PxCh7R930NRB8ylSacEmoSnW18bgd4nK\/O6ctE+VFVul4eD23mA07vVoSnPI4F\/voI1eNCp6Iax0ktGmhlNVzBwaZHEr91BRtqTRM3QKd2ASF8u+IQaSwyl\/DGK+P1+dqUOodvOVtExJwdyDLy1glZVgm85Yw9Jf5yZEEErqRwzYz\/qSigdvW4sm2l7e4phRol\/+IjMtovOyH\/ukueYdlVbQ4OshQLENhUKe7RNN5i6bE\/e5x9bnPhfj2gbM'; + } + + protected function tearDown(): void + { + m::close(); + } + + public function testProperlyReleaseStandardSqs() + { + $job = $this->createJob(); + $job->getSqs() + ->shouldReceive('deleteMessage') + ->with(['QueueUrl' => $this->queueUrl, 'ReceiptHandle' => $this->mockedReceiptHandle]) + ->once(); + $job->getSqs() + ->shouldReceive('sendMessage') + ->with( + $this->logicalAnd( + $this->arrayHasKey("MessageBody"), + $this->arrayHasKey("QueueUrl"), + ), + ) + ->once(); + $job->release($this->releaseDelay); + $this->assertTrue($job->isReleased()); + } + + public function testProperlyReleaseFifoSqs() + { + $job = $this->createFifoJob(); + $job->getSqs() + ->shouldReceive('deleteMessage') + ->with(['QueueUrl' => $this->queueUrl.'.fifo', 'ReceiptHandle' => $this->mockedReceiptHandle]) + ->once(); + $job->getSqs() + ->shouldReceive('sendMessage') + ->with( + $this->logicalAnd( + $this->arrayHasKey("MessageBody"), + $this->arrayHasKey("QueueUrl"), + $this->arrayHasKey("MessageGroupId"), + $this->arrayHasKey("MessageDeduplicationId"), + ), + ) + ->once(); + $job->release($this->releaseDelay); + $this->assertTrue($job->isReleased()); + } + + protected function createJob() + { + $jobData = [ + 'Body' => $this->mockedPayload, + 'MD5OfBody' => md5($this->mockedPayload), + 'ReceiptHandle' => $this->mockedReceiptHandle, + 'MessageId' => $this->mockedMessageId, + 'Attributes' => ['ApproximateReceiveCount' => 1], + ]; + return new SqsJob( + $this->mockedContainer, + $this->mockedSqsClient, + $jobData, + 'connection-name', + $this->queueUrl + ); + } + + protected function createFifoJob() + { + $jobData = [ + 'Body' => $this->mockedPayload, + 'MD5OfBody' => md5($this->mockedPayload), + 'ReceiptHandle' => $this->mockedReceiptHandle, + 'MessageId' => $this->mockedMessageId, + 'Attributes' => [ + 'ApproximateReceiveCount' => 1, + 'MessageGroupId' => 'group1', + 'MessageDeduplicationId' => 'deduplication1' + ], + ]; + return new SqsJob( + $this->mockedContainer, + $this->mockedSqsClient, + $jobData, + 'connection-name', + $this->queueUrl.'.fifo' + ); + } +}