Skip to content

Commit

Permalink
Fixed reconnect timeout
Browse files Browse the repository at this point in the history
Added ping support
  • Loading branch information
davidwdan committed Dec 5, 2016
1 parent 8a03892 commit 44ae67d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 7 deletions.
19 changes: 14 additions & 5 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

final class Client
{

private $url, $loop, $realm, $session, $options, $messages, $webSocket, $scheduler, $disposable, $challengeCallback;

private $currentRetryCount = 0;

public function __construct(string $url, string $realm, array $options = [], LoopInterface $loop = null, Subject $webSocket = null, Observable $messages = null, Observable $session = null)
{
$this->url = $url;
Expand All @@ -41,7 +44,11 @@ public function __construct(string $url, string $realm, array $options = [], Loo

$this->messages = $messages ?: $this->webSocket->retryWhen([$this, '_reconnect'])->shareReplay(0);

$open->map(function () {
$open
->doOnNext(function(){
$this->currentRetryCount = 0;
})
->map(function () {
$this->options['roles'] = $this->roles();
return new HelloMessage($this->realm, (object)$this->options);
})->subscribe($this->webSocket, $this->scheduler);
Expand Down Expand Up @@ -211,15 +218,17 @@ public function close()

public function _reconnect(Observable $attempts)
{
$maxRetryDelay = 300000;
$maxRetryDelay = 150000;
$initialRetryDelay = 1500;
$retryDelayGrowth = 1.5;
$maxRetries = 150;
$exponent = 0;

return $attempts
->flatMap(function (\Exception $ex) use ($maxRetryDelay, $retryDelayGrowth, &$exponent, $initialRetryDelay) {
$delay = min($maxRetryDelay, pow($retryDelayGrowth, ++$exponent) + $initialRetryDelay);
->doOnNext(function(){

})
->flatMap(function (\Exception $ex) use ($maxRetryDelay, $retryDelayGrowth, $initialRetryDelay) {
$delay = min($maxRetryDelay, pow($retryDelayGrowth, ++$this->currentRetryCount) + $initialRetryDelay);
$seconds = number_format((float)$delay / 1000, 3, '.', '');;
echo "Error: ", $ex->getMessage(), PHP_EOL, "Reconnecting in ${seconds} seconds...", PHP_EOL;
return Observable::timer((int)$delay, $this->scheduler);
Expand Down
28 changes: 26 additions & 2 deletions src/Subject/WebSocketSubject.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Rx\Thruway\Subject;

use Ratchet\RFC6455\Messaging\Frame;
use React\EventLoop\Timer\Timer;
use Rx\Disposable\CallbackDisposable;
use Rx\Disposable\CompositeDisposable;
use Rx\React\RejectedPromiseException;
Expand Down Expand Up @@ -60,9 +62,31 @@ private function connectSocket()
function (WebSocket $ws) {
$this->socket = $ws;

$ws->on("error", [$this->output, "onError"]);
$ws->on("close", function ($reason) {
$lastReceivedPong = 0;
$pingTimer = $this->loop->addPeriodicTimer(30, function (Timer $timer) use ($ws, &$lastReceivedPong) {
static $sequence = 0;

if ($lastReceivedPong != $sequence) {
$timer->cancel();
$ws->close();
}

$sequence++;
$frame = new Frame($sequence, true, Frame::OP_PING);
$ws->send($frame);

});

$ws->on('pong', function (Frame $frame) use (&$lastReceivedPong) {
$lastReceivedPong = $frame->getPayload();
});

$ws->on("error", function (\Exception $ex) use ($pingTimer) {
$pingTimer->cancel();
$this->output->onError($ex);
});
$ws->on("close", function ($reason) use ($pingTimer) {
$pingTimer->cancel();
if ($this->closeObserver) {
$this->closeObserver->onNext($reason);
}
Expand Down

0 comments on commit 44ae67d

Please sign in to comment.