Skip to content

Commit

Permalink
DE-106540 Add support for delayed message schedulers (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorrogeray authored Jun 4, 2024
1 parent 8943492 commit 1fd8aeb
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 11 deletions.
13 changes: 13 additions & 0 deletions src/Queue/AWSSQS/DelayedJobSchedulerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php declare(strict_types = 1);

namespace BE\QueueManagement\Queue\AWSSQS;

use BE\QueueManagement\Jobs\JobInterface;

interface DelayedJobSchedulerInterface
{
public function getSchedulerName(): string;


public function scheduleJob(JobInterface $job, string $prefixedQueueName): string;
}
27 changes: 26 additions & 1 deletion src/Queue/AWSSQS/SqsQueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class SqsQueueManager implements QueueManagerInterface

private DateTimeImmutableFactory $dateTimeImmutableFactory;

private ?DelayedJobSchedulerInterface $delayedJobScheduler;

private bool $isBeingTerminated = false;


Expand All @@ -77,6 +79,7 @@ public function __construct(
MessageKeyGeneratorInterface $messageKeyGenerator,
LoggerInterface $logger,
DateTimeImmutableFactory $dateTimeImmutableFactory,
?DelayedJobSchedulerInterface $delayedJobScheduler = null,
int $consumeLoopIterationsCount = self::CONSUME_LOOP_ITERATIONS_NO_LIMIT,
string $queueNamePrefix = ''
) {
Expand All @@ -90,6 +93,7 @@ public function __construct(
$this->consumeLoopIterationsCount = $consumeLoopIterationsCount;
$this->queueNamePrefix = $queueNamePrefix;
$this->dateTimeImmutableFactory = $dateTimeImmutableFactory;
$this->delayedJobScheduler = $delayedJobScheduler;
}


Expand Down Expand Up @@ -235,6 +239,27 @@ public function pushDelayed(JobInterface $job, int $delayInSeconds): void
$executionPlannedAt = $this->dateTimeImmutableFactory->getNow()->modify(
sprintf('+ %d seconds', $delayInSeconds),
);
$job->setExecutionPlannedAt($executionPlannedAt);

if ($this->delayedJobScheduler !== null) {
$scheduledEventId = $this->delayedJobScheduler->scheduleJob($job, $prefixedQueueName);

$this->logger->info(
sprintf(
'Requested delay is greater than SQS limit. Job execution has been planned using %s.',
$this->delayedJobScheduler->getSchedulerName(),
),
[
'executionPlannedAt' => DateTimeFormatter::format($executionPlannedAt),
'scheduledEventId' => $scheduledEventId,
LoggerContextField::JOB_QUEUE_NAME => $prefixedQueueName,
LoggerContextField::JOB_UUID => $job->getUuid(),
],
);

return;
}

$this->logger->info(
'Requested delay is greater than SQS limit. Job execution has been planned and will be requeued until then.',
[
Expand All @@ -243,7 +268,7 @@ public function pushDelayed(JobInterface $job, int $delayInSeconds): void
LoggerContextField::JOB_UUID => $job->getUuid(),
],
);
$job->setExecutionPlannedAt($executionPlannedAt);

$delayInSeconds = self::MAX_DELAY_SECONDS;
}

Expand Down
2 changes: 1 addition & 1 deletion tests/Jobs/ExampleJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
class ExampleJob extends SimpleJob
{
public const UUID = 'some-job-uud';
public const UUID = 'some-job-uuid';

public const ATTEMPTS = JobInterface::INIT_ATTEMPTS;

Expand Down
2 changes: 1 addition & 1 deletion tests/Queue/AWSSQS/SqsConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public function testRejectBlacklistedJob(): void
->andThrow($blacklistedJobUuidException);

$this->loggerMock->hasWarning(
'Job removed from queue: Job some-job-uud blacklisted',
'Job removed from queue: Job some-job-uuid blacklisted',
);

$this->sqsClientMock->expects('deleteMessage')
Expand Down
51 changes: 43 additions & 8 deletions tests/Queue/AWSSQS/SqsQueueManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Aws\Result;
use Aws\S3\S3Client;
use Aws\Sqs\SqsClient;
use BE\QueueManagement\Jobs\JobInterface;
use BE\QueueManagement\Queue\AWSSQS\DelayedJobSchedulerInterface;
use BE\QueueManagement\Queue\AWSSQS\S3ClientFactory;
use BE\QueueManagement\Queue\AWSSQS\SqsClientFactory;
use BE\QueueManagement\Queue\AWSSQS\SqsMessage;
Expand All @@ -21,6 +23,7 @@
use PHPUnit\Framework\Assert;
use PHPUnit\Framework\TestCase;
use Psr\Log\Test\TestLogger;
use Ramsey\Uuid\Uuid;
use Tests\BE\QueueManagement\Jobs\ExampleJob;
use Tests\BE\QueueManagement\Jobs\JobDefinitions\ExampleJobDefinition;
use function sprintf;
Expand Down Expand Up @@ -80,7 +83,7 @@ public function testPush(string $queueName, string $queueNamePrefix): void
{
$queueManager = $this->createQueueManagerWithExpectations($queueNamePrefix);

$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uud] pushed into exampleJobQueue queue');
$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uuid] pushed into exampleJobQueue queue');

$exampleJob = $this->createExampleJob($queueName);

Expand All @@ -101,7 +104,7 @@ public function testPushWithInvalidCharacters(string $queueName, string $queueNa
{
$queueManager = $this->createQueueManagerWithExpectations($queueNamePrefix);

$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uud] pushed into exampleJobQueue queue');
$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uuid] pushed into exampleJobQueue queue');

$exampleJobWithInvalidCharacter = new ExampleJob(
ExampleJobDefinition::create()
Expand Down Expand Up @@ -136,7 +139,7 @@ public function testPushWithTooBigMessage(string $queueName, string $queueNamePr
{
$queueManager = $this->createQueueManagerWithExpectations($queueNamePrefix);

$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uud] pushed into exampleJobQueue queue');
$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uuid] pushed into exampleJobQueue queue');

$exampleJob = ExampleJob::createTooBigForSqs(
ExampleJobDefinition::create()
Expand Down Expand Up @@ -188,7 +191,7 @@ public function testPushDelayed(string $queueName, string $queueNamePrefix): voi
)
->andReturn($this->createSqsSendMessageResultMock());

$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uud] pushed into exampleJobQueue queue');
$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uuid] pushed into exampleJobQueue queue');

