Skip to content

Commit

Permalink
DE-111852 Add event dispatcher for better observability (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorrogeray authored Aug 13, 2024
1 parent 19a9a3e commit 237c9bd
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 23 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
"aws/aws-sdk-php": "^3.209",
"predis/predis": "^1.1 || ^2.1.2",
"ramsey/uuid": "^4.2",
"tracy/tracy": "^2.9"
"tracy/tracy": "^2.9",
"symfony/event-dispatcher-contracts": "^3.5"
},
"require-dev": {
"brandembassy/coding-standard": "^11.1",
Expand Down
18 changes: 18 additions & 0 deletions src/Observability/ExecutionPlannedEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php declare(strict_types = 1);

namespace BE\QueueManagement\Observability;

use BE\QueueManagement\Jobs\JobInterface;
use DateTimeImmutable;

class ExecutionPlannedEvent
{
public function __construct(
public readonly JobInterface $job,
public readonly DateTimeImmutable $executionPlannedAt,
public readonly int $delayInSeconds,
public readonly string $prefixedQueueName,
public readonly ?string $scheduledEventId,
) {
}
}
16 changes: 16 additions & 0 deletions src/Observability/MessageSentEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php declare(strict_types = 1);

namespace BE\QueueManagement\Observability;

class MessageSentEvent
{
/**
* @param mixed[] $messageAttributes
*/
public function __construct(
public readonly int $delayInSeconds,
public readonly array $messageAttributes,
public readonly string $messageBody,
) {
}
}
96 changes: 74 additions & 22 deletions src/Queue/AWSSQS/SqsQueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@
use BE\QueueManagement\Jobs\JobType;
use BE\QueueManagement\Logging\LoggerContextField;
use BE\QueueManagement\Logging\LoggerHelper;
use BE\QueueManagement\Observability\ExecutionPlannedEvent;
use BE\QueueManagement\Observability\MessageSentEvent;
use BE\QueueManagement\Queue\QueueManagerInterface;
use BrandEmbassy\DateTime\DateTimeFormatter;
use BrandEmbassy\DateTime\DateTimeImmutableFactory;
use DateTimeImmutable;
use GuzzleHttp\Psr7\Stream;
use LogicException;
use Nette\Utils\Json;
use Nette\Utils\Validators;
use Psr\Log\LoggerInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Throwable;
use function assert;
use function count;
Expand Down Expand Up @@ -69,6 +73,8 @@ class SqsQueueManager implements QueueManagerInterface

private ?DelayedJobSchedulerInterface $delayedJobScheduler;

private ?EventDispatcherInterface $eventDispatcher;

private bool $isBeingTerminated = false;


Expand All @@ -80,6 +86,7 @@ public function __construct(
LoggerInterface $logger,
DateTimeImmutableFactory $dateTimeImmutableFactory,
?DelayedJobSchedulerInterface $delayedJobScheduler = null,
?EventDispatcherInterface $eventDispatcher = null,
int $consumeLoopIterationsCount = self::CONSUME_LOOP_ITERATIONS_NO_LIMIT,
string $queueNamePrefix = ''
) {
Expand All @@ -94,6 +101,7 @@ public function __construct(
$this->queueNamePrefix = $queueNamePrefix;
$this->dateTimeImmutableFactory = $dateTimeImmutableFactory;
$this->delayedJobScheduler = $delayedJobScheduler;
$this->eventDispatcher = $eventDispatcher;
}


Expand Down Expand Up @@ -246,29 +254,17 @@ public function pushDelayed(JobInterface $job, int $delayInSeconds, int $maxDela

$prefixedQueueName = $this->getPrefixedQueueName($job->getJobDefinition()->getQueueName());

$executionPlannedAt = $this->dateTimeImmutableFactory->getNow()->modify(
sprintf('+ %d seconds', $delayInSeconds),
);

$finalDelayInSeconds = $delayInSeconds;

if ($delayInSeconds > $maxDelayInSeconds) {
$executionPlannedAt = $this->dateTimeImmutableFactory->getNow()->modify(
sprintf('+ %d seconds', $delayInSeconds),
);
$job->setExecutionPlannedAt($executionPlannedAt);

if ($this->delayedJobScheduler !== null) {
$scheduledEventId = $this->delayedJobScheduler->scheduleJob($job, $prefixedQueueName);

$this->logger->info(
sprintf(
'Requested delay is greater than SQS limit. Job execution has been planned using %s.',
$this->delayedJobScheduler->getSchedulerName(),
),
[
'executionPlannedAt' => DateTimeFormatter::format($executionPlannedAt),
'scheduledEventId' => $scheduledEventId,
'delayInSeconds' => $delayInSeconds,
'maxDelayInSeconds' => $maxDelayInSeconds,
LoggerContextField::JOB_QUEUE_NAME => $prefixedQueueName,
LoggerContextField::JOB_UUID => $job->getUuid(),
],
);
$this->scheduleJob($job, $prefixedQueueName, $executionPlannedAt, $delayInSeconds, $maxDelayInSeconds);

return;
}
Expand All @@ -284,18 +280,29 @@ public function pushDelayed(JobInterface $job, int $delayInSeconds, int $maxDela
],
);

$delayInSeconds = self::MAX_DELAY_IN_SECONDS;
$finalDelayInSeconds = self::MAX_DELAY_IN_SECONDS;
}

$parameters = [self::DELAY_SECONDS => $delayInSeconds];
$parameters = [self::DELAY_SECONDS => $finalDelayInSeconds];

$sqsMessageId = $this->publishMessage($job, $prefixedQueueName, $parameters);

if ($this->eventDispatcher !== null) {
$this->eventDispatcher->dispatch(new ExecutionPlannedEvent(
$job,
$executionPlannedAt,
$delayInSeconds,
$prefixedQueueName,
null,
));
}

LoggerHelper::logJobPushedIntoQueue(
$job,
$prefixedQueueName,
$this->logger,
JobType::get(JobType::SQS),
$delayInSeconds,
$finalDelayInSeconds,
$sqsMessageId,
);
}
Expand Down Expand Up @@ -374,6 +381,14 @@ private function sendMessage(array $messageToSend): string
{
$result = $this->sqsClient->sendMessage($messageToSend);

if ($this->eventDispatcher !== null) {
$this->eventDispatcher->dispatch(new MessageSentEvent(
$messageToSend[SqsSendingMessageFields::DELAY_SECONDS],
$messageToSend[SqsSendingMessageFields::MESSAGE_ATTRIBUTES],
$messageToSend[SqsSendingMessageFields::MESSAGE_BODY],
));
}

return $result->get(SqsMessageFields::MESSAGE_ID);
}

Expand Down Expand Up @@ -404,6 +419,43 @@ public function checkConnection(): bool
}


