Skip to content

Commit

Permalink
Added retry
Browse files Browse the repository at this point in the history
- use `RetryTrait` to handler when you would like to retry failed handlers
- for proper retry funcionality you have to use driver that supports delayed executions with `executeAt` message param
  • Loading branch information
tomaj committed Jul 6, 2019
1 parent e04e5a2 commit e45d07b
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 7 deletions.
30 changes: 28 additions & 2 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ public function handle(): void
} catch (RestartException $e) {
$this->log(LogLevel::NOTICE, 'Existing hermes dispatcher - restart');
} catch (Exception $exception) {
Debugger::log($exception, Debugger::EXCEPTION);
if (Debugger::isEnabled()) {
Debugger::log($exception, Debugger::EXCEPTION);
}
}
}

Expand Down Expand Up @@ -169,12 +171,36 @@ private function handleMessage(HandlerInterface $handler, MessageInterface $mess
"Handler " . get_class($handler) . " throws exception - {$e->getMessage()}",
['error' => $e, 'message' => $this->messageLoggerContext($message), 'exception' => $e]
);
Debugger::log($e, Debugger::EXCEPTION);
if (Debugger::isEnabled()) {
Debugger::log($e, Debugger::EXCEPTION);
}

if (method_exists($handler, 'canRetry')) {
if ($message->getRetries() < $handler->maxRetry()) {
$executeAt = $this->nextRetry($message);
$newMessage = new Message($message->getType(), $message->getPayload(), $message->getId(), $message->getCreated(), $executeAt, $message->getRetries() + 1);
$this->driver->send($newMessage);
}
}

$result = false;
}
return $result;
}

/**
* Calculate next retry
*
* Inspired by ruby sidekiq (https://github.com/mperham/sidekiq/wiki/Error-Handling#automatic-job-retry)
*
* @param MessageInterface $message
* @return float
*/
private function nextRetry(MessageInterface $message): float
{
return microtime(true) + pow($message->getRetries(), 4) + 15 + (rand(1, 30) * ($message->getRetries() + 1));
}

/**
* Check if actual dispatcher has handler for given type
*
Expand Down
1 change: 0 additions & 1 deletion src/Driver/RedisSetDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public function wait(Closure $callback): void
}
if ($this->redis instanceof \Redis) {
$messagesString = $this->redis->zRangeByScore($this->scheduleKey, '-inf', microtime(true), ['limit' => [0, 1]]);
$messagesString = [];
if (count($messagesString)) {
foreach ($messagesString as $messageString) {
$this->redis->zRem($this->scheduleKey, $messageString);
Expand Down
2 changes: 2 additions & 0 deletions src/Handler/EchoHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

class EchoHandler implements HandlerInterface
{
use RetryTrait;

/**
* {@inheritdoc}
*/
Expand Down
17 changes: 17 additions & 0 deletions src/Handler/RetryTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php
declare(strict_types=1);

namespace Tomaj\Hermes\Handler;

trait RetryTrait
{
public function canRetry(): bool
{
return true;
}

public function maxRetry(): int
{
return 25;
}
}
26 changes: 23 additions & 3 deletions src/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class Message implements MessageInterface
private $messageId;

/**
* @var string
* @var float
*/
private $created;

Expand All @@ -32,6 +32,11 @@ class Message implements MessageInterface
*/
private $executeAt;

/**
* @var int
*/
private $retries;

/**
* Native implementation of message.
*
Expand All @@ -40,12 +45,18 @@ class Message implements MessageInterface
* @var string $messageId
* @var float $created timestamp (microtime(true))
* @var float $executeAt timestamp (microtime(true))
* @var int $retries
*/
public function __construct(string $type, array $payload = null, string $messageId = null, float $created = null, float $executeAt = null)
public function __construct(string $type, array $payload = null, string $messageId = null, float $created = null, float $executeAt = null, int $retries = 0)
{
$this->messageId = $messageId;
if (!$messageId) {
$this->messageId = Uuid::uuid4()->toString();
try {
$this->messageId = Uuid::uuid4()->toString();
} catch (\Exception $e) {
$this->messageId = rand(10000, 99999999);
}

}
$this->created = $created;
if (!$created) {
Expand All @@ -54,6 +65,7 @@ public function __construct(string $type, array $payload = null, string $message
$this->type = $type;
$this->payload = $payload;
$this->executeAt = $executeAt;
$this->retries = $retries;
}

/**
Expand Down Expand Up @@ -95,4 +107,12 @@ public function getPayload(): ?array
{
return $this->payload;
}

/**
* {@inheritdoc}
*/
public function getRetries(): int
{
return $this->retries;
}
}
7 changes: 7 additions & 0 deletions src/MessageInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,11 @@ public function getType(): string;
* @return array
*/
public function getPayload(): ?array;

/**
* Total retries for message
*
* @return int
*/
public function getRetries(): int;
}
7 changes: 6 additions & 1 deletion src/MessageSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public function serialize(MessageInterface $message): string
'created' => $message->getCreated(),
'payload' => $message->getPayload(),
'execute_at' => $message->getExecuteAt(),
'retries' => $message->getRetries(),
]
]);
}
Expand All @@ -32,6 +33,10 @@ public function unserialize(string $string): MessageInterface
if (isset($message['execute_at'])) {
$executeAt = $message['execute_at'];
}
return new Message($message['type'], $message['payload'], $message['id'], $message['created'], $executeAt);
$retries = 0;
if (isset($message['retries'])) {
$retries = intval($message['retries']);
}
return new Message($message['type'], $message['payload'], $message['id'], $message['created'], $executeAt, $retries);
}
}

0 comments on commit e45d07b

Please sign in to comment.