diff --git a/src/Client.php b/src/Client.php index 3be52ed..d0529d1 100644 --- a/src/Client.php +++ b/src/Client.php @@ -20,8 +20,11 @@ final class Client { + private $url, $loop, $realm, $session, $options, $messages, $webSocket, $scheduler, $disposable, $challengeCallback; + private $currentRetryCount = 0; + public function __construct(string $url, string $realm, array $options = [], LoopInterface $loop = null, Subject $webSocket = null, Observable $messages = null, Observable $session = null) { $this->url = $url; @@ -41,7 +44,11 @@ public function __construct(string $url, string $realm, array $options = [], Loo $this->messages = $messages ?: $this->webSocket->retryWhen([$this, '_reconnect'])->shareReplay(0); - $open->map(function () { + $open + ->doOnNext(function(){ + $this->currentRetryCount = 0; + }) + ->map(function () { $this->options['roles'] = $this->roles(); return new HelloMessage($this->realm, (object)$this->options); })->subscribe($this->webSocket, $this->scheduler); @@ -211,15 +218,17 @@ public function close() public function _reconnect(Observable $attempts) { - $maxRetryDelay = 300000; + $maxRetryDelay = 150000; $initialRetryDelay = 1500; $retryDelayGrowth = 1.5; $maxRetries = 150; - $exponent = 0; return $attempts - ->flatMap(function (\Exception $ex) use ($maxRetryDelay, $retryDelayGrowth, &$exponent, $initialRetryDelay) { - $delay = min($maxRetryDelay, pow($retryDelayGrowth, ++$exponent) + $initialRetryDelay); + ->doOnNext(function(){ + + }) + ->flatMap(function (\Exception $ex) use ($maxRetryDelay, $retryDelayGrowth, $initialRetryDelay) { + $delay = min($maxRetryDelay, pow($retryDelayGrowth, ++$this->currentRetryCount) + $initialRetryDelay); $seconds = number_format((float)$delay / 1000, 3, '.', '');; echo "Error: ", $ex->getMessage(), PHP_EOL, "Reconnecting in ${seconds} seconds...", PHP_EOL; return Observable::timer((int)$delay, $this->scheduler); diff --git a/src/Subject/WebSocketSubject.php b/src/Subject/WebSocketSubject.php index f470eea..54360f0 100644 --- a/src/Subject/WebSocketSubject.php +++ b/src/Subject/WebSocketSubject.php @@ -2,6 +2,8 @@ namespace Rx\Thruway\Subject; +use Ratchet\RFC6455\Messaging\Frame; +use React\EventLoop\Timer\Timer; use Rx\Disposable\CallbackDisposable; use Rx\Disposable\CompositeDisposable; use Rx\React\RejectedPromiseException; @@ -60,9 +62,31 @@ private function connectSocket() function (WebSocket $ws) { $this->socket = $ws; - $ws->on("error", [$this->output, "onError"]); - $ws->on("close", function ($reason) { + $lastReceivedPong = 0; + $pingTimer = $this->loop->addPeriodicTimer(30, function (Timer $timer) use ($ws, &$lastReceivedPong) { + static $sequence = 0; + if ($lastReceivedPong != $sequence) { + $timer->cancel(); + $ws->close(); + } + + $sequence++; + $frame = new Frame($sequence, true, Frame::OP_PING); + $ws->send($frame); + + }); + + $ws->on('pong', function (Frame $frame) use (&$lastReceivedPong) { + $lastReceivedPong = $frame->getPayload(); + }); + + $ws->on("error", function (\Exception $ex) use ($pingTimer) { + $pingTimer->cancel(); + $this->output->onError($ex); + }); + $ws->on("close", function ($reason) use ($pingTimer) { + $pingTimer->cancel(); if ($this->closeObserver) { $this->closeObserver->onNext($reason); }