Skip to content

Commit

Permalink
DE-111852 Improve observability events (provide before & after) (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorrogeray authored Aug 14, 2024
1 parent 237c9bd commit faeab36
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 91 deletions.
23 changes: 23 additions & 0 deletions src/Observability/AfterExecutionPlannedEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php declare(strict_types = 1);

namespace BE\QueueManagement\Observability;

use BE\QueueManagement\Jobs\JobInterface;
use Ramsey\Uuid\UuidInterface;

class AfterExecutionPlannedEvent
{
/**
* @param UuidInterface $executionPlannedId Same id is used for BeforeExecutionPlannedEvent and AfterExecutionPlannedEvent
*/
public function __construct(
public readonly UuidInterface $executionPlannedId,
public readonly JobInterface $job,
public readonly string $prefixedQueueName,
public readonly int $delayInSeconds,
public readonly PlannedExecutionStrategyEnum $plannedExecutionStrategy,
public readonly ?string $scheduledEventId,
public readonly ?string $messageId,
) {
}
}
21 changes: 21 additions & 0 deletions src/Observability/BeforeExecutionPlannedEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php declare(strict_types = 1);

namespace BE\QueueManagement\Observability;

use BE\QueueManagement\Jobs\JobInterface;
use Ramsey\Uuid\UuidInterface;

class BeforeExecutionPlannedEvent
{
/**
* @param UuidInterface $executionPlannedId Same id is used for BeforeExecutionPlannedEvent and AfterExecutionPlannedEvent
*/
public function __construct(
public readonly UuidInterface $executionPlannedId,
public readonly JobInterface $job,
public readonly string $prefixedQueueName,
public readonly int $delayInSeconds,
public readonly PlannedExecutionStrategyEnum $plannedExecutionStrategy,
) {
}
}
18 changes: 0 additions & 18 deletions src/Observability/ExecutionPlannedEvent.php

This file was deleted.

1 change: 1 addition & 0 deletions src/Observability/MessageSentEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class MessageSentEvent
*/
public function __construct(
public readonly int $delayInSeconds,
public readonly string $messageId,
public readonly array $messageAttributes,
public readonly string $messageBody,
) {
Expand Down
10 changes: 10 additions & 0 deletions src/Observability/PlannedExecutionStrategyEnum.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php declare(strict_types = 1);

namespace BE\QueueManagement\Observability;

enum PlannedExecutionStrategyEnum: string
{
case SQS_DELIVERY_DELAY = 'SQS_DELIVERY_DELAY';

case DELAYED_JOB_SCHEDULER = 'DELAYED_JOB_SCHEDULER';
}
131 changes: 87 additions & 44 deletions src/Queue/AWSSQS/SqsQueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
use BE\QueueManagement\Jobs\JobType;
use BE\QueueManagement\Logging\LoggerContextField;
use BE\QueueManagement\Logging\LoggerHelper;
use BE\QueueManagement\Observability\ExecutionPlannedEvent;
use BE\QueueManagement\Observability\AfterExecutionPlannedEvent;
use BE\QueueManagement\Observability\BeforeExecutionPlannedEvent;
use BE\QueueManagement\Observability\MessageSentEvent;
use BE\QueueManagement\Observability\PlannedExecutionStrategyEnum;
use BE\QueueManagement\Queue\QueueManagerInterface;
use BrandEmbassy\DateTime\DateTimeFormatter;
use BrandEmbassy\DateTime\DateTimeImmutableFactory;
Expand All @@ -20,6 +22,7 @@
use Nette\Utils\Json;
use Nette\Utils\Validators;
use Psr\Log\LoggerInterface;
use Ramsey\Uuid\Uuid;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Throwable;
use function assert;
Expand Down Expand Up @@ -257,12 +260,11 @@ public function pushDelayed(JobInterface $job, int $delayInSeconds, int $maxDela
$executionPlannedAt = $this->dateTimeImmutableFactory->getNow()->modify(
sprintf('+ %d seconds', $delayInSeconds),
);
$job->setExecutionPlannedAt($executionPlannedAt);

$finalDelayInSeconds = $delayInSeconds;

if ($delayInSeconds > $maxDelayInSeconds) {
$job->setExecutionPlannedAt($executionPlannedAt);

if ($this->delayedJobScheduler !== null) {
$this->scheduleJob($job, $prefixedQueueName, $executionPlannedAt, $delayInSeconds, $maxDelayInSeconds);

Expand All @@ -283,17 +285,41 @@ public function pushDelayed(JobInterface $job, int $delayInSeconds, int $maxDela
$finalDelayInSeconds = self::MAX_DELAY_IN_SECONDS;
}

$this->planExecutionUsingSqsDeliveryDelay($job, $prefixedQueueName, $delayInSeconds, $finalDelayInSeconds);
}


private function planExecutionUsingSqsDeliveryDelay(
JobInterface $job,
string $prefixedQueueName,
int $delayInSeconds,
int $finalDelayInSeconds
): void {
$parameters = [self::DELAY_SECONDS => $finalDelayInSeconds];

$beforeExecutionPlannedEvent = null;
if ($this->eventDispatcher !== null) {
$beforeExecutionPlannedEvent = new BeforeExecutionPlannedEvent(
Uuid::uuid4(),
$job,
$prefixedQueueName,
$delayInSeconds,
PlannedExecutionStrategyEnum::SQS_DELIVERY_DELAY,
);
$this->eventDispatcher->dispatch($beforeExecutionPlannedEvent);
}

$sqsMessageId = $this->publishMessage($job, $prefixedQueueName, $parameters);

if ($this->eventDispatcher !== null) {
$this->eventDispatcher->dispatch(new ExecutionPlannedEvent(
$this->eventDispatcher->dispatch(new AfterExecutionPlannedEvent(
$beforeExecutionPlannedEvent->executionPlannedId,
$job,
$executionPlannedAt,
$delayInSeconds,
$prefixedQueueName,
$delayInSeconds,
PlannedExecutionStrategyEnum::SQS_DELIVERY_DELAY,
null,
$sqsMessageId,
));
}

Expand All @@ -308,6 +334,58 @@ public function pushDelayed(JobInterface $job, int $delayInSeconds, int $maxDela
}


private function scheduleJob(
JobInterface $job,
string $prefixedQueueName,
DateTimeImmutable $executionPlannedAt,
int $delayInSeconds,
int $maxDelayInSeconds
): void {
assert($this->delayedJobScheduler !== null, 'Delayed job scheduler must be set to schedule a job.');

$beforeExecutionPlannedEvent = null;
if ($this->eventDispatcher !== null) {
$beforeExecutionPlannedEvent = new BeforeExecutionPlannedEvent(
Uuid::uuid4(),
$job,
$prefixedQueueName,
$delayInSeconds,
PlannedExecutionStrategyEnum::DELAYED_JOB_SCHEDULER,
);
$this->eventDispatcher->dispatch($beforeExecutionPlannedEvent);
}

$scheduledEventId = $this->delayedJobScheduler->scheduleJob($job, $prefixedQueueName);

$this->logger->info(
sprintf(
'Requested delay is greater than SQS limit. Job execution has been planned using %s.',
$this->delayedJobScheduler->getSchedulerName(),
),
[
'executionPlannedAt' => DateTimeFormatter::format($executionPlannedAt),
'scheduledEventId' => $scheduledEventId,
'delayInSeconds' => $delayInSeconds,
'maxDelayInSeconds' => $maxDelayInSeconds,
LoggerContextField::JOB_QUEUE_NAME => $prefixedQueueName,
LoggerContextField::JOB_UUID => $job->getUuid(),
],
);

if ($this->eventDispatcher !== null) {
$this->eventDispatcher->dispatch(new AfterExecutionPlannedEvent(
$beforeExecutionPlannedEvent->executionPlannedId,
$job,
$prefixedQueueName,
$delayInSeconds,
PlannedExecutionStrategyEnum::DELAYED_JOB_SCHEDULER,
$scheduledEventId,
null,
));
}
}


/**
* @param array<mixed> $properties
*
Expand Down Expand Up @@ -380,16 +458,18 @@ private function publishMessage(
private function sendMessage(array $messageToSend): string
{
$result = $this->sqsClient->sendMessage($messageToSend);
$messageId = $result->get(SqsMessageFields::MESSAGE_ID);

if ($this->eventDispatcher !== null) {
$this->eventDispatcher->dispatch(new MessageSentEvent(
$messageToSend[SqsSendingMessageFields::DELAY_SECONDS],
$messageId,
$messageToSend[SqsSendingMessageFields::MESSAGE_ATTRIBUTES],
$messageToSend[SqsSendingMessageFields::MESSAGE_BODY],
));
}

return $result->get(SqsMessageFields::MESSAGE_ID);
return $messageId;
}


Expand Down Expand Up @@ -419,43 +499,6 @@ public function checkConnection(): bool
}


private function scheduleJob(
JobInterface $job,
string $prefixedQueueName,
DateTimeImmutable $executionPlannedAt,
int $delayInSeconds,
int $maxDelayInSeconds
): void {
assert($this->delayedJobScheduler !== null, 'Delayed job scheduler must be set to schedule a job.');
$scheduledEventId = $this->delayedJobScheduler->scheduleJob($job, $prefixedQueueName);

$this->logger->info(
sprintf(
'Requested delay is greater than SQS limit. Job execution has been planned using %s.',
$this->delayedJobScheduler->getSchedulerName(),
),
[
'executionPlannedAt' => DateTimeFormatter::format($executionPlannedAt),
'scheduledEventId' => $scheduledEventId,
'delayInSeconds' => $delayInSeconds,
'maxDelayInSeconds' => $maxDelayInSeconds,
LoggerContextField::JOB_QUEUE_NAME => $prefixedQueueName,
LoggerContextField::JOB_UUID => $job->getUuid(),
],
);

if ($this->eventDispatcher !== null) {
$this->eventDispatcher->dispatch(new ExecutionPlannedEvent(
$job,
$executionPlannedAt,
$delayInSeconds,
$prefixedQueueName,
$scheduledEventId,
));
}
}


private function getPrefixedQueueName(string $queueName): string
{
$prefixedQueueName = $this->queueNamePrefix . $queueName;
Expand Down
Loading

0 comments on commit faeab36

Please sign in to comment.