Skip to content

Commit

Permalink
DE-111852 Add support for custom SQS message attributes (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorrogeray authored Aug 29, 2024
1 parent faeab36 commit 761730b
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 15 deletions.
12 changes: 12 additions & 0 deletions src/Jobs/JobInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,16 @@ public function getExecutionPlannedAt(): ?DateTimeImmutable;


public function setExecutionPlannedAt(DateTimeImmutable $executionPlannedAt): void;


/**
* @return array<string,array{DataType: string, StringValue?: string, BinaryValue?: string}>
*/
public function getMessageAttributes(): array;


/**
* @param array<string,array{DataType: string, StringValue?: string, BinaryValue?: string}> $messageAttributes
*/
public function setMessageAttributes(array $messageAttributes): void;
}
49 changes: 48 additions & 1 deletion src/Jobs/SimpleJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

use BE\QueueManagement\Jobs\Execution\MaximumAttemptsExceededException;
use BE\QueueManagement\Jobs\JobDefinitions\JobDefinitionInterface;
use BE\QueueManagement\Queue\AWSSQS\SqsMessageAttributeDataType;
use BE\QueueManagement\Queue\AWSSQS\SqsMessageAttributeFields;
use BrandEmbassy\DateTime\DateTimeFormatter;
use DateTimeImmutable;
use Doctrine\Common\Collections\Collection;
Expand All @@ -29,24 +31,32 @@ class SimpleJob implements JobInterface

private ?DateTimeImmutable $executionPlannedAt;

/**
* @var array<string,array{DataType: string, StringValue?: string, BinaryValue?: string}>
*/
private array $messageAttributes;


/**
* @param Collection<string, mixed> $parameters
* @param array<string, mixed> $messageAttributes
*/
public function __construct(
string $uuid,
DateTimeImmutable $createdAt,
int $attempts,
JobDefinitionInterface $jobDefinition,
Collection $parameters,
?DateTimeImmutable $executionPlannedAt
?DateTimeImmutable $executionPlannedAt,
array $messageAttributes = [],
) {
$this->uuid = $uuid;
$this->createdAt = $createdAt;
$this->attempts = $attempts;
$this->parameters = $parameters;
$this->jobDefinition = $jobDefinition;
$this->executionPlannedAt = $executionPlannedAt;
$this->messageAttributes = $messageAttributes;
}


Expand Down Expand Up @@ -176,4 +186,41 @@ public function setExecutionPlannedAt(DateTimeImmutable $executionPlannedAt): vo
{
$this->executionPlannedAt = $executionPlannedAt;
}


/**
* @return array<string,array{DataType: string, StringValue?: string, BinaryValue?: string}>
*/
public function getMessageAttributes(): array
{
return $this->messageAttributes;
}


public function setMessageAttribute(
string $messageAttributeName,
string $messageAttributeValue,
SqsMessageAttributeDataType $messageAttributeDataType = SqsMessageAttributeDataType::STRING
): void {
$valueKey = $messageAttributeDataType === SqsMessageAttributeDataType::BINARY
? SqsMessageAttributeFields::BINARY_VALUE->value
: SqsMessageAttributeFields::STRING_VALUE->value;

/** @var array{DataType: string, StringValue?: string, BinaryValue?: string} $messageAttribute */
$messageAttribute = [
SqsMessageAttributeFields::DATA_TYPE->value => $messageAttributeDataType->value,
$valueKey => $messageAttributeValue,
];

$this->messageAttributes[$messageAttributeName] = $messageAttribute;
}


/**
* @param array<string,array{DataType: string, StringValue?: string, BinaryValue?: string}> $messageAttributes
*/
public function setMessageAttributes(array $messageAttributes): void
{
$this->messageAttributes = $messageAttributes;
}
}
12 changes: 12 additions & 0 deletions src/Queue/AWSSQS/SqsMessageAttributeDataType.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php declare(strict_types = 1);

namespace BE\QueueManagement\Queue\AWSSQS;

enum SqsMessageAttributeDataType: string
{
case STRING = 'String';

case NUMBER = 'Number';

case BINARY = 'Binary';
}
12 changes: 12 additions & 0 deletions src/Queue/AWSSQS/SqsMessageAttributeFields.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php declare(strict_types = 1);

namespace BE\QueueManagement\Queue\AWSSQS;

enum SqsMessageAttributeFields: string
{
case DATA_TYPE = 'DataType';

case BINARY_VALUE = 'BinaryValue';

case STRING_VALUE = 'StringValue';
}
15 changes: 7 additions & 8 deletions src/Queue/AWSSQS/SqsQueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -413,14 +413,13 @@ private function publishMessage(
throw SqsClientException::createFromInvalidDelaySeconds($delaySeconds);
}