$queueManager->pushDelayed($exampleJob, 5);
}
Expand All @@ -202,7 +205,7 @@ public function testPushDelayedWithJobDelayOverSqsMaxDelayLimit(string $queueNam
$exampleJob = $this->createExampleJob($queueName);

$expectedMessageBody = [
'jobUuid' => 'some-job-uud',
'jobUuid' => 'some-job-uuid',
'jobName' => 'exampleJob',
'attempts' => 1,
'createdAt' => '2018-08-01T10:15:47+01:00',
Expand All @@ -225,7 +228,37 @@ public function testPushDelayedWithJobDelayOverSqsMaxDelayLimit(string $queueNam
$this->loggerMock->hasInfo(
'Requested delay is greater than SQS limit. Job execution has been planned and will be requeued until then.',
);
$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uud] pushed into exampleJobQueue queue');
$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uuid] pushed into exampleJobQueue queue');

$queueManager->pushDelayed($exampleJob, 1800);
}

#[\PHPUnit\Framework\Attributes\DataProvider('queueNameDataProvider')]
public function testPushDelayedWithJobDelayOverSqsMaxDelayLimitUsingDelayedJobScheduler(string $queueName, string $queueNamePrefix): void
{
$jobUuid = '86dac5fb-cd24-4f77-b3dd-409ebf5e4b9f';
$exampleJob = $this->createExampleJob($queueName);

/** @var DelayedJobSchedulerInterface&MockInterface $delayedJobSchedulerMock */
$delayedJobSchedulerMock = Mockery::mock(DelayedJobSchedulerInterface::class);
$fullQueueName = $queueNamePrefix . $queueName;
$delayedJobSchedulerMock
->expects('scheduleJob')
->with($exampleJob, $fullQueueName)
->andReturn($jobUuid);
$delayedJobSchedulerMock
->expects('getSchedulerName')
->andReturn('SQS Scheduler');

$queueManager = $this->createQueueManagerWithExpectations(
$queueNamePrefix,
1,
$delayedJobSchedulerMock
);

$this->loggerMock->hasInfo(
'Requested delay is greater than SQS limit. Job execution has been planned using SQS Scheduler.',
);

$queueManager->pushDelayed($exampleJob, 1800);
}
Expand Down Expand Up @@ -255,7 +288,7 @@ public function testPushWithReconnect(string $queueName, string $queueNamePrefix
{
$queueManager = $this->createQueueManagerWithExpectations($queueNamePrefix, 2);

$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uud] pushed into exampleJobQueue queue');
$this->loggerMock->hasInfo('Job (exampleJob) [some-job-uuid] pushed into exampleJobQueue queue');

$exampleJob = $this->createExampleJob($queueName);

Expand Down Expand Up @@ -441,7 +474,8 @@ private function createExampleJob(string $queueName): ExampleJob

private function createQueueManagerWithExpectations(
string $queueNamePrefix = '',
int $connectionIsCreatedTimes = 1
int $connectionIsCreatedTimes = 1,
?DelayedJobSchedulerInterface $delayedJobScheduler = null,
): SqsQueueManager
{
return new SqsQueueManager(
Expand All @@ -453,6 +487,7 @@ private function createQueueManagerWithExpectations(
new FrozenDateTimeImmutableFactory(
new DateTimeImmutable(self::FROZEN_DATE_TIME),
),
$delayedJobScheduler,
1,
$queueNamePrefix,
);
Expand Down

0 comments on commit 1fd8aeb

Please sign in to comment.