diff --git a/psalm.xml b/psalm.xml
index 6906996..08186e8 100644
--- a/psalm.xml
+++ b/psalm.xml
@@ -36,5 +36,10 @@
+
+
+
+
+
diff --git a/src/Command/ProcessUploadOrderRequestsCommand.php b/src/Command/ProcessUploadOrderRequestsCommand.php
index f747239..5a732a5 100644
--- a/src/Command/ProcessUploadOrderRequestsCommand.php
+++ b/src/Command/ProcessUploadOrderRequestsCommand.php
@@ -4,6 +4,7 @@
namespace Setono\SyliusPeakPlugin\Command;
+use Setono\SyliusPeakPlugin\Processor\FailedUploadOrderRequestProcessorInterface;
use Setono\SyliusPeakPlugin\Processor\UploadOrderRequestProcessorInterface;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
@@ -16,13 +17,17 @@
)]
final class ProcessUploadOrderRequestsCommand extends Command
{
- public function __construct(private readonly UploadOrderRequestProcessorInterface $uploadOrderRequestProcessor)
- {
+ public function __construct(
+ private readonly UploadOrderRequestProcessorInterface $uploadOrderRequestProcessor,
+ private readonly FailedUploadOrderRequestProcessorInterface $failedUploadOrderRequestProcessor,
+ ) {
parent::__construct();
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
+ $this->failedUploadOrderRequestProcessor->process();
+
$this->uploadOrderRequestProcessor->process();
return 0;
diff --git a/src/Event/FailedUploadOrderRequestsQueryBuilderCreatedEvent.php b/src/Event/FailedUploadOrderRequestsQueryBuilderCreatedEvent.php
new file mode 100644
index 0000000..f3696a7
--- /dev/null
+++ b/src/Event/FailedUploadOrderRequestsQueryBuilderCreatedEvent.php
@@ -0,0 +1,18 @@
+ 'incrementTries'];
+ }
+
+ public function incrementTries(CompletedEvent $event): void
+ {
+ /** @var UploadOrderRequestInterface|object $uploadOrderRequest */
+ $uploadOrderRequest = $event->getSubject();
+ Assert::isInstanceOf($uploadOrderRequest, UploadOrderRequestInterface::class);
+
+ $uploadOrderRequest->incrementTries();
+ }
+}
diff --git a/src/EventSubscriber/Workflow/UploadOrderRequest/RetrySubscriber.php b/src/EventSubscriber/Workflow/UploadOrderRequest/RetrySubscriber.php
new file mode 100644
index 0000000..6808fc5
--- /dev/null
+++ b/src/EventSubscriber/Workflow/UploadOrderRequest/RetrySubscriber.php
@@ -0,0 +1,36 @@
+ 'retry'];
+ }
+
+ public function retry(CompletedEvent $event): void
+ {
+ /** @var UploadOrderRequestInterface|object $uploadOrderRequest */
+ $uploadOrderRequest = $event->getSubject();
+ Assert::isInstanceOf($uploadOrderRequest, UploadOrderRequestInterface::class);
+
+ if ($uploadOrderRequest->getTries() >= $this->maxTries) {
+ return;
+ }
+
+ $event->getWorkflow()->apply($uploadOrderRequest, UploadOrderRequestWorkflow::TRANSITION_RESET);
+ }
+}
diff --git a/src/EventSubscriber/Workflow/UploadOrderRequest/StateUpdatedSubscriber.php b/src/EventSubscriber/Workflow/UploadOrderRequest/StateUpdatedSubscriber.php
new file mode 100644
index 0000000..e0382da
--- /dev/null
+++ b/src/EventSubscriber/Workflow/UploadOrderRequest/StateUpdatedSubscriber.php
@@ -0,0 +1,28 @@
+ 'updateTimestamp'];
+ }
+
+ public function updateTimestamp(CompletedEvent $event): void
+ {
+ /** @var UploadOrderRequestInterface|object $uploadOrderRequest */
+ $uploadOrderRequest = $event->getSubject();
+ Assert::isInstanceOf($uploadOrderRequest, UploadOrderRequestInterface::class);
+
+ $uploadOrderRequest->setStateUpdatedAt(new \DateTimeImmutable());
+ }
+}
diff --git a/src/Model/UploadOrderRequest.php b/src/Model/UploadOrderRequest.php
index ca1d47f..85587e4 100644
--- a/src/Model/UploadOrderRequest.php
+++ b/src/Model/UploadOrderRequest.php
@@ -12,6 +12,8 @@ class UploadOrderRequest implements UploadOrderRequestInterface
protected string $state = self::STATE_PENDING;
+ protected ?\DateTimeInterface $stateUpdatedAt = null;
+
protected ?OrderInterface $order = null;
protected ?string $request = null;
@@ -22,6 +24,8 @@ class UploadOrderRequest implements UploadOrderRequestInterface
protected ?int $peakOrderId = null;
+ protected int $tries = 0;
+
public function getId(): ?int
{
return $this->id;
@@ -47,6 +51,16 @@ public function setState(string $state): void
$this->state = $state;
}
+ public function getStateUpdatedAt(): ?\DateTimeInterface
+ {
+ return $this->stateUpdatedAt;
+ }
+
+ public function setStateUpdatedAt(\DateTimeInterface $stateUpdatedAt): void
+ {
+ $this->stateUpdatedAt = $stateUpdatedAt;
+ }
+
public function getOrder(): ?OrderInterface
{
return $this->order;
@@ -96,4 +110,19 @@ public function setPeakOrderId(?int $peakOrderId): void
{
$this->peakOrderId = $peakOrderId;
}
+
+ public function getTries(): int
+ {
+ return $this->tries;
+ }
+
+ public function setTries(int $tries): void
+ {
+ $this->tries = $tries;
+ }
+
+ public function incrementTries(): void
+ {
+ ++$this->tries;
+ }
}
diff --git a/src/Model/UploadOrderRequestInterface.php b/src/Model/UploadOrderRequestInterface.php
index 70a3adb..d949e6c 100644
--- a/src/Model/UploadOrderRequestInterface.php
+++ b/src/Model/UploadOrderRequestInterface.php
@@ -23,6 +23,10 @@ public function getState(): string;
public function setState(string $state): void;
+ public function getStateUpdatedAt(): ?\DateTimeInterface;
+
+ public function setStateUpdatedAt(\DateTimeInterface $stateUpdatedAt): void;
+
public function getOrder(): ?OrderInterface;
public function setOrder(?OrderInterface $order): void;
@@ -45,4 +49,10 @@ public function setError(?string $error): void;
public function getPeakOrderId(): ?int;
public function setPeakOrderId(?int $peakOrderId): void;
+
+ public function getTries(): int;
+
+ public function setTries(int $tries): void;
+
+ public function incrementTries(): void;
}
diff --git a/src/Processor/FailedUploadOrderRequestProcessor.php b/src/Processor/FailedUploadOrderRequestProcessor.php
new file mode 100644
index 0000000..2f0cec1
--- /dev/null
+++ b/src/Processor/FailedUploadOrderRequestProcessor.php
@@ -0,0 +1,43 @@
+managerRegistry = $managerRegistry;
+ }
+
+ public function process(): void
+ {
+ foreach ($this->failedUploadOrderRequestsProvider->getUploadOrderRequests() as $uploadOrderRequest) {
+ try {
+ $this->uploadOrderRequestWorkflow->apply($uploadOrderRequest, UploadOrderRequestWorkflow::TRANSITION_FAIL);
+ } catch (LogicException) {
+ continue;
+ }
+
+ try {
+ $this->getManager($uploadOrderRequest)->flush();
+ } catch (OptimisticLockException) {
+ continue;
+ }
+ }
+ }
+}
diff --git a/src/Processor/FailedUploadOrderRequestProcessorInterface.php b/src/Processor/FailedUploadOrderRequestProcessorInterface.php
new file mode 100644
index 0000000..e46b08d
--- /dev/null
+++ b/src/Processor/FailedUploadOrderRequestProcessorInterface.php
@@ -0,0 +1,10 @@
+preQualifiedUploadableOrdersProvider->getUploadOrderRequests() as $uploadOrderRequest) {
+ foreach ($this->preQualifiedUploadOrderRequestsProvider->getUploadOrderRequests() as $uploadOrderRequest) {
try {
$this->uploadOrderRequestWorkflow->apply($uploadOrderRequest, UploadOrderRequestWorkflow::TRANSITION_PROCESS);
} catch (LogicException) {
diff --git a/src/Provider/FailedUploadOrderRequestsProvider.php b/src/Provider/FailedUploadOrderRequestsProvider.php
new file mode 100644
index 0000000..5153a5f
--- /dev/null
+++ b/src/Provider/FailedUploadOrderRequestsProvider.php
@@ -0,0 +1,49 @@
+ $uploadOrderRequestClass
+ */
+ public function __construct(
+ ManagerRegistry $managerRegistry,
+ private readonly EventDispatcherInterface $eventDispatcher,
+ private readonly string $uploadOrderRequestClass,
+ private readonly string $processingTimeout = '1 hour',
+ ) {
+ $this->managerRegistry = $managerRegistry;
+ }
+
+ /**
+ * @return \Generator
+ */
+ public function getUploadOrderRequests(): \Generator
+ {
+ $qb = $this->getRepository($this->uploadOrderRequestClass)->createQueryBuilder('o')
+ ->andWhere('o.state = :orderState')
+ ->andWhere('o.stateUpdatedAt < :stateUpdatedAt')
+ ->setParameter('state', UploadOrderRequestInterface::STATE_PROCESSING)
+ ->setParameter('stateUpdatedAt', new \DateTimeImmutable('-' . $this->processingTimeout))
+ ;
+
+ $this->eventDispatcher->dispatch(new FailedUploadOrderRequestsQueryBuilderCreatedEvent($qb));
+
+ /** @var SelectBatchIteratorAggregate $iterator */
+ $iterator = SelectBatchIteratorAggregate::fromQuery($qb->getQuery(), 50);
+
+ yield from $iterator;
+ }
+}
diff --git a/src/Provider/FailedUploadOrderRequestsProviderInterface.php b/src/Provider/FailedUploadOrderRequestsProviderInterface.php
new file mode 100644
index 0000000..d22980d
--- /dev/null
+++ b/src/Provider/FailedUploadOrderRequestsProviderInterface.php
@@ -0,0 +1,15 @@
+
+ */
+ public function getUploadOrderRequests(): iterable;
+}
diff --git a/src/Resources/config/doctrine/model/UploadOrderRequest.orm.xml b/src/Resources/config/doctrine/model/UploadOrderRequest.orm.xml
index 9ec4fc1..55ce0ca 100644
--- a/src/Resources/config/doctrine/model/UploadOrderRequest.orm.xml
+++ b/src/Resources/config/doctrine/model/UploadOrderRequest.orm.xml
@@ -11,14 +11,20 @@
-
+
+
+
+
+
+
+
diff --git a/src/Resources/config/services/command.xml b/src/Resources/config/services/command.xml
index 8a4af7d..6ad8df3 100644
--- a/src/Resources/config/services/command.xml
+++ b/src/Resources/config/services/command.xml
@@ -12,6 +12,7 @@
+
diff --git a/src/Resources/config/services/event_subscriber.xml b/src/Resources/config/services/event_subscriber.xml
index ff70bcf..3d5a546 100644
--- a/src/Resources/config/services/event_subscriber.xml
+++ b/src/Resources/config/services/event_subscriber.xml
@@ -48,6 +48,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Resources/config/services/processor.xml b/src/Resources/config/services/processor.xml
index 5486295..02fc86d 100644
--- a/src/Resources/config/services/processor.xml
+++ b/src/Resources/config/services/processor.xml
@@ -2,6 +2,15 @@
+
+
+
+
+
+
+
+
diff --git a/src/Resources/config/services/provider.xml b/src/Resources/config/services/provider.xml
index e23625d..08db1c5 100644
--- a/src/Resources/config/services/provider.xml
+++ b/src/Resources/config/services/provider.xml
@@ -2,6 +2,15 @@
+
+
+
+
+
+ %setono_sylius_peak.model.upload_order_request.class%
+
+