private function scheduleJob(
JobInterface $job,
string $prefixedQueueName,
DateTimeImmutable $executionPlannedAt,
int $delayInSeconds,
int $maxDelayInSeconds
): void {
assert($this->delayedJobScheduler !== null, 'Delayed job scheduler must be set to schedule a job.');
$scheduledEventId = $this->delayedJobScheduler->scheduleJob($job, $prefixedQueueName);

$this->logger->info(
sprintf(
'Requested delay is greater than SQS limit. Job execution has been planned using %s.',
$this->delayedJobScheduler->getSchedulerName(),
),
[
'executionPlannedAt' => DateTimeFormatter::format($executionPlannedAt),
'scheduledEventId' => $scheduledEventId,
'delayInSeconds' => $delayInSeconds,
'maxDelayInSeconds' => $maxDelayInSeconds,
LoggerContextField::JOB_QUEUE_NAME => $prefixedQueueName,
LoggerContextField::JOB_UUID => $job->getUuid(),
],
);

if ($this->eventDispatcher !== null) {
$this->eventDispatcher->dispatch(new ExecutionPlannedEvent(
$job,
$executionPlannedAt,
$delayInSeconds,
$prefixedQueueName,
$scheduledEventId,
));
}
}


private function getPrefixedQueueName(string $queueName): string
{
$prefixedQueueName = $this->queueNamePrefix . $queueName;
Expand Down
122 changes: 122 additions & 0 deletions tests/Queue/AWSSQS/SqsQueueManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Aws\Result;
use Aws\S3\S3Client;
use Aws\Sqs\SqsClient;
use BE\QueueManagement\Observability\ExecutionPlannedEvent;
use BE\QueueManagement\Observability\MessageSentEvent;
use BE\QueueManagement\Queue\AWSSQS\DelayedJobSchedulerInterface;
use BE\QueueManagement\Queue\AWSSQS\S3ClientFactory;
use BE\QueueManagement\Queue\AWSSQS\SqsClientFactory;
Expand All @@ -23,6 +25,7 @@
use PHPUnit\Framework\Attributes\DataProvider;
use PHPUnit\Framework\TestCase;
use Psr\Log\Test\TestLogger;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Tests\BE\QueueManagement\Jobs\ExampleJob;
use Tests\BE\QueueManagement\Jobs\JobDefinitions\ExampleJobDefinition;
use function sprintf;
Expand Down Expand Up @@ -236,6 +239,75 @@ public function testPushDelayedWithJobDelayOverSqsMaxDelayLimit(string $queueNam
}


#[DataProvider('queueNameDataProvider')]
public function testPushDelayedWithJobDelayOverSqsMaxDelayLimitAndEventDispatcher(string $queueName, string $queueNamePrefix): void
{
$exampleJob = $this->createExampleJob($queueName);

/** @var ExecutionPlannedEvent&MockInterface $executionPlannedEventMock */
$executionPlannedEventMock = Mockery::mock(ExecutionPlannedEvent::class);

/** @var EventDispatcherInterface&MockInterface $eventDispatcherMock */
$eventDispatcherMock = Mockery::mock(EventDispatcherInterface::class);
$eventDispatcherMock
->shouldReceive('dispatch')
->once()
->with(Mockery::on(fn($event) => $event instanceof ExecutionPlannedEvent
&& $event->job === $exampleJob
&& $event->executionPlannedAt->getTimestamp() === (new DateTimeImmutable(self::FROZEN_DATE_TIME))->modify('+ 1800 seconds')->getTimestamp()
&& $event->delayInSeconds === 1800
&& $event->prefixedQueueName === $queueNamePrefix . $queueName
&& $event->scheduledEventId === null))
->andReturn($executionPlannedEventMock);

$eventDispatcherMock
->shouldReceive('dispatch')
->once()
->with(Mockery::on(fn($event) => $event instanceof MessageSentEvent
&& $event->delayInSeconds === 900
&& $event->messageAttributes === [
'QueueUrl' => [
'DataType' => 'String',
'StringValue' => $queueNamePrefix . $queueName,
],
]
&& $event->messageBody === '{"jobUuid":"some-job-uuid","jobName":"exampleJob","attempts":1,"createdAt":"2018-08-01T10:15:47+01:00","jobParameters":{"foo":"bar"},"executionPlannedAt":"2016-08-15T15:30:00+00:00"}'))
->andReturn($executionPlannedEventMock);

$queueManager = $this->createQueueManagerWithExpectations($queueNamePrefix, 1, null, $eventDispatcherMock);

$expectedMessageBody = [
'jobUuid' => 'some-job-uuid',
'jobName' => 'exampleJob',
'attempts' => 1,
'createdAt' => '2018-08-01T10:15:47+01:00',
'jobParameters' => [
'foo' => 'bar',
],
'executionPlannedAt' => '2016-08-15T15:30:00+00:00',
];

$this->sqsClientMock->expects('sendMessage')
->with(
Mockery::on(
fn(array $message): bool => $this->messageCheckOk(
$message,
Json::encode($expectedMessageBody),
900,
),
),
)
->andReturn($this->createSqsSendMessageResultMock());

$this->loggerMock->hasInfo(
'Requested delay is greater than SQS limit. Job execution has been planned and will be requeued until then.',
);
$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uuid] pushed into exampleJobQueue queue');

$queueManager->pushDelayed($exampleJob, 1800);
}


#[DataProvider('queueNameDataProvider')]
public function testPushDelayedWithJobDelayOverSqsMaxDelayLimitUsingDelayedJobScheduler(string $queueName, string $queueNamePrefix): void
{
Expand Down Expand Up @@ -298,6 +370,54 @@ public function testPushDelayedWithJobDelayOverCustomSqsMaxDelayLimitUsingDelaye
}


#[DataProvider('queueNameDataProvider')]
public function testPushDelayedWithJobDelayOverCustomSqsMaxDelayLimitUsingDelayedJobSchedulerAndEventDispatcher(string $queueName, string $queueNamePrefix): void
{
$scheduledEventUuid = '86dac5fb-cd24-4f77-b3dd-409ebf5e4b9f';
$exampleJob = $this->createExampleJob($queueName);

/** @var DelayedJobSchedulerInterface&MockInterface $delayedJobSchedulerMock */
$delayedJobSchedulerMock = Mockery::mock(DelayedJobSchedulerInterface::class);

$fullQueueName = $queueNamePrefix . $queueName;
$delayedJobSchedulerMock
->expects('scheduleJob')
->with($exampleJob, $fullQueueName)
->andReturn($scheduledEventUuid);
$delayedJobSchedulerMock
->expects('getSchedulerName')
->andReturn('SQS Scheduler');

/** @var ExecutionPlannedEvent&MockInterface $executionPlannedEventMock */
$executionPlannedEventMock = Mockery::mock(ExecutionPlannedEvent::class);

/** @var EventDispatcherInterface&MockInterface $eventDispatcherMock */
$eventDispatcherMock = Mockery::mock(EventDispatcherInterface::class);
$eventDispatcherMock
->expects('dispatch')
->with(Mockery::on(fn($event) => $event instanceof ExecutionPlannedEvent
&& $event->job === $exampleJob
&& $event->executionPlannedAt->getTimestamp() === (new DateTimeImmutable(self::FROZEN_DATE_TIME))->modify('+ 65 seconds')->getTimestamp()
&& $event->delayInSeconds === 65
&& $event->prefixedQueueName === $fullQueueName
&& $event->scheduledEventId === $scheduledEventUuid))
->andReturn($executionPlannedEventMock);

$queueManager = $this->createQueueManagerWithExpectations(
$queueNamePrefix,
1,
$delayedJobSchedulerMock,
$eventDispatcherMock,
);

$this->loggerMock->hasInfo(
'Requested delay is greater than SQS limit. Job execution has been planned using SQS Scheduler.',
);

$queueManager->pushDelayed($exampleJob, 65, 60);
}


#[DataProvider('queueNameDataProvider')]
public function testPushDelayedWithMilliSeconds(string $queueName, string $queueNamePrefix): void
{
Expand Down Expand Up @@ -510,6 +630,7 @@ private function createQueueManagerWithExpectations(
string $queueNamePrefix = '',
int $connectionIsCreatedTimes = 1,
?DelayedJobSchedulerInterface $delayedJobScheduler = null,
?EventDispatcherInterface $eventDispatcher = null,
): SqsQueueManager {
return new SqsQueueManager(
self::S3_BUCKET_NAME,
Expand All @@ -521,6 +642,7 @@ private function createQueueManagerWithExpectations(
new DateTimeImmutable(self::FROZEN_DATE_TIME),
),
$delayedJobScheduler,
$eventDispatcher,
1,
$queueNamePrefix,
);
Expand Down

0 comments on commit 237c9bd

Please sign in to comment.