Skip to content

Commit

Permalink
Merge pull request #72 from tyx/fix/unrecoverable-support
Browse files Browse the repository at this point in the history
Fix #34
  • Loading branch information
mnapoli authored Feb 7, 2023
2 parents d683c94 + 35ac81c commit 99f5415
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/Service/Sqs/SqsConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsXrayTraceHeaderStamp;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

Expand Down Expand Up @@ -89,6 +90,9 @@ public function handleSqs(SqsEvent $event, Context $context): void
$stamps[] = new AmazonSqsXrayTraceHeaderStamp($context->getTraceId());
}
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->transportName);
} catch (UnrecoverableExceptionInterface $exception) {
$this->logger->error(sprintf('SQS record with id "%s" failed to be processed. But failure was marked as unrecoverable. Message will be acknowledged.', $record->getMessageId()));
$this->logger->error($exception->getMessage());
} catch (\Throwable $exception) {
if ($this->partialBatchFailure === false) {
throw $exception;
Expand Down
40 changes: 38 additions & 2 deletions tests/Unit/Service/Sqs/SqsConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsXrayTraceHeaderStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Throwable;

class SqsConsumerTest extends TestCase
{
Expand Down Expand Up @@ -194,6 +196,40 @@ public function test_x_ray_header_is_dispatched_on_bus()
$this->assertEmpty($failures);
}

public function test_unrecoverable_exception_during_batch()
{
$transport = 'async';
$sqsRecords = [
$this->sqsRecordWillSuccessfullyBeHandled(
new TestMessage('test'),
'e00c848c-2579-4f6a-a006-ccdc2808ed64',
'Test message 1',
$transport,
true,
),
$this->sqsRecordWillFailDuringHandle(
new TestMessage('test2'),
'6c4b71a8-eb2e-4373-9d07-478982ff0905',
'Test message 2',
$transport,
true,
new UnrecoverableMessageHandlingException('no retry')
),
$this->sqsRecordWillSuccessfullyBeHandled(
new TestMessage('test3'),
'f8e71ae8-2ae3-4400-a7a0-1193c1a7210f',
'Test message 3',
$transport,
true,
),
];

$consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $transport, null, true);

$failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', ''));
$this->assertEmpty($failures);
}

private function sqsRecordWillSuccessfullyBeHandled(object $message, string $messageId, string $body, string $transport, bool $fifo = false): array
{
return $this->sqsRecordWillSuccessfullyBeHandledWithStamps(
Expand Down Expand Up @@ -246,7 +282,7 @@ private function sqsRecordWillSuccessfullyBeHandledWithStamps(object $message, s
return $this->aSqsRecord($messageId, $body, $specialHeaders, $fifo);
}

private function sqsRecordWillFailDuringHandle(object $message, string $messageId, string $body, string $transport, bool $fifo = false): array
private function sqsRecordWillFailDuringHandle(object $message, string $messageId, string $body, string $transport, bool $fifo = false, ?Throwable $failure = null): array
{
$specialHeaders = ['Special\Header\Name' => 'some data'];
$headers = array_merge($specialHeaders, [
Expand All @@ -265,7 +301,7 @@ private function sqsRecordWillFailDuringHandle(object $message, string $messageI
),
$transport
)
->willThrow(new \Exception('boom'))
->willThrow($failure ?? new \Exception('boom'))
;

return $this->aSqsRecord($messageId, $body, $specialHeaders, $fifo);
Expand Down

0 comments on commit 99f5415

Please sign in to comment.