From 35ac81c4fdcb6afe22e632b7dfbe5ac3b9085a12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timoth=C3=A9e=20Barray?= Date: Mon, 6 Feb 2023 17:18:47 +0100 Subject: [PATCH] fix: Add support for unrecoverable exception --- src/Service/Sqs/SqsConsumer.php | 4 +++ tests/Unit/Service/Sqs/SqsConsumerTest.php | 40 ++++++++++++++++++++-- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/Service/Sqs/SqsConsumer.php b/src/Service/Sqs/SqsConsumer.php index 111f8e0..f02816a 100644 --- a/src/Service/Sqs/SqsConsumer.php +++ b/src/Service/Sqs/SqsConsumer.php @@ -10,6 +10,7 @@ use Psr\Log\NullLogger; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsXrayTraceHeaderStamp; +use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -89,6 +90,9 @@ public function handleSqs(SqsEvent $event, Context $context): void $stamps[] = new AmazonSqsXrayTraceHeaderStamp($context->getTraceId()); } $this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->transportName); + } catch (UnrecoverableExceptionInterface $exception) { + $this->logger->error(sprintf('SQS record with id "%s" failed to be processed. But failure was marked as unrecoverable. Message will be acknowledged.', $record->getMessageId())); + $this->logger->error($exception->getMessage()); } catch (\Throwable $exception) { if ($this->partialBatchFailure === false) { throw $exception; diff --git a/tests/Unit/Service/Sqs/SqsConsumerTest.php b/tests/Unit/Service/Sqs/SqsConsumerTest.php index 6005f59..0d224a7 100644 --- a/tests/Unit/Service/Sqs/SqsConsumerTest.php +++ b/tests/Unit/Service/Sqs/SqsConsumerTest.php @@ -14,9 +14,11 @@ use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsXrayTraceHeaderStamp; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Throwable; class SqsConsumerTest extends TestCase { @@ -194,6 +196,40 @@ public function test_x_ray_header_is_dispatched_on_bus() $this->assertEmpty($failures); } + public function test_unrecoverable_exception_during_batch() + { + $transport = 'async'; + $sqsRecords = [ + $this->sqsRecordWillSuccessfullyBeHandled( + new TestMessage('test'), + 'e00c848c-2579-4f6a-a006-ccdc2808ed64', + 'Test message 1', + $transport, + true, + ), + $this->sqsRecordWillFailDuringHandle( + new TestMessage('test2'), + '6c4b71a8-eb2e-4373-9d07-478982ff0905', + 'Test message 2', + $transport, + true, + new UnrecoverableMessageHandlingException('no retry') + ), + $this->sqsRecordWillSuccessfullyBeHandled( + new TestMessage('test3'), + 'f8e71ae8-2ae3-4400-a7a0-1193c1a7210f', + 'Test message 3', + $transport, + true, + ), + ]; + + $consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $transport, null, true); + + $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); + $this->assertEmpty($failures); + } + private function sqsRecordWillSuccessfullyBeHandled(object $message, string $messageId, string $body, string $transport, bool $fifo = false): array { return $this->sqsRecordWillSuccessfullyBeHandledWithStamps( @@ -246,7 +282,7 @@ private function sqsRecordWillSuccessfullyBeHandledWithStamps(object $message, s return $this->aSqsRecord($messageId, $body, $specialHeaders, $fifo); } - private function sqsRecordWillFailDuringHandle(object $message, string $messageId, string $body, string $transport, bool $fifo = false): array + private function sqsRecordWillFailDuringHandle(object $message, string $messageId, string $body, string $transport, bool $fifo = false, ?Throwable $failure = null): array { $specialHeaders = ['Special\Header\Name' => 'some data']; $headers = array_merge($specialHeaders, [ @@ -265,7 +301,7 @@ private function sqsRecordWillFailDuringHandle(object $message, string $messageI ), $transport ) - ->willThrow(new \Exception('boom')) + ->willThrow($failure ?? new \Exception('boom')) ; return $this->aSqsRecord($messageId, $body, $specialHeaders, $fifo);