From e45d07bb05a9103c012d59e02b4046f665a4e162 Mon Sep 17 00:00:00 2001 From: Tomas Majer Date: Sat, 6 Jul 2019 21:13:42 +0200 Subject: [PATCH] Added retry - use `RetryTrait` to handler when you would like to retry failed handlers - for proper retry funcionality you have to use driver that supports delayed executions with `executeAt` message param --- src/Dispatcher.php | 30 ++++++++++++++++++++++++++++-- src/Driver/RedisSetDriver.php | 1 - src/Handler/EchoHandler.php | 2 ++ src/Handler/RetryTrait.php | 17 +++++++++++++++++ src/Message.php | 26 +++++++++++++++++++++++--- src/MessageInterface.php | 7 +++++++ src/MessageSerializer.php | 7 ++++++- 7 files changed, 83 insertions(+), 7 deletions(-) create mode 100644 src/Handler/RetryTrait.php diff --git a/src/Dispatcher.php b/src/Dispatcher.php index 0aa574c..28a21de 100644 --- a/src/Dispatcher.php +++ b/src/Dispatcher.php @@ -108,7 +108,9 @@ public function handle(): void } catch (RestartException $e) { $this->log(LogLevel::NOTICE, 'Existing hermes dispatcher - restart'); } catch (Exception $exception) { - Debugger::log($exception, Debugger::EXCEPTION); + if (Debugger::isEnabled()) { + Debugger::log($exception, Debugger::EXCEPTION); + } } } @@ -169,12 +171,36 @@ private function handleMessage(HandlerInterface $handler, MessageInterface $mess "Handler " . get_class($handler) . " throws exception - {$e->getMessage()}", ['error' => $e, 'message' => $this->messageLoggerContext($message), 'exception' => $e] ); - Debugger::log($e, Debugger::EXCEPTION); + if (Debugger::isEnabled()) { + Debugger::log($e, Debugger::EXCEPTION); + } + + if (method_exists($handler, 'canRetry')) { + if ($message->getRetries() < $handler->maxRetry()) { + $executeAt = $this->nextRetry($message); + $newMessage = new Message($message->getType(), $message->getPayload(), $message->getId(), $message->getCreated(), $executeAt, $message->getRetries() + 1); + $this->driver->send($newMessage); + } + } + $result = false; } return $result; } + /** + * Calculate next retry + * + * Inspired by ruby sidekiq (https://github.com/mperham/sidekiq/wiki/Error-Handling#automatic-job-retry) + * + * @param MessageInterface $message + * @return float + */ + private function nextRetry(MessageInterface $message): float + { + return microtime(true) + pow($message->getRetries(), 4) + 15 + (rand(1, 30) * ($message->getRetries() + 1)); + } + /** * Check if actual dispatcher has handler for given type * diff --git a/src/Driver/RedisSetDriver.php b/src/Driver/RedisSetDriver.php index d7d753e..1868020 100644 --- a/src/Driver/RedisSetDriver.php +++ b/src/Driver/RedisSetDriver.php @@ -98,7 +98,6 @@ public function wait(Closure $callback): void } if ($this->redis instanceof \Redis) { $messagesString = $this->redis->zRangeByScore($this->scheduleKey, '-inf', microtime(true), ['limit' => [0, 1]]); - $messagesString = []; if (count($messagesString)) { foreach ($messagesString as $messageString) { $this->redis->zRem($this->scheduleKey, $messageString); diff --git a/src/Handler/EchoHandler.php b/src/Handler/EchoHandler.php index 2032d50..f9d0d8b 100644 --- a/src/Handler/EchoHandler.php +++ b/src/Handler/EchoHandler.php @@ -7,6 +7,8 @@ class EchoHandler implements HandlerInterface { + use RetryTrait; + /** * {@inheritdoc} */ diff --git a/src/Handler/RetryTrait.php b/src/Handler/RetryTrait.php new file mode 100644 index 0000000..cd1262d --- /dev/null +++ b/src/Handler/RetryTrait.php @@ -0,0 +1,17 @@ +messageId = $messageId; if (!$messageId) { - $this->messageId = Uuid::uuid4()->toString(); + try { + $this->messageId = Uuid::uuid4()->toString(); + } catch (\Exception $e) { + $this->messageId = rand(10000, 99999999); + } + } $this->created = $created; if (!$created) { @@ -54,6 +65,7 @@ public function __construct(string $type, array $payload = null, string $message $this->type = $type; $this->payload = $payload; $this->executeAt = $executeAt; + $this->retries = $retries; } /** @@ -95,4 +107,12 @@ public function getPayload(): ?array { return $this->payload; } + + /** + * {@inheritdoc} + */ + public function getRetries(): int + { + return $this->retries; + } } diff --git a/src/MessageInterface.php b/src/MessageInterface.php index 936f71b..0948cd3 100644 --- a/src/MessageInterface.php +++ b/src/MessageInterface.php @@ -50,4 +50,11 @@ public function getType(): string; * @return array */ public function getPayload(): ?array; + + /** + * Total retries for message + * + * @return int + */ + public function getRetries(): int; } diff --git a/src/MessageSerializer.php b/src/MessageSerializer.php index a667f46..41facde 100644 --- a/src/MessageSerializer.php +++ b/src/MessageSerializer.php @@ -17,6 +17,7 @@ public function serialize(MessageInterface $message): string 'created' => $message->getCreated(), 'payload' => $message->getPayload(), 'execute_at' => $message->getExecuteAt(), + 'retries' => $message->getRetries(), ] ]); } @@ -32,6 +33,10 @@ public function unserialize(string $string): MessageInterface if (isset($message['execute_at'])) { $executeAt = $message['execute_at']; } - return new Message($message['type'], $message['payload'], $message['id'], $message['created'], $executeAt); + $retries = 0; + if (isset($message['retries'])) { + $retries = intval($message['retries']); + } + return new Message($message['type'], $message['payload'], $message['id'], $message['created'], $executeAt, $retries); } }