From 761730b9f0786a94d157d250aac05952e1a7b586 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Bok?= Date: Thu, 29 Aug 2024 12:33:11 +0200 Subject: [PATCH] DE-111852 Add support for custom SQS message attributes (#102) --- src/Jobs/JobInterface.php | 12 +++++ src/Jobs/SimpleJob.php | 49 ++++++++++++++++++- .../AWSSQS/SqsMessageAttributeDataType.php | 12 +++++ .../AWSSQS/SqsMessageAttributeFields.php | 12 +++++ src/Queue/AWSSQS/SqsQueueManager.php | 15 +++--- tests/Jobs/ExampleJob.php | 11 ++++- tests/Queue/AWSSQS/SqsQueueManagerTest.php | 41 ++++++++++++++-- 7 files changed, 137 insertions(+), 15 deletions(-) create mode 100644 src/Queue/AWSSQS/SqsMessageAttributeDataType.php create mode 100644 src/Queue/AWSSQS/SqsMessageAttributeFields.php diff --git a/src/Jobs/JobInterface.php b/src/Jobs/JobInterface.php index fcbff1f..85b6488 100644 --- a/src/Jobs/JobInterface.php +++ b/src/Jobs/JobInterface.php @@ -61,4 +61,16 @@ public function getExecutionPlannedAt(): ?DateTimeImmutable; public function setExecutionPlannedAt(DateTimeImmutable $executionPlannedAt): void; + + + /** + * @return array + */ + public function getMessageAttributes(): array; + + + /** + * @param array $messageAttributes + */ + public function setMessageAttributes(array $messageAttributes): void; } diff --git a/src/Jobs/SimpleJob.php b/src/Jobs/SimpleJob.php index 331df68..6b1dbac 100644 --- a/src/Jobs/SimpleJob.php +++ b/src/Jobs/SimpleJob.php @@ -4,6 +4,8 @@ use BE\QueueManagement\Jobs\Execution\MaximumAttemptsExceededException; use BE\QueueManagement\Jobs\JobDefinitions\JobDefinitionInterface; +use BE\QueueManagement\Queue\AWSSQS\SqsMessageAttributeDataType; +use BE\QueueManagement\Queue\AWSSQS\SqsMessageAttributeFields; use BrandEmbassy\DateTime\DateTimeFormatter; use DateTimeImmutable; use Doctrine\Common\Collections\Collection; @@ -29,9 +31,15 @@ class SimpleJob implements JobInterface private ?DateTimeImmutable $executionPlannedAt; + /** + * @var array + */ + private array $messageAttributes; + /** * @param Collection $parameters + * @param array $messageAttributes */ public function __construct( string $uuid, @@ -39,7 +47,8 @@ public function __construct( int $attempts, JobDefinitionInterface $jobDefinition, Collection $parameters, - ?DateTimeImmutable $executionPlannedAt + ?DateTimeImmutable $executionPlannedAt, + array $messageAttributes = [], ) { $this->uuid = $uuid; $this->createdAt = $createdAt; @@ -47,6 +56,7 @@ public function __construct( $this->parameters = $parameters; $this->jobDefinition = $jobDefinition; $this->executionPlannedAt = $executionPlannedAt; + $this->messageAttributes = $messageAttributes; } @@ -176,4 +186,41 @@ public function setExecutionPlannedAt(DateTimeImmutable $executionPlannedAt): vo { $this->executionPlannedAt = $executionPlannedAt; } + + + /** + * @return array + */ + public function getMessageAttributes(): array + { + return $this->messageAttributes; + } + + + public function setMessageAttribute( + string $messageAttributeName, + string $messageAttributeValue, + SqsMessageAttributeDataType $messageAttributeDataType = SqsMessageAttributeDataType::STRING + ): void { + $valueKey = $messageAttributeDataType === SqsMessageAttributeDataType::BINARY + ? SqsMessageAttributeFields::BINARY_VALUE->value + : SqsMessageAttributeFields::STRING_VALUE->value; + + /** @var array{DataType: string, StringValue?: string, BinaryValue?: string} $messageAttribute */ + $messageAttribute = [ + SqsMessageAttributeFields::DATA_TYPE->value => $messageAttributeDataType->value, + $valueKey => $messageAttributeValue, + ]; + + $this->messageAttributes[$messageAttributeName] = $messageAttribute; + } + + + /** + * @param array $messageAttributes + */ + public function setMessageAttributes(array $messageAttributes): void + { + $this->messageAttributes = $messageAttributes; + } } diff --git a/src/Queue/AWSSQS/SqsMessageAttributeDataType.php b/src/Queue/AWSSQS/SqsMessageAttributeDataType.php new file mode 100644 index 0000000..b12730f --- /dev/null +++ b/src/Queue/AWSSQS/SqsMessageAttributeDataType.php @@ -0,0 +1,12 @@ + [ - 'DataType' => 'String', - // queueName might be handy here if we want to consume - // from multiple queues in parallel via promises. - // Then we need queue in message directly so that we can delete it. - 'StringValue' => $prefixedQueueName, - ], + $messageAttributes = $job->getMessageAttributes(); + $messageAttributes[SqsSendingMessageFields::QUEUE_URL] = [ + SqsMessageAttributeFields::DATA_TYPE->value => SqsMessageAttributeDataType::STRING->value, + // queueName might be handy here if we want to consume + // from multiple queues in parallel via promises. + // Then we need queue in message directly so that we can delete it. + SqsMessageAttributeFields::STRING_VALUE->value => $prefixedQueueName, ]; if (SqsMessage::isTooBig($messageBody, $messageAttributes)) { diff --git a/tests/Jobs/ExampleJob.php b/tests/Jobs/ExampleJob.php index a06ceca..c7275e5 100644 --- a/tests/Jobs/ExampleJob.php +++ b/tests/Jobs/ExampleJob.php @@ -28,8 +28,14 @@ class ExampleJob extends SimpleJob public const PARAMETER_FOO = 'foo'; - public function __construct(?JobDefinitionInterface $jobDefinition = null, string $bar = 'bar') - { + /** + * @param array $messageAttributes + */ + public function __construct( + ?JobDefinitionInterface $jobDefinition = null, + string $bar = 'bar', + array $messageAttributes = [], + ) { /** * Prevent phpstan error Template type T on class Doctrine\Common\Collections\Collection is not covariant * @var array $parameters @@ -43,6 +49,7 @@ public function __construct(?JobDefinitionInterface $jobDefinition = null, strin $jobDefinition ?? ExampleJobDefinition::create(), new ArrayCollection($parameters), null, + $messageAttributes, ); } diff --git a/tests/Queue/AWSSQS/SqsQueueManagerTest.php b/tests/Queue/AWSSQS/SqsQueueManagerTest.php index 3ae20b1..8cce031 100644 --- a/tests/Queue/AWSSQS/SqsQueueManagerTest.php +++ b/tests/Queue/AWSSQS/SqsQueueManagerTest.php @@ -41,6 +41,17 @@ class SqsQueueManagerTest extends TestCase private const QUEUE_URL = 'https://sqs.eu-central-1.amazonaws.com/583027123456/MyQueue1'; + private const CUSTOM_MESSAGE_ATTRIBUTE = 'customMessageAttribute'; + + private const CUSTOM_MESSAGE_ATTRIBUTE_VALUE = 'customMessageAttributeValue'; + + public const CUSTOM_MESSAGE_ATTRIBUTES = [ + self::CUSTOM_MESSAGE_ATTRIBUTE => [ + 'DataType' => 'String', + 'StringValue' => self::CUSTOM_MESSAGE_ATTRIBUTE_VALUE, + ], + ]; + private const RECEIPT_HANDLE = 'AQEBMJRLDYbo...BYSvLGdGU9t8Q=='; private const S3_BUCKET_NAME = 'thisIsS3Bucket'; @@ -116,12 +127,20 @@ public function testPushWithInvalidCharacters(string $queueName, string $queueNa ->withQueueName($queueName), 'This ￾is random text.', ); + $exampleJobWithInvalidCharacter->setMessageAttribute( + self::CUSTOM_MESSAGE_ATTRIBUTE, + self::CUSTOM_MESSAGE_ATTRIBUTE_VALUE, + ); $exampleJobWithValidCharacter = new ExampleJob( ExampleJobDefinition::create() ->withQueueName($queueName), 'This is random text.', ); + $exampleJobWithValidCharacter->setMessageAttribute( + self::CUSTOM_MESSAGE_ATTRIBUTE, + self::CUSTOM_MESSAGE_ATTRIBUTE_VALUE, + ); $this->sqsClientMock->expects('sendMessage') ->with( @@ -172,7 +191,7 @@ public function testPushWithTooBigMessage(string $queueName, string $queueNamePr $this->sqsClientMock->expects('sendMessage') ->with( Mockery::on( - fn(array $message): bool => $this->messageCheckOk($message, $messageBody, 0), + fn(array $message): bool => $this->messageCheckOk($message, $messageBody, 0, false), ), ) ->andReturn($this->createSqsSendMessageResultMock()); @@ -287,6 +306,10 @@ public function testPushDelayedWithJobDelayOverSqsMaxDelayLimitAndEventDispatche ->with(Mockery::on(fn($event) => $event instanceof MessageSentEvent && $event->delayInSeconds === 900 && $event->messageAttributes === [ + 'customMessageAttribute' => [ + 'DataType' => 'String', + 'StringValue' => 'customMessageAttributeValue', + ], 'QueueUrl' => [ 'DataType' => 'String', 'StringValue' => $queueNamePrefix . $queueName, @@ -647,12 +670,20 @@ private function getSampleSqsMessages(): array /** * @param array $message */ - private function messageCheckOk(array $message, string $messageBody, int $delay): bool - { + private function messageCheckOk( + array $message, + string $messageBody, + int $delay, + bool $checkCustomAttribute = true + ): bool { return $message['MessageBody'] === $messageBody && $message[SqsSendingMessageFields::DELAY_SECONDS] === $delay && $message[SqsSendingMessageFields::QUEUE_URL] === self::QUEUE_URL - && $message[SqsSendingMessageFields::MESSAGE_ATTRIBUTES][SqsSendingMessageFields::QUEUE_URL]['StringValue'] === self::QUEUE_URL; + && $message[SqsSendingMessageFields::MESSAGE_ATTRIBUTES][SqsSendingMessageFields::QUEUE_URL]['StringValue'] === self::QUEUE_URL + && ( + $checkCustomAttribute === false + || $message[SqsSendingMessageFields::MESSAGE_ATTRIBUTES][self::CUSTOM_MESSAGE_ATTRIBUTE]['StringValue'] === self::CUSTOM_MESSAGE_ATTRIBUTE_VALUE + ); } @@ -661,6 +692,8 @@ private function createExampleJob(string $queueName): ExampleJob return new ExampleJob( ExampleJobDefinition::create() ->withQueueName($queueName), + 'bar', + self::CUSTOM_MESSAGE_ATTRIBUTES, ); }