$messageAttributes = [
SqsSendingMessageFields::QUEUE_URL => [
'DataType' => 'String',
// queueName might be handy here if we want to consume
// from multiple queues in parallel via promises.
// Then we need queue in message directly so that we can delete it.
'StringValue' => $prefixedQueueName,
],
$messageAttributes = $job->getMessageAttributes();
$messageAttributes[SqsSendingMessageFields::QUEUE_URL] = [
SqsMessageAttributeFields::DATA_TYPE->value => SqsMessageAttributeDataType::STRING->value,
// queueName might be handy here if we want to consume
// from multiple queues in parallel via promises.
// Then we need queue in message directly so that we can delete it.
SqsMessageAttributeFields::STRING_VALUE->value => $prefixedQueueName,
];

if (SqsMessage::isTooBig($messageBody, $messageAttributes)) {
Expand Down
11 changes: 9 additions & 2 deletions tests/Jobs/ExampleJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@ class ExampleJob extends SimpleJob
public const PARAMETER_FOO = 'foo';


public function __construct(?JobDefinitionInterface $jobDefinition = null, string $bar = 'bar')
{
/**
* @param array<string,array{DataType: string, StringValue?: string, BinaryValue?: string}> $messageAttributes
*/
public function __construct(
?JobDefinitionInterface $jobDefinition = null,
string $bar = 'bar',
array $messageAttributes = [],
) {
/**
* Prevent phpstan error Template type T on class Doctrine\Common\Collections\Collection is not covariant
* @var array<string,mixed> $parameters
Expand All @@ -43,6 +49,7 @@ public function __construct(?JobDefinitionInterface $jobDefinition = null, strin
$jobDefinition ?? ExampleJobDefinition::create(),
new ArrayCollection($parameters),
null,
$messageAttributes,
);
}

Expand Down
41 changes: 37 additions & 4 deletions tests/Queue/AWSSQS/SqsQueueManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ class SqsQueueManagerTest extends TestCase

private const QUEUE_URL = 'https://sqs.eu-central-1.amazonaws.com/583027123456/MyQueue1';

private const CUSTOM_MESSAGE_ATTRIBUTE = 'customMessageAttribute';

private const CUSTOM_MESSAGE_ATTRIBUTE_VALUE = 'customMessageAttributeValue';

public const CUSTOM_MESSAGE_ATTRIBUTES = [
self::CUSTOM_MESSAGE_ATTRIBUTE => [
'DataType' => 'String',
'StringValue' => self::CUSTOM_MESSAGE_ATTRIBUTE_VALUE,
],
];

private const RECEIPT_HANDLE = 'AQEBMJRLDYbo...BYSvLGdGU9t8Q==';

private const S3_BUCKET_NAME = 'thisIsS3Bucket';
Expand Down Expand Up @@ -116,12 +127,20 @@ public function testPushWithInvalidCharacters(string $queueName, string $queueNa
->withQueueName($queueName),
'This ￾is random text.',
);
$exampleJobWithInvalidCharacter->setMessageAttribute(
self::CUSTOM_MESSAGE_ATTRIBUTE,
self::CUSTOM_MESSAGE_ATTRIBUTE_VALUE,
);

$exampleJobWithValidCharacter = new ExampleJob(
ExampleJobDefinition::create()
->withQueueName($queueName),
'This is random text.',
);
$exampleJobWithValidCharacter->setMessageAttribute(
self::CUSTOM_MESSAGE_ATTRIBUTE,
self::CUSTOM_MESSAGE_ATTRIBUTE_VALUE,
);

$this->sqsClientMock->expects('sendMessage')
->with(
Expand Down Expand Up @@ -172,7 +191,7 @@ public function testPushWithTooBigMessage(string $queueName, string $queueNamePr
$this->sqsClientMock->expects('sendMessage')
->with(
Mockery::on(
fn(array $message): bool => $this->messageCheckOk($message, $messageBody, 0),
fn(array $message): bool => $this->messageCheckOk($message, $messageBody, 0, false),
),
)
->andReturn($this->createSqsSendMessageResultMock());
Expand Down Expand Up @@ -287,6 +306,10 @@ public function testPushDelayedWithJobDelayOverSqsMaxDelayLimitAndEventDispatche
->with(Mockery::on(fn($event) => $event instanceof MessageSentEvent
&& $event->delayInSeconds === 900
&& $event->messageAttributes === [
'customMessageAttribute' => [
'DataType' => 'String',
'StringValue' => 'customMessageAttributeValue',
],
'QueueUrl' => [
'DataType' => 'String',
'StringValue' => $queueNamePrefix . $queueName,
Expand Down Expand Up @@ -647,12 +670,20 @@ private function getSampleSqsMessages(): array
/**
* @param array<string, mixed> $message
*/
private function messageCheckOk(array $message, string $messageBody, int $delay): bool
{
private function messageCheckOk(
array $message,
string $messageBody,
int $delay,
bool $checkCustomAttribute = true
): bool {
return $message['MessageBody'] === $messageBody
&& $message[SqsSendingMessageFields::DELAY_SECONDS] === $delay
&& $message[SqsSendingMessageFields::QUEUE_URL] === self::QUEUE_URL
&& $message[SqsSendingMessageFields::MESSAGE_ATTRIBUTES][SqsSendingMessageFields::QUEUE_URL]['StringValue'] === self::QUEUE_URL;
&& $message[SqsSendingMessageFields::MESSAGE_ATTRIBUTES][SqsSendingMessageFields::QUEUE_URL]['StringValue'] === self::QUEUE_URL
&& (
$checkCustomAttribute === false
|| $message[SqsSendingMessageFields::MESSAGE_ATTRIBUTES][self::CUSTOM_MESSAGE_ATTRIBUTE]['StringValue'] === self::CUSTOM_MESSAGE_ATTRIBUTE_VALUE
);
}


Expand All @@ -661,6 +692,8 @@ private function createExampleJob(string $queueName): ExampleJob
return new ExampleJob(
ExampleJobDefinition::create()
->withQueueName($queueName),
'bar',
self::CUSTOM_MESSAGE_ATTRIBUTES,
);
}

Expand Down

0 comments on commit 761730b

Please sign in to comment.