From f96fd0c45892cc4d63517ef1ae0040facb87677e Mon Sep 17 00:00:00 2001 From: David Dan Date: Wed, 21 Dec 2016 18:36:44 -0500 Subject: [PATCH 01/12] RxPHP v2 compatibility --- composer.json | 8 +- examples/authentication.php | 26 ++-- examples/publish.php | 7 +- examples/register-progress.php | 32 ++--- examples/rpc-extended.php | 56 ++++---- examples/rpc-progress.php | 31 +++-- examples/rpc-publish.php | 34 +++++ examples/rpc-repeat-progress.php | 27 ++-- examples/rpc-repeat.php | 23 ++-- examples/rpc.php | 40 +++--- examples/subscribe.php | 16 +-- src/Client.php | 115 ++++++++-------- src/Observable/CallObservable.php | 30 +++-- src/Observable/RegisterObservable.php | 26 ++-- src/Observable/TopicObservable.php | 15 ++- src/ReactAsyncInteropLoop.php | 127 ++++++++++++++++++ src/ReactAsyncInteropTimer.php | 24 ++++ src/Subject/SessionReplaySubject.php | 21 ++- src/Subject/WebSocketSubject.php | 24 ++-- src/SwitchFirstOperator.php | 65 --------- .../Observable/CallObservableTest.php | 91 ++++++------- .../Observable/RegisterObservableTest.php | 90 ++++++------- tests/bootstrap.php | 8 +- 23 files changed, 537 insertions(+), 399 deletions(-) create mode 100644 examples/rpc-publish.php create mode 100644 src/ReactAsyncInteropLoop.php create mode 100644 src/ReactAsyncInteropTimer.php delete mode 100644 src/SwitchFirstOperator.php diff --git a/composer.json b/composer.json index ed046bb..380d934 100644 --- a/composer.json +++ b/composer.json @@ -28,16 +28,20 @@ } }, "require": { - "reactivex/rxphp": "^1.4.1", "voryx/thruway-common": "^1.0.0", "php": "^7.0", "ratchet/pawl": "^0.2.2", - "voryx/event-loop": "^0.2.0" + "wyrihaximus/react-async-interop-loop": "dev-master", + "reactivex/rxphp": "2.x-dev" }, "repositories": [ { "type": "vcs", "url": "git@github.com:voryx/ThruwayCommon.git" + }, + { + "type": "vcs", + "url": "https://github.com/davidwdan/RxPHP.git" } ] } diff --git a/examples/authentication.php b/examples/authentication.php index b98fbf2..ced57f7 100644 --- a/examples/authentication.php +++ b/examples/authentication.php @@ -5,23 +5,27 @@ require __DIR__ . '/../vendor/autoload.php'; -$client = new Client('ws://127.0.0.1:9090', "somerealm", ["authmethods" => ["simplysimple"]]); +$client = new Client('ws://127.0.0.1:9090', 'somerealm', ['authmethods' => ['simplysimple']]); $client->onChallenge(function (Observable $challenge) { return $challenge->map(function ($args) { list($method, $extra) = $args; - return "letMeIn"; + return 'letMeIn'; }); }); -$client->register('com.myapp.example', function ($x) { - return $x; -})->subscribeCallback(function () { - echo "Registered ", PHP_EOL; -}); +$client + ->register('com.myapp.example', function ($x) { + return $x; + }) + ->subscribe(function () { + echo 'Registered ', PHP_EOL; + }); -$client->call('com.myapp.example', [1234])->subscribeCallback(function ($res) { - list($args, $argskw, $details) = $res; +$client + ->call('com.myapp.example', [1234]) + ->subscribe(function ($res) { + list($args, $argskw, $details) = $res; - echo "Call result: ", $args[0], PHP_EOL; -}); + echo 'Call result: ', $args[0], PHP_EOL; + }); diff --git a/examples/publish.php b/examples/publish.php index ee3070f..1b459af 100644 --- a/examples/publish.php +++ b/examples/publish.php @@ -5,8 +5,11 @@ require __DIR__ . '/../vendor/autoload.php'; -$client = new Client('ws://127.0.0.1:9090', "realm1"); +$driver = new \Amp\Loop\LoopFactory(); +\Interop\Async\Loop::setFactory($driver); + +$client = new Client('ws://127.0.0.1:9090', 'realm1'); $source = Observable::interval(1000); -$x = $client->publish('com.myapp.hello', $source); +$client->publish('com.myapp.hello', $source); diff --git a/examples/register-progress.php b/examples/register-progress.php index 1c9111a..485b07d 100644 --- a/examples/register-progress.php +++ b/examples/register-progress.php @@ -1,25 +1,25 @@ register('com.myapp.example', function ($x) { - return \Rx\Observable::interval(300, new \Rx\Scheduler\EventLoopScheduler(\EventLoop\getLoop()))->doOnNext(function () { - echo "."; - }); -}, ["progress" => true])->subscribeCallback( - function () { - echo "Registered ", PHP_EOL; - }, - function (Exception $e) { - echo "Register error: ", $e->getMessage(), PHP_EOL; - }, - function () { - echo "Register completed", PHP_EOL; - } -); +$client + ->register('com.myapp.example', function ($x) { + return \Rx\Observable::interval(300)->doOnNext(function () { + echo '.'; + }); + }, ['progress' => true]) + ->subscribe( + function () { + echo 'Registered ', PHP_EOL; + }, + function (Exception $e) { + echo 'Register error: ', $e->getMessage(), PHP_EOL; + }, + function () { + echo 'Register completed', PHP_EOL; + }); diff --git a/examples/rpc-extended.php b/examples/rpc-extended.php index 459d256..ca256f8 100644 --- a/examples/rpc-extended.php +++ b/examples/rpc-extended.php @@ -1,37 +1,37 @@ registerExtended('com.myapp.example', function ($args, $argskw, $details, $invocationMsg) { - return 1234567; -})->subscribeCallback( - function () { - echo "Registered ", PHP_EOL; - }, - function (Exception $e) { - echo "Register error: ", $e->getMessage(), PHP_EOL; - }, - function () { - echo "Register completed", PHP_EOL; - } -); +$client + ->registerExtended('com.myapp.example', function ($args, $argskw, $details, $invocationMsg) { + return 1234567; + }) + ->subscribe( + function () { + echo 'Registered ', PHP_EOL; + }, + function (Exception $e) { + echo 'Register error: ', $e->getMessage(), PHP_EOL; + }, + function () { + echo 'Register completed', PHP_EOL; + }); -$client->call('com.myapp.example', [123], ["foo" => "bar"]) - ->subscribe(new CallbackObserver( - function ($res) { - list($args, $argskw, $details) = $res; +$client + ->call('com.myapp.example', [123], ['foo' => 'bar']) + ->subscribe( + function ($res) { + list($args, $argskw, $details) = $res; - echo "Call result: ", $args[0], PHP_EOL; - }, - function (Exception $e) { - echo "Call error: ", $e->getMessage(), PHP_EOL; - }, - function () { - echo "Call completed", PHP_EOL; - }) - ); + echo 'Call result: ', $args[0], PHP_EOL; + }, + function (Exception $e) { + echo 'Call error: ', $e->getMessage(), PHP_EOL; + }, + function () { + echo 'Call completed', PHP_EOL; + }); diff --git a/examples/rpc-progress.php b/examples/rpc-progress.php index 77c69ea..aa5abc5 100644 --- a/examples/rpc-progress.php +++ b/examples/rpc-progress.php @@ -1,24 +1,23 @@ progressiveCall('com.myapp.example', [1234], []) -// ->take(10) - ->subscribe(new CallbackObserver( - function ($res) { - list($args, $argskw, $details) = $res; +$client + ->progressiveCall('com.myapp.example', [1234], []) + ->take(10) + ->subscribe( + function ($res) { + list($args, $argskw, $details) = $res; - echo "Call result: ", $args[0], PHP_EOL; - }, - function (Exception $e) { - echo "Call error: ", $e->getMessage(), PHP_EOL; - }, - function () { - echo "Call completed", PHP_EOL; - }) - ); + echo 'Call result: ', $args[0], PHP_EOL; + }, + function (Exception $e) { + echo 'Call error: ', $e->getMessage(), PHP_EOL; + }, + function () { + echo 'Call completed', PHP_EOL; + }); diff --git a/examples/rpc-publish.php b/examples/rpc-publish.php new file mode 100644 index 0000000..caeb3af --- /dev/null +++ b/examples/rpc-publish.php @@ -0,0 +1,34 @@ +publish('com.myapp.hello', $source); + + +$client->register('com.myapp.example', function ($x) { + return $x; +})->subscribe(new \Rx\Observer\CallbackObserver( + function () { + echo "Registered ", PHP_EOL; + }, + function (Exception $e) { + echo "Register error: ", $e->getMessage(), PHP_EOL; + }, + function () { + echo "Register completed", PHP_EOL; + }), new \Rx\Scheduler\EventLoopScheduler(\EventLoop\getLoop()) + +); + +\EventLoop\addTimer(1, function () use ($client) { + echo "a"; +// $client->publish('com.myapp.hello', "hi"); + $client->close(); +}); \ No newline at end of file diff --git a/examples/rpc-repeat-progress.php b/examples/rpc-repeat-progress.php index c20ee2c..018f731 100644 --- a/examples/rpc-repeat-progress.php +++ b/examples/rpc-repeat-progress.php @@ -1,37 +1,36 @@ progressiveRegister('com.myapp.example', function () { - return Observable::interval(500); -})->subscribe(new CallbackObserver(), $scheduler); +$client = new Client('ws://127.0.0.1:9090', 'realm1'); +$client + ->progressiveRegister('com.myapp.example', function () { + return Observable::interval(500); + }) + ->subscribe(); -$client->progressiveCall('com.myapp.example', [], []) +$client + ->progressiveCall('com.myapp.example', [], []) ->take(5) ->repeatWhen(function (Observable $attempts) { return $attempts->delay(1000)->take(1); }) - ->subscribe(new CallbackObserver( + ->subscribe( function ($res) { list($args, $argskw, $details) = $res; - echo "Call result: ", $args[0], PHP_EOL; + echo 'Call result: ', $args[0], PHP_EOL; }, function (Exception $e) { - echo "Call error: ", $e->getMessage(), PHP_EOL; + echo 'Call error: ', $e->getMessage(), PHP_EOL; }, function () { - echo "Call completed", PHP_EOL; - }), $scheduler); + echo 'Call completed', PHP_EOL; + }); //Output diff --git a/examples/rpc-repeat.php b/examples/rpc-repeat.php index b17f858..e1382bf 100644 --- a/examples/rpc-repeat.php +++ b/examples/rpc-repeat.php @@ -1,9 +1,12 @@ mapTo($client->call('com.myapp.example')) - ->switchLatest() + ->switch() ->take(1) ->timeout(2000) - ->repeatWhen(function (\Rx\Observable $attempts) { + ->repeatWhen(function (Observable $attempts) { return $attempts->delay(1000); }) - ->retryWhen(function (\Rx\Observable $errors) { + ->retryWhen(function (Observable $errors) { return $errors->delay(1000); }); -$source->subscribe(new \Rx\Observer\CallbackObserver( +$source->subscribe( function ($res) { list($args, $argskw, $details) = $res; - echo "Call result: ", $args[0], PHP_EOL; + echo 'Call result: ', $args[0], PHP_EOL; }, function (Exception $e) { - echo "Call error: ", $e->getMessage(), PHP_EOL; + echo 'Call error: ', $e->getMessage(), PHP_EOL; }, function () { - echo "Call completed", PHP_EOL; - }), $scheduler); + echo 'Call completed', PHP_EOL; + }); diff --git a/examples/rpc.php b/examples/rpc.php index 7d5e768..6a98272 100644 --- a/examples/rpc.php +++ b/examples/rpc.php @@ -1,37 +1,39 @@ register('com.myapp.example', function ($x) { - return $x; -})->subscribeCallback( +$client + ->register('com.myapp.example', function ($x) { + return $x; + }) + ->subscribe( function () { - echo "Registered ", PHP_EOL; + echo 'Registered ', PHP_EOL; }, function (Exception $e) { - echo "Register error: ", $e->getMessage(), PHP_EOL; + echo 'Register error: ', $e->getMessage(), PHP_EOL; }, function () { - echo "Register completed", PHP_EOL; + echo 'Register completed', PHP_EOL; } ); -$client->call('com.myapp.example', [1234]) - ->subscribe(new CallbackObserver( - function ($res) { - list($args, $argskw, $details) = $res; +$client + ->call('com.myapp.example', [1234]) + ->subscribe( + function ($res) { + list($args, $argskw, $details) = $res; - echo "Call result: ", $args[0], PHP_EOL; - }, - function (Exception $e) { - echo "Call error: ", $e->getMessage(), PHP_EOL; - }, - function () { - echo "Call completed", PHP_EOL; - }) + echo 'Call result: ', $args[0], PHP_EOL; + }, + function (Exception $e) { + echo 'Call error: ', $e->getMessage(), PHP_EOL; + }, + function () { + echo 'Call completed', PHP_EOL; + } ); diff --git a/examples/subscribe.php b/examples/subscribe.php index 37935b6..1c940fe 100644 --- a/examples/subscribe.php +++ b/examples/subscribe.php @@ -1,22 +1,22 @@ topic('com.myapp.hello')->subscribe(new CallbackObserver( +$client + ->topic('com.myapp.hello') + ->subscribe( function ($res) { list($args, $argskw, $details) = $res; - echo "Result: ", $args[0], PHP_EOL; + echo 'Result: ', $args[0], PHP_EOL; }, function (Exception $e) { - echo "Error: ", $e->getMessage(), PHP_EOL; + echo 'Error: ', $e->getMessage(), PHP_EOL; }, function () { - echo "Completed", PHP_EOL; - }) -); + echo 'Completed', PHP_EOL; + }); diff --git a/src/Client.php b/src/Client.php index 9f586b0..d95a3d3 100644 --- a/src/Client.php +++ b/src/Client.php @@ -2,11 +2,11 @@ namespace Rx\Thruway; +use Rx\Observer\CallbackObserver; +use Rx\Scheduler; use Rx\Thruway\Subject\SessionReplaySubject; use Rx\Thruway\Subject\WebSocketSubject; use Rx\Disposable\CompositeDisposable; -use Rx\Scheduler\EventLoopScheduler; -use React\EventLoop\LoopInterface; use Rx\DisposableInterface; use Thruway\Common\Utils; use Rx\Subject\Subject; @@ -20,38 +20,31 @@ final class Client { - - private $url, $loop, $realm, $session, $options, $messages, $webSocket, $scheduler, $disposable, $challengeCallback; + private $session, $messages, $webSocket, $disposable, $challengeCallback; private $currentRetryCount = 0; - public function __construct(string $url, string $realm, array $options = [], LoopInterface $loop = null, Subject $webSocket = null, Observable $messages = null, Observable $session = null) + public function __construct(string $url, string $realm, array $options = [], Subject $webSocket = null, Observable $messages = null, Observable $session = null) { - $this->url = $url; - $this->realm = $realm; - $this->options = $options; - $this->loop = $loop ?? \EventLoop\getLoop(); - $this->scheduler = new EventLoopScheduler($this->loop); - $this->disposable = new CompositeDisposable(); - $this->challengeCallback = function () { + $this->disposable = new CompositeDisposable(); + $this->challengeCallback = function () { }; $open = new Subject(); $close = new Subject(); $this->webSocket = $webSocket ?: new WebSocketSubject($url, ['wamp.2.json'], $open, $close); - - $this->messages = $messages ?: $this->webSocket->retryWhen([$this, '_reconnect'])->shareReplay(0); + $this->messages = $messages ?: $this->webSocket->retryWhen([$this, '_reconnect'])->shareReplay(0); $open - ->doOnNext(function(){ + ->doOnNext(function () { $this->currentRetryCount = 0; }) - ->map(function () { - $this->options['roles'] = $this->roles(); - return new HelloMessage($this->realm, (object)$this->options); - })->subscribe($this->webSocket, $this->scheduler); + ->map(function () use ($realm, $options) { + $options['roles'] = $this->roles(); + return new HelloMessage($realm, (object)$options); + })->subscribe($this->webSocket); $challengeMsg = $this->messages ->filter(function (Message $msg) { @@ -69,20 +62,20 @@ public function __construct(string $url, string $realm, array $options = [], Loo ->map(function ($signature) { return new AuthenticateMessage($signature); }) - ->catchError(function (\Exception $ex) { + ->catch(function (\Exception $ex) { if ($ex instanceof WampChallengeException) { return Observable::just($ex->getErrorMessage()); } return Observable::error($ex); }) - ->doOnEach($this->webSocket); + ->do($this->webSocket); $this->session = $session ?: $this->messages ->merge($challengeMsg) ->filter(function (Message $msg) { return $msg instanceof WelcomeMessage; }) - ->multicast(new SessionReplaySubject($close))->refCount(); + ->multicast(new SessionReplaySubject($close, Scheduler::getAsync()))->refCount(); $this->disposable->add($this->webSocket); } @@ -94,12 +87,12 @@ public function __construct(string $url, string $realm, array $options = [], Loo * @param array $options * @return Observable */ - public function call(string $uri, array $args = [], array $argskw = [], array $options = null) :Observable + public function call(string $uri, array $args = [], array $argskw = [], array $options = null): Observable { return $this->session ->take(1) ->mapTo(new CallObservable($uri, $this->messages, $this->webSocket, $args, $argskw, $options)) - ->switchLatest(); + ->switch(); } /** @@ -108,7 +101,7 @@ public function call(string $uri, array $args = [], array $argskw = [], array $o * @param array $options * @return Observable */ - public function register(string $uri, callable $callback, array $options = []) :Observable + public function register(string $uri, callable $callback, array $options = []): Observable { return $this->registerExtended($uri, $callback, $options, false); } @@ -123,7 +116,7 @@ public function register(string $uri, callable $callback, array $options = []) : * @param array $options * @return Observable */ - public function progressiveCall(string $uri, array $args = [], array $argskw = [], array $options = null) :Observable + public function progressiveCall(string $uri, array $args = [], array $argskw = [], array $options = null): Observable { $options['receive_progress'] = true; @@ -135,7 +128,7 @@ public function progressiveCall(string $uri, array $args = [], array $argskw = [ ->mapTo($callObs->doOnCompleted(function () use ($completed) { $completed->onNext(0); })) - ->switchLatest(); + ->switch(); } /** @@ -144,7 +137,7 @@ public function progressiveCall(string $uri, array $args = [], array $argskw = [ * @param array $options * @return Observable */ - public function progressiveRegister(string $uri, callable $callback, array $options = []) :Observable + public function progressiveRegister(string $uri, callable $callback, array $options = []): Observable { $options['progress'] = true; @@ -160,11 +153,11 @@ public function progressiveRegister(string $uri, callable $callback, array $opti * @param bool $extended * @return Observable */ - public function registerExtended(string $uri, callable $callback, array $options = [], bool $extended = true) :Observable + public function registerExtended(string $uri, callable $callback, array $options = [], bool $extended = true): Observable { return $this->session ->mapTo(new RegisterObservable($uri, $callback, $this->messages, $this->webSocket, $options, $extended)) - ->switchLatest(); + ->switch(); } /** @@ -172,12 +165,11 @@ public function registerExtended(string $uri, callable $callback, array $options * @param array $options * @return Observable */ - public function topic(string $uri, array $options = []) :Observable + public function topic(string $uri, array $options = []): Observable { return $this->session ->mapTo(new TopicObservable($uri, $options, $this->messages, $this->webSocket)) - ->switchLatest() - ->subscribeOn($this->scheduler); + ->switch(); } /** @@ -185,8 +177,9 @@ public function topic(string $uri, array $options = []) :Observable * @param mixed | Observable $obs * @param array $options * @return DisposableInterface + * @throws \InvalidArgumentException */ - public function publish(string $uri, $obs, array $options = []) : DisposableInterface + public function publish(string $uri, $obs, array $options = []): DisposableInterface { $obs = $obs instanceof Observable ? $obs : Observable::just($obs); @@ -197,13 +190,11 @@ public function publish(string $uri, $obs, array $options = []) : DisposableInte ->mapTo($obs->doOnCompleted(function () use ($completed) { $completed->onNext(0); })) - ->lift(function () { - return new SwitchFirstOperator(); - }) + ->switchFirst() ->map(function ($value) use ($uri, $options) { return new PublishMessage(Utils::getUniqueId(), (object)$options, $uri, [$value]); }) - ->subscribe($this->webSocket, $this->scheduler); + ->subscribe($this->webSocket); } public function onChallenge(callable $challengeCallback) @@ -227,8 +218,8 @@ public function _reconnect(Observable $attempts) ->flatMap(function (\Exception $ex) use ($maxRetryDelay, $retryDelayGrowth, $initialRetryDelay) { $delay = min($maxRetryDelay, pow($retryDelayGrowth, ++$this->currentRetryCount) + $initialRetryDelay); $seconds = number_format((float)$delay / 1000, 3, '.', '');; - echo "Error: ", $ex->getMessage(), PHP_EOL, "Reconnecting in ${seconds} seconds...", PHP_EOL; - return Observable::timer((int)$delay, $this->scheduler); + echo 'Error: ', $ex->getMessage(), PHP_EOL, "Reconnecting in ${seconds} seconds...", PHP_EOL; + return Observable::timer((int)$delay); }) ->take($maxRetries); } @@ -236,34 +227,34 @@ public function _reconnect(Observable $attempts) private function roles() { return [ - "caller" => [ - "features" => [ - "caller_identification" => true, - "progressive_call_results" => true + 'caller' => [ + 'features' => [ + 'caller_identification' => true, + 'progressive_call_results' => true ] ], - "callee" => [ - "features" => [ - "call_canceling" => true, - "caller_identification" => true, - "pattern_based_registration" => true, - "shared_registration" => true, - "progressive_call_results" => true, - "registration_revocation" => true + 'callee' => [ + 'features' => [ + 'call_canceling' => true, + 'caller_identification' => true, + 'pattern_based_registration' => true, + 'shared_registration' => true, + 'progressive_call_results' => true, + 'registration_revocation' => true ] ], - "publisher" => [ - "features" => [ - "publisher_identification" => true, - "subscriber_blackwhite_listing" => true, - "publisher_exclusion" => true + 'publisher' => [ + 'features' => [ + 'publisher_identification' => true, + 'subscriber_blackwhite_listing' => true, + 'publisher_exclusion' => true ] ], - "subscriber" => [ - "features" => [ - "publisher_identification" => true, - "pattern_based_subscription" => true, - "subscription_revocation" => true + 'subscriber' => [ + 'features' => [ + 'publisher_identification' => true, + 'pattern_based_subscription' => true, + 'subscription_revocation' => true ] ] ]; diff --git a/src/Observable/CallObservable.php b/src/Observable/CallObservable.php index 57b0f57..6f99871 100644 --- a/src/Observable/CallObservable.php +++ b/src/Observable/CallObservable.php @@ -5,6 +5,9 @@ use Rx\Disposable\CallbackDisposable; use Rx\Disposable\CompositeDisposable; use Rx\Disposable\EmptyDisposable; +use Rx\DisposableInterface; +use Rx\Scheduler; +use Rx\SchedulerInterface; use Thruway\WampErrorException; use Rx\ObserverInterface; use Thruway\Common\Utils; @@ -16,10 +19,18 @@ final class CallObservable extends Observable { - private $uri, $args, $argskw, $options, $messages, $webSocket, $timeout, $completed; - - function __construct(string $uri, Observable $messages, Subject $webSocket, array $args = null, array $argskw = null, array $options = null, int $timeout = 300000) - { + private $uri, $args, $argskw, $options, $messages, $webSocket, $timeout, $completed, $scheduler; + + public function __construct( + string $uri, + Observable $messages, + Subject $webSocket, + array $args = null, + array $argskw = null, + array $options = null, + int $timeout = 300000, + SchedulerInterface $scheduler = null + ) { $this->uri = $uri; $this->args = $args; $this->argskw = $argskw; @@ -28,9 +39,10 @@ function __construct(string $uri, Observable $messages, Subject $webSocket, arra $this->webSocket = $webSocket; $this->timeout = $timeout; $this->completed = false; + $this->scheduler = $scheduler ?: Scheduler::getDefault(); } - public function subscribe(ObserverInterface $observer, $scheduler = null) + public function _subscribe(ObserverInterface $observer): DisposableInterface { $requestId = Utils::getUniqueId(); $callMsg = new CallMessage($requestId, $this->options, $this->uri, $this->args, $this->argskw); @@ -52,8 +64,8 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) return Observable::fromArray([ new ResultMessage($msg->getRequestId(), $details, $msg->getArguments(), $msg->getArgumentsKw()), - new ResultMessage($msg->getRequestId(), (object)["progress" => false]) - ]); + new ResultMessage($msg->getRequestId(), (object)['progress' => false]) + ], $this->scheduler); } return Observable::just($msg); }) @@ -70,7 +82,7 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) return $msg instanceof ErrorMessage && $msg->getErrorRequestId() === $requestId; }) ->flatMap(function (ErrorMessage $msg) { - return Observable::error(new WampErrorException($msg->getErrorURI(), $msg->getArguments())); + return Observable::error(new WampErrorException($msg->getErrorURI(), $msg->getArguments()), $this->scheduler); }) ->takeUntil($resultMsg) ->take(1); @@ -101,7 +113,7 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) } }), - $result->subscribe($observer, $scheduler) + $result->subscribe($observer) ]); } } diff --git a/src/Observable/RegisterObservable.php b/src/Observable/RegisterObservable.php index 6528f93..acd1be8 100644 --- a/src/Observable/RegisterObservable.php +++ b/src/Observable/RegisterObservable.php @@ -2,8 +2,11 @@ namespace Rx\Thruway\Observable; +use Rx\DisposableInterface; use Rx\Observable; use Rx\ObserverInterface; +use Rx\Scheduler; +use Rx\SchedulerInterface; use Rx\Subject\Subject; use Thruway\Common\Utils; use Thruway\WampErrorException; @@ -16,9 +19,9 @@ final class RegisterObservable extends Observable { - private $uri, $options, $messages, $ws, $callback, $extended, $logSubject, $invocationErrors; + private $uri, $options, $messages, $ws, $callback, $extended, $logSubject, $invocationErrors, $scheduler; - function __construct(string $uri, callable $callback, Observable $messages, Subject $ws, array $options = [], bool $extended = false, Subject $logSubject = null) + public function __construct(string $uri, callable $callback, Observable $messages, Subject $ws, array $options = [], bool $extended = false, Subject $logSubject = null, SchedulerInterface $scheduler = null) { $this->uri = $uri; $this->options = $options; @@ -28,9 +31,10 @@ function __construct(string $uri, callable $callback, Observable $messages, Subj $this->extended = $extended; $this->logSubject = $logSubject ?: new Subject(); $this->invocationErrors = new Subject(); + $this->scheduler = $scheduler ?: Scheduler::getDefault(); } - public function subscribe(ObserverInterface $observer, $scheduler = null) + public function _subscribe(ObserverInterface $observer): DisposableInterface { $requestId = Utils::getUniqueId(); $disposable = new CompositeDisposable(); @@ -65,7 +69,7 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) return $msg instanceof ErrorMessage && $msg->getErrorRequestId() === $requestId; }) ->flatMap(function (ErrorMessage $msg) { - return Observable::error(new WampErrorException($msg->getErrorURI(), $msg->getArguments())); + return Observable::error(new WampErrorException($msg->getErrorURI(), $msg->getArguments()), $this->scheduler); }) ->takeUntil($registeredMsg) ->take(1); @@ -91,7 +95,7 @@ function () use (&$completed, $observer, $unregister) { $completed = true; $observer->onCompleted(); } - ), $scheduler); + )); $invocationSubscription = $invocationMsg ->flatMap(function (InvocationMessage $msg) { @@ -104,10 +108,10 @@ function () use (&$completed, $observer, $unregister) { } } catch (\Exception $e) { $this->invocationErrors->onNext(new WampInvocationException($msg)); - return Observable::emptyObservable(); + return Observable::emptyObservable($this->scheduler); } - $resultObs = $result instanceof Observable ? $result : Observable::just($result); + $resultObs = $result instanceof Observable ? $result : Observable::just($result, $this->scheduler); if (($this->options['progress'] ?? false) === false) { $returnObs = $resultObs @@ -121,7 +125,7 @@ function () use (&$completed, $observer, $unregister) { ->map(function ($value) use ($msg) { return [$value, $msg, $this->options]; }) - ->concat(Observable::just([null, $msg, ["progress" => false]])); + ->concat(Observable::just([null, $msg, ["progress" => false]], $this->scheduler)); } $interruptMsg = $this->messages @@ -134,7 +138,7 @@ function () use (&$completed, $observer, $unregister) { ->takeUntil($interruptMsg) ->catchError(function (\Exception $ex) use ($msg) { $this->invocationErrors->onNext(new WampInvocationException($msg)); - return Observable::emptyObservable(); + return Observable::emptyObservable($this->scheduler); }); }) @@ -144,13 +148,13 @@ function () use (&$completed, $observer, $unregister) { return new YieldMessage($invocationMsg->getRequestId(), $options, [$value]); }) - ->subscribe($this->ws, $scheduler); + ->subscribe($this->ws); $invocationErrors = $this->invocationErrors ->map(function (WampInvocationException $error) { return $error->getErrorMessage(); }) - ->subscribe($this->ws, $scheduler); + ->subscribe($this->ws); $disposable->add($invocationErrors); $disposable->add($invocationSubscription); diff --git a/src/Observable/TopicObservable.php b/src/Observable/TopicObservable.php index 2a607fe..87aafdd 100644 --- a/src/Observable/TopicObservable.php +++ b/src/Observable/TopicObservable.php @@ -2,6 +2,7 @@ namespace Rx\Thruway\Observable; +use Rx\DisposableInterface; use Rx\Observable; use Rx\Subject\Subject; use Thruway\Common\Utils; @@ -17,15 +18,15 @@ final class TopicObservable extends Observable { private $uri, $options, $messages, $webSocket; - function __construct(string $uri, array $options, Observable $messages, Subject $webSocket) + public function __construct(string $uri, array $options, Observable $messages, Subject $webSocket) { - $this->uri = $uri; - $this->options = (object)$options; - $this->messages = $messages; + $this->uri = $uri; + $this->options = (object)$options; + $this->messages = $messages; $this->webSocket = $webSocket; } - public function subscribe(ObserverInterface $observer, $scheduler = null) + public function _subscribe(ObserverInterface $observer): DisposableInterface { $requestId = Utils::getUniqueId(); $subscriptionId = null; @@ -60,13 +61,13 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) }); }) ->merge($errorMsg) - ->subscribe($observer, $scheduler); + ->subscribe($observer); $disposable = new CompositeDisposable(); $disposable->add($sub); - $disposable->add(new CallbackDisposable(function () use (&$subscriptionId, $scheduler) { + $disposable->add(new CallbackDisposable(function () use (&$subscriptionId) { if (!$subscriptionId) { return; } diff --git a/src/ReactAsyncInteropLoop.php b/src/ReactAsyncInteropLoop.php new file mode 100644 index 0000000..1950e56 --- /dev/null +++ b/src/ReactAsyncInteropLoop.php @@ -0,0 +1,127 @@ +readStreams[$key])) { + throw new \Exception('key set twice'); + } + $this->readStreams[$key] = Loop::get()->onReadable($stream, function () use ($listener, $stream) { + $listener($stream); + }); + } + + public function addWriteStream($stream, callable $listener) + { + $key = (int)$stream; + + if (isset($this->writeStreams[$key])) { + throw new \Exception('key set twice'); + } + + $this->writeStreams[$key] = Loop::get()->onWritable($stream, function () use ($listener, $stream) { + $listener($stream); + }); + } + + public function removeReadStream($stream) + { + $key = (int)$stream; + if (isset($this->readStreams[$key])) { + Loop::get()->cancel($this->readStreams[$key]); + unset($this->readStreams[$key]); + } + } + + public function removeWriteStream($stream) + { + $key = (int)$stream; + if (isset($this->writeStreams[$key])) { + Loop::get()->cancel($this->writeStreams[$key]); + unset($this->writeStreams[$key]); + } + } + + public function removeStream($stream) + { + $this->removeReadStream($stream); + $this->removeWriteStream($stream); + } + + private function addWrappedTimer($interval, callable $callback, $isPeriodic = false) + { + $wrappedCallback = function () use (&$timer, $callback) { + $callback($timer); + }; + $millis = $interval * 1000; + if ($isPeriodic) { + $timerKey = Loop::get()->repeat($millis, $wrappedCallback); + } else { + $timerKey = Loop::get()->delay($millis, $wrappedCallback); + } + $timer = new ReactAsyncInteropTimer( + $timerKey, + $interval, + $callback, + $this, + false + ); + return $timer; + } + + public function addTimer($interval, callable $callback) + { + return $this->addWrappedTimer($interval, $callback); + } + + public function addPeriodicTimer($interval, callable $callback) + { + return $this->addWrappedTimer($interval, $callback, true); + } + + public function cancelTimer(TimerInterface $timer) + { + $timer->cancel(); + } + + public function isTimerActive(TimerInterface $timer) + { + return $timer->isActive(); + } + + public function nextTick(callable $listener) + { + Loop::get()->defer($listener); + } + + public function futureTick(callable $listener) + { + Loop::get()->defer($listener); + } + + public function tick() + { + throw new \Exception("This is not a real react loop - no ticking - sorry."); + } + + public function run() + { + Loop::get()->run(); + } + + public function stop() + { + Loop::get()->stop(); + } +} diff --git a/src/ReactAsyncInteropTimer.php b/src/ReactAsyncInteropTimer.php new file mode 100644 index 0000000..f25a796 --- /dev/null +++ b/src/ReactAsyncInteropTimer.php @@ -0,0 +1,24 @@ +timerKey = $timerKey; + + parent::__construct($loop, $interval, $callback, $isPeriodic, $data); + } + + public function cancel() + { + Loop::get()->cancel($this->timerKey); + } +} diff --git a/src/Subject/SessionReplaySubject.php b/src/Subject/SessionReplaySubject.php index a52a887..66d49f7 100644 --- a/src/Subject/SessionReplaySubject.php +++ b/src/Subject/SessionReplaySubject.php @@ -3,10 +3,10 @@ namespace Rx\Thruway\Subject; use Rx\Disposable\CallbackDisposable; +use Rx\DisposableInterface; use Rx\Observable; use Rx\Observer\ScheduledObserver; use Rx\ObserverInterface; -use Rx\Scheduler\ImmediateScheduler; use Rx\SchedulerInterface; use Rx\Subject\Subject; @@ -20,29 +20,26 @@ class SessionReplaySubject extends Subject private $value; - public function __construct(Observable $close) + public function __construct(Observable $close, SchedulerInterface $scheduler) { - $this->scheduler = new ImmediateScheduler(); + $this->scheduler = $scheduler; - $close->subscribeCallback(function () { + $close->subscribe(function () { $this->value = null; }); } - public function subscribe(ObserverInterface $observer, SchedulerInterface $scheduler = null) + public function _subscribe(ObserverInterface $observer): DisposableInterface { $this->assertNotDisposed(); - if (!$scheduler) { - $scheduler = $this->scheduler; - } - $so = new ScheduledObserver($scheduler, $observer); + $so = new ScheduledObserver($this->scheduler, $observer); $subscription = $this->createRemovableDisposable($this, $so); $this->observers[] = $so; - if ($this->value){ + if ($this->value) { $so->onNext($this->value); } @@ -76,12 +73,12 @@ public function onNext($value) } } - private function createRemovableDisposable($subject, $observer) + private function createRemovableDisposable(Subject $subject, ObserverInterface $observer): DisposableInterface { return new CallbackDisposable(function () use ($observer, $subject) { $observer->dispose(); if (!$subject->isDisposed()) { - array_splice($subject->observers, array_search($observer, $subject->observers), 1); + array_splice($subject->observers, array_search($observer, $subject->observers, true), 1); } }); } diff --git a/src/Subject/WebSocketSubject.php b/src/Subject/WebSocketSubject.php index 54360f0..d22f47d 100644 --- a/src/Subject/WebSocketSubject.php +++ b/src/Subject/WebSocketSubject.php @@ -6,10 +6,10 @@ use React\EventLoop\Timer\Timer; use Rx\Disposable\CallbackDisposable; use Rx\Disposable\CompositeDisposable; -use Rx\React\RejectedPromiseException; +use Rx\DisposableInterface; +use Rx\Thruway\ReactAsyncInteropLoop; use Thruway\Message\AbortMessage; use Thruway\Serializer\JsonSerializer; -use React\EventLoop\LoopInterface; use Ratchet\Client\Connector; use Ratchet\Client\WebSocket; use Rx\ObserverInterface; @@ -19,19 +19,19 @@ final class WebSocketSubject extends Subject { private $url, $protocols, $socket, $openObserver, $closeObserver, $output, $loop, $serializer; - public function __construct(string $url, array $protocols = [], Subject $openObserver = null, Subject $closeObserver = null, WebSocket $socket = null, LoopInterface $loop = null) + public function __construct(string $url, array $protocols = [], Subject $openObserver = null, Subject $closeObserver = null, WebSocket $socket = null) { $this->url = $url; $this->protocols = $protocols; $this->socket = $socket; $this->openObserver = $openObserver; $this->closeObserver = $closeObserver; - $this->loop = $loop ?: \EventLoop\getLoop(); $this->serializer = new JsonSerializer(); + $this->loop = new ReactAsyncInteropLoop(); $this->output = new Subject(); } - public function subscribe(ObserverInterface $observer, $scheduler = null) + public function _subscribe(ObserverInterface $observer): DisposableInterface { $this->output = new Subject(); @@ -41,10 +41,10 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) $disposable = new CompositeDisposable(); - $disposable->add($this->output->subscribe($observer, $scheduler)); + $disposable->add($this->output->subscribe($observer)); $disposable->add(new CallbackDisposable(function () { - if (!$this->output->hasObservers() && $this->socket) { + if ($this->socket && !$this->output->hasObservers()) { $this->socket->close(); $this->socket = null; } @@ -66,7 +66,7 @@ function (WebSocket $ws) { $pingTimer = $this->loop->addPeriodicTimer(30, function (Timer $timer) use ($ws, &$lastReceivedPong) { static $sequence = 0; - if ($lastReceivedPong != $sequence) { + if ($lastReceivedPong !== $sequence) { $timer->cancel(); $ws->close(); } @@ -81,11 +81,11 @@ function (WebSocket $ws) { $lastReceivedPong = $frame->getPayload(); }); - $ws->on("error", function (\Exception $ex) use ($pingTimer) { + $ws->on('error', function (\Exception $ex) use ($pingTimer) { $pingTimer->cancel(); $this->output->onError($ex); }); - $ws->on("close", function ($reason) use ($pingTimer) { + $ws->on('close', function ($reason) use ($pingTimer) { $pingTimer->cancel(); if ($this->closeObserver) { $this->closeObserver->onNext($reason); @@ -98,7 +98,7 @@ function (WebSocket $ws) { $this->output->onError(new \Exception($reason)); }); - $ws->on("message", function ($message) { + $ws->on('message', function ($message) { $msg = $this->serializer->deserialize($message); @@ -115,7 +115,7 @@ function (WebSocket $ws) { } }, function ($error) { - $error = $error instanceof \Exception ? $error : new RejectedPromiseException($error); + $error = $error instanceof \Exception ? $error : new \Exception($error); $this->output->onError($error); }); diff --git a/src/SwitchFirstOperator.php b/src/SwitchFirstOperator.php deleted file mode 100644 index 857157a..0000000 --- a/src/SwitchFirstOperator.php +++ /dev/null @@ -1,65 +0,0 @@ -add($singleDisposable); - - $callbackObserver = new CallbackObserver( - function (Observable $x) use ($disposable, $scheduler, $observer) { - if ($this->hasCurrent) { - return; - } - $this->hasCurrent = true; - - $inner = new SingleAssignmentDisposable(); - $disposable->add($inner); - - $innerSub = $x->subscribe(new CallbackObserver( - [$observer, "onNext"], - [$observer, "onError"], - function () use ($disposable, $inner, $observer) { - $disposable->remove($inner); - $this->hasCurrent = false; - - if ($this->isStopped && $disposable->count() === 1) { - $observer->onCompleted(); - } - } - ), $scheduler); - - $inner->setDisposable($innerSub); - }, - [$observer, 'onError'], - function () use ($disposable, $observer) { - $this->isStopped = true; - if (!$this->hasCurrent && $disposable->count() === 1) { - $observer->onCompleted(); - } - } - ); - - $singleDisposable->setDisposable($observable->subscribe($callbackObserver, $scheduler)); - - return $disposable; - - } -} diff --git a/tests/Functional/Observable/CallObservableTest.php b/tests/Functional/Observable/CallObservableTest.php index 5f5a6eb..6bbecad 100644 --- a/tests/Functional/Observable/CallObservableTest.php +++ b/tests/Functional/Observable/CallObservableTest.php @@ -18,17 +18,17 @@ class CallObservableTest extends FunctionalTestCase /** * @test */ - function call_message_never() + public function call_message_never() { $messages = Observable::never(); $webSocket = new Subject(); - $webSocket->subscribeCallback(function ($msg) { + $webSocket->subscribe(function ($msg) { $this->recordWampMessage($msg); }); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([], $results->getMessages()); @@ -42,7 +42,7 @@ function call_message_never() /** * @test */ - function call_messages_empty() + public function call_messages_empty() { $messages = $this->createHotObservable([ onNext(150, 1), @@ -50,12 +50,12 @@ function call_messages_empty() ]); $webSocket = new Subject(); - $webSocket->subscribeCallback(function ($msg) { + $webSocket->subscribe(function ($msg) { $this->recordWampMessage($msg); }); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -66,13 +66,12 @@ function call_messages_empty() /** * @test */ - function call_one_no_args() + public function call_one_no_args() { - $resultMessage = new ResultMessage(null, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -85,7 +84,7 @@ function call_one_no_args() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -97,7 +96,7 @@ function call_one_no_args() /** * @test */ - function call_one_with_args() + public function call_one_with_args() { $args = ["testing"]; @@ -105,7 +104,7 @@ function call_one_with_args() $resultMessage = new ResultMessage(null, new \stdClass(), $args); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -118,7 +117,7 @@ function call_one_with_args() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -130,7 +129,7 @@ function call_one_with_args() /** * @test */ - function call_one_with_argskw() + public function call_one_with_argskw() { $args = ["testing"]; $argskw = (object)["foo" => "bar"]; @@ -138,7 +137,7 @@ function call_one_with_argskw() $resultMessage = new ResultMessage(null, new \stdClass(), $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -151,7 +150,7 @@ function call_one_with_argskw() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -164,9 +163,8 @@ function call_one_with_argskw() /** * @test */ - function call_one_with_details() + public function call_one_with_details() { - $args = ["testing"]; $argskw = (object)["foo" => "bar"]; $details = (object)["one" => "two"]; @@ -174,7 +172,7 @@ function call_one_with_details() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -187,7 +185,7 @@ function call_one_with_details() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -200,9 +198,8 @@ function call_one_with_details() /** * @test */ - function call_one_reconnect() + public function call_one_reconnect() { - $args = ["testing"]; $argskw = (object)["foo" => "bar"]; $details = (object)["one" => "two"]; @@ -210,7 +207,7 @@ function call_one_reconnect() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -226,7 +223,7 @@ function call_one_reconnect() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -239,9 +236,8 @@ function call_one_reconnect() /** * @test */ - function call_one_throw_error_before() + public function call_one_throw_error_before() { - $error = new \Exception("testing"); $args = ["testing"]; $argskw = (object)["foo" => "bar"]; @@ -250,7 +246,7 @@ function call_one_throw_error_before() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function ($msg) use ($resultMessage) { + $webSocket->subscribe(function ($msg) use ($resultMessage) { $this->recordWampMessage($msg); if ($msg instanceof CallMessage) { $requestId = $msg->getRequestId(); @@ -267,7 +263,7 @@ function call_one_throw_error_before() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -284,9 +280,8 @@ function call_one_throw_error_before() /** * @test */ - function call_one_throw_error_after_welcome() + public function call_one_throw_error_after_welcome() { - $error = new \Exception("testing"); $args = ["testing"]; $argskw = (object)["foo" => "bar"]; @@ -295,7 +290,7 @@ function call_one_throw_error_after_welcome() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function ($msg) use ($resultMessage) { + $webSocket->subscribe(function ($msg) use ($resultMessage) { $this->recordWampMessage($msg); if ($msg instanceof CallMessage) { $requestId = $msg->getRequestId(); @@ -312,7 +307,7 @@ function call_one_throw_error_after_welcome() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -328,9 +323,8 @@ function call_one_throw_error_after_welcome() /** * @test */ - function call_one_throw_error_after_result() + public function call_one_throw_error_after_result() { - $error = new \Exception("testing"); $args = ["testing"]; $argskw = (object)["foo" => "bar"]; @@ -339,7 +333,7 @@ function call_one_throw_error_after_result() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -353,7 +347,7 @@ function call_one_throw_error_after_result() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -365,7 +359,7 @@ function call_one_throw_error_after_result() /** * @test */ - function call_one_throw_sendMessage() + public function call_one_throw_sendMessage() { $error = new \Exception("testing"); @@ -376,7 +370,7 @@ function call_one_throw_sendMessage() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($error) { + $webSocket->subscribe(function (CallMessage $msg) use ($error) { throw $error; }); @@ -388,7 +382,7 @@ function call_one_throw_sendMessage() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -399,13 +393,12 @@ function call_one_throw_sendMessage() /** * @test */ - function call_one_error_message() + public function call_one_error_message() { - $errorMessage = new ErrorMessage(12345, null, new \stdClass(), 'some.server.error'); $webSocket = new Subject(); - $webSocket->subscribeCallback(function ($msg) use ($errorMessage) { + $webSocket->subscribe(function ($msg) use ($errorMessage) { if ($msg instanceof CallMessage) { $requestId = $msg->getRequestId(); $errorMessage->setErrorRequestId($requestId); @@ -421,7 +414,7 @@ function call_one_error_message() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -437,7 +430,7 @@ function call_one_error_message() /** * @test */ - function call_one_dispose_before() + public function call_one_dispose_before() { $args = ["testing"]; $argskw = (object)["foo" => "bar"]; @@ -446,7 +439,7 @@ function call_one_dispose_before() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function ($msg) use ($resultMessage) { + $webSocket->subscribe(function ($msg) use ($resultMessage) { if ($msg instanceof CallMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); @@ -462,7 +455,7 @@ function call_one_dispose_before() ]); $results = $this->scheduler->startWithDispose(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }, 220); $this->assertMessages([], $results->getMessages()); @@ -477,7 +470,7 @@ function call_one_dispose_before() /** * @test */ - function call_one_dispose_after() + public function call_one_dispose_after() { $args = ["testing"]; $argskw = (object)["foo" => "bar"]; @@ -486,7 +479,7 @@ function call_one_dispose_after() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -499,7 +492,7 @@ function call_one_dispose_after() ]); $results = $this->scheduler->startWithDispose(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }, 260); $this->assertMessages([ diff --git a/tests/Functional/Observable/RegisterObservableTest.php b/tests/Functional/Observable/RegisterObservableTest.php index 80327fc..bb29559 100644 --- a/tests/Functional/Observable/RegisterObservableTest.php +++ b/tests/Functional/Observable/RegisterObservableTest.php @@ -24,23 +24,23 @@ public function callable($first = 0, $second = 0) public function callableObs($first = 0, $second = 0) { - return Observable::just($first + $second); + return Observable::just($first + $second, $this->scheduler); } public function callableManyObs($first = 0, $second = 0) { - return Observable::fromArray([$first, $second]); + return Observable::fromArray([$first, $second], $this->scheduler); } /** * @test */ - function register_message_never() + public function register_message_never() { $messages = Observable::never(); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (RegisterMessage $msg) { + $webSocket->subscribe(function (RegisterMessage $msg) { $this->assertEquals($msg->getUri(), 'testing.uri'); }); @@ -54,7 +54,7 @@ function register_message_never() /** * @test */ - function register_messages_empty() + public function register_messages_empty() { $messages = $this->createHotObservable([ onNext(150, 1), @@ -62,7 +62,7 @@ function register_messages_empty() ]); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (RegisterMessage $msg) { + $webSocket->subscribe(function (RegisterMessage $msg) { $this->assertEquals($msg->getUri(), 'testing.uri'); }); @@ -82,12 +82,12 @@ function register_messages_empty() /** * @test */ - function register_with_no_invocation() + public function register_with_no_invocation() { $registeredMsg = new RegisteredMessage(null, 54321); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -122,12 +122,12 @@ function register_with_no_invocation() /** * @test */ - function register_with_no_invocation_no_complete() + public function register_with_no_invocation_no_complete() { $registeredMsg = new RegisteredMessage(null, 54321); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -159,13 +159,13 @@ function register_with_no_invocation_no_complete() /** * @test */ - function register_with_one_invocation_no_args() + public function register_with_one_invocation_no_args() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -182,7 +182,7 @@ function register_with_one_invocation_no_args() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -201,13 +201,13 @@ function register_with_one_invocation_no_args() /** * @test */ - function register_reconnect() + public function register_reconnect() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -226,7 +226,7 @@ function register_reconnect() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -245,13 +245,13 @@ function register_reconnect() /** * @test */ - function register_with_many_invocations_no_args() + public function register_with_many_invocations_no_args() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -271,7 +271,7 @@ function register_with_many_invocations_no_args() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -293,13 +293,13 @@ function register_with_many_invocations_no_args() /** * @test */ - function register_with_one_invocation_with_one_arg() + public function register_with_one_invocation_with_one_arg() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass(), [1]); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -316,7 +316,7 @@ function register_with_one_invocation_with_one_arg() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -335,13 +335,13 @@ function register_with_one_invocation_with_one_arg() /** * @test */ - function register_with_one_invocation_with_two_arg() + public function register_with_one_invocation_with_two_arg() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass(), [1, 2]); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -358,7 +358,7 @@ function register_with_one_invocation_with_two_arg() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -377,13 +377,13 @@ function register_with_one_invocation_with_two_arg() /** * @test */ - function register_with_one_invocation_with_two_arg_obs() + public function register_with_one_invocation_with_two_arg_obs() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass(), [1, 2]); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -400,7 +400,7 @@ function register_with_one_invocation_with_two_arg_obs() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callableObs'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callableObs'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -423,13 +423,13 @@ function register_with_one_invocation_with_two_arg_obs() /** * @test */ - function register_callback_throws() + public function register_callback_throws() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -448,7 +448,7 @@ function register_callback_throws() $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { return new RegisterObservable('testing.uri', function () { throw new \Exception('error'); - }, $messages, $webSocket); + }, $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -467,13 +467,13 @@ function register_callback_throws() /** * @test */ - function register_callback_throws_multiple() + public function register_callback_throws_multiple() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -495,7 +495,7 @@ function register_callback_throws_multiple() $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { return new RegisterObservable('testing.uri', function () { throw new \Exception('error'); - }, $messages, $webSocket); + }, $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -517,13 +517,13 @@ function register_callback_throws_multiple() /** * @test */ - function register_callback_errors() + public function register_callback_errors() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -541,8 +541,8 @@ function register_callback_errors() $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { return new RegisterObservable('testing.uri', function () { - return Observable::error(new \Exception('error')); - }, $messages, $webSocket); + return Observable::error(new \Exception('error'), $this->scheduler); + }, $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -561,13 +561,13 @@ function register_callback_errors() /** * @test */ - function register_callback_errors_multiple() + public function register_callback_errors_multiple() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -588,8 +588,8 @@ function register_callback_errors_multiple() $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { return new RegisterObservable('testing.uri', function () { - return Observable::error(new \Exception('error')); - }, $messages, $webSocket); + return Observable::error(new \Exception('error'), $this->scheduler); + }, $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -611,12 +611,12 @@ function register_callback_errors_multiple() /** * @test */ - function register_with_registration_error() + public function register_with_registration_error() { $errorMsg = new ErrorMessage(null, 54321, new \stdClass(), "registration.error.uri"); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($errorMsg) { + $webSocket->subscribe(function (Message $msg) use ($errorMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $errorMsg->setErrorRequestId($requestId); @@ -633,7 +633,7 @@ function register_with_registration_error() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ diff --git a/tests/bootstrap.php b/tests/bootstrap.php index a10c036..78aa35f 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -31,4 +31,10 @@ require_once $file; break; } -} \ No newline at end of file +} + +/** + * The default scheduler is the EventLoopScheduler, which is asynchronous. + * For testing we need to block at `subscribe`, so we need to switch the default to the ImmediateScheduler. + */ +\Rx\Scheduler::setDefault(new \Rx\Scheduler\ImmediateScheduler()); \ No newline at end of file From 7b1df198f09483e4a857c833b404d523fbb3ae7d Mon Sep 17 00:00:00 2001 From: David Dan Date: Thu, 22 Dec 2016 12:01:31 -0500 Subject: [PATCH 02/12] =?UTF-8?q?Moved=20ReactPHP=20asyc-interop=20loop=20?= =?UTF-8?q?bridge=20to=20it=E2=80=99s=20own=20project=20`voryx/pool-poretn?= =?UTF-8?q?i-cnysa-tcaer`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .travis.yml | 1 + composer.json | 10 ++- src/ReactAsyncInteropLoop.php | 127 ------------------------------- src/ReactAsyncInteropTimer.php | 24 ------ src/Subject/WebSocketSubject.php | 4 +- src/bootstrap.php | 20 +++++ 6 files changed, 30 insertions(+), 156 deletions(-) delete mode 100644 src/ReactAsyncInteropLoop.php delete mode 100644 src/ReactAsyncInteropTimer.php create mode 100644 src/bootstrap.php diff --git a/.travis.yml b/.travis.yml index 6a16155..0cfb995 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ sudo: required php: - 7 + - 7.1 install: - composer install diff --git a/composer.json b/composer.json index 380d934..5c11603 100644 --- a/composer.json +++ b/composer.json @@ -25,14 +25,18 @@ "autoload": { "psr-4": { "Rx\\Thruway\\": "src/" - } + }, + "files": [ + "src/bootstrap.php" + ] }, "require": { - "voryx/thruway-common": "^1.0.0", "php": "^7.0", + "voryx/thruway-common": "^1.0.0", "ratchet/pawl": "^0.2.2", "wyrihaximus/react-async-interop-loop": "dev-master", - "reactivex/rxphp": "2.x-dev" + "reactivex/rxphp": "2.x-dev", + "voryx/pool-poretni-cnysa-tcaer": "^0.0.1" }, "repositories": [ { diff --git a/src/ReactAsyncInteropLoop.php b/src/ReactAsyncInteropLoop.php deleted file mode 100644 index 1950e56..0000000 --- a/src/ReactAsyncInteropLoop.php +++ /dev/null @@ -1,127 +0,0 @@ -readStreams[$key])) { - throw new \Exception('key set twice'); - } - $this->readStreams[$key] = Loop::get()->onReadable($stream, function () use ($listener, $stream) { - $listener($stream); - }); - } - - public function addWriteStream($stream, callable $listener) - { - $key = (int)$stream; - - if (isset($this->writeStreams[$key])) { - throw new \Exception('key set twice'); - } - - $this->writeStreams[$key] = Loop::get()->onWritable($stream, function () use ($listener, $stream) { - $listener($stream); - }); - } - - public function removeReadStream($stream) - { - $key = (int)$stream; - if (isset($this->readStreams[$key])) { - Loop::get()->cancel($this->readStreams[$key]); - unset($this->readStreams[$key]); - } - } - - public function removeWriteStream($stream) - { - $key = (int)$stream; - if (isset($this->writeStreams[$key])) { - Loop::get()->cancel($this->writeStreams[$key]); - unset($this->writeStreams[$key]); - } - } - - public function removeStream($stream) - { - $this->removeReadStream($stream); - $this->removeWriteStream($stream); - } - - private function addWrappedTimer($interval, callable $callback, $isPeriodic = false) - { - $wrappedCallback = function () use (&$timer, $callback) { - $callback($timer); - }; - $millis = $interval * 1000; - if ($isPeriodic) { - $timerKey = Loop::get()->repeat($millis, $wrappedCallback); - } else { - $timerKey = Loop::get()->delay($millis, $wrappedCallback); - } - $timer = new ReactAsyncInteropTimer( - $timerKey, - $interval, - $callback, - $this, - false - ); - return $timer; - } - - public function addTimer($interval, callable $callback) - { - return $this->addWrappedTimer($interval, $callback); - } - - public function addPeriodicTimer($interval, callable $callback) - { - return $this->addWrappedTimer($interval, $callback, true); - } - - public function cancelTimer(TimerInterface $timer) - { - $timer->cancel(); - } - - public function isTimerActive(TimerInterface $timer) - { - return $timer->isActive(); - } - - public function nextTick(callable $listener) - { - Loop::get()->defer($listener); - } - - public function futureTick(callable $listener) - { - Loop::get()->defer($listener); - } - - public function tick() - { - throw new \Exception("This is not a real react loop - no ticking - sorry."); - } - - public function run() - { - Loop::get()->run(); - } - - public function stop() - { - Loop::get()->stop(); - } -} diff --git a/src/ReactAsyncInteropTimer.php b/src/ReactAsyncInteropTimer.php deleted file mode 100644 index f25a796..0000000 --- a/src/ReactAsyncInteropTimer.php +++ /dev/null @@ -1,24 +0,0 @@ -timerKey = $timerKey; - - parent::__construct($loop, $interval, $callback, $isPeriodic, $data); - } - - public function cancel() - { - Loop::get()->cancel($this->timerKey); - } -} diff --git a/src/Subject/WebSocketSubject.php b/src/Subject/WebSocketSubject.php index d22f47d..0339423 100644 --- a/src/Subject/WebSocketSubject.php +++ b/src/Subject/WebSocketSubject.php @@ -7,13 +7,13 @@ use Rx\Disposable\CallbackDisposable; use Rx\Disposable\CompositeDisposable; use Rx\DisposableInterface; -use Rx\Thruway\ReactAsyncInteropLoop; use Thruway\Message\AbortMessage; use Thruway\Serializer\JsonSerializer; use Ratchet\Client\Connector; use Ratchet\Client\WebSocket; use Rx\ObserverInterface; use Rx\Subject\Subject; +use Voryx\React\AsyncInterop\Loop; final class WebSocketSubject extends Subject { @@ -27,7 +27,7 @@ public function __construct(string $url, array $protocols = [], Subject $openObs $this->openObserver = $openObserver; $this->closeObserver = $closeObserver; $this->serializer = new JsonSerializer(); - $this->loop = new ReactAsyncInteropLoop(); + $this->loop = new Loop(); $this->output = new Subject(); } diff --git a/src/bootstrap.php b/src/bootstrap.php new file mode 100644 index 0000000..b081e35 --- /dev/null +++ b/src/bootstrap.php @@ -0,0 +1,20 @@ +run(); + } + }); + + Loop::get()->defer(function () use (&$hasBeenRun) { + $hasBeenRun = true; + }); +})(); From 7dedfb7c8e78cd5f129570dc65229a1961466884 Mon Sep 17 00:00:00 2001 From: David Dan Date: Thu, 22 Dec 2016 14:55:31 -0500 Subject: [PATCH 03/12] Use WyriHaximus/reactphp-async-interop-loop instead the reverse reactphp loop --- composer.json | 3 +-- src/Subject/WebSocketSubject.php | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/composer.json b/composer.json index 5c11603..b0c1058 100644 --- a/composer.json +++ b/composer.json @@ -35,8 +35,7 @@ "voryx/thruway-common": "^1.0.0", "ratchet/pawl": "^0.2.2", "wyrihaximus/react-async-interop-loop": "dev-master", - "reactivex/rxphp": "2.x-dev", - "voryx/pool-poretni-cnysa-tcaer": "^0.0.1" + "reactivex/rxphp": "2.x-dev" }, "repositories": [ { diff --git a/src/Subject/WebSocketSubject.php b/src/Subject/WebSocketSubject.php index 0339423..58fbe5e 100644 --- a/src/Subject/WebSocketSubject.php +++ b/src/Subject/WebSocketSubject.php @@ -13,7 +13,7 @@ use Ratchet\Client\WebSocket; use Rx\ObserverInterface; use Rx\Subject\Subject; -use Voryx\React\AsyncInterop\Loop; +use WyriHaximus\React\AsyncInteropLoop\AsyncInteropLoop; final class WebSocketSubject extends Subject { @@ -27,7 +27,7 @@ public function __construct(string $url, array $protocols = [], Subject $openObs $this->openObserver = $openObserver; $this->closeObserver = $closeObserver; $this->serializer = new JsonSerializer(); - $this->loop = new Loop(); + $this->loop = new AsyncInteropLoop(); $this->output = new Subject(); } From 749fded50ef89fd2cbcb5d94a54f38a11fee1191 Mon Sep 17 00:00:00 2001 From: David Dan Date: Thu, 22 Dec 2016 19:22:44 -0500 Subject: [PATCH 04/12] Updated react-async-interop-loop and simplified bootstrap.php --- composer.json | 9 +++++++-- src/bootstrap.php | 17 ++++------------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/composer.json b/composer.json index b0c1058..abc142a 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ "php": "^7.0", "voryx/thruway-common": "^1.0.0", "ratchet/pawl": "^0.2.2", - "wyrihaximus/react-async-interop-loop": "dev-master", + "wyrihaximus/react-async-interop-loop": "0.1.0", "reactivex/rxphp": "2.x-dev" }, "repositories": [ @@ -46,5 +46,10 @@ "type": "vcs", "url": "https://github.com/davidwdan/RxPHP.git" } - ] + ], + "extra": { + "branch-alias": { + "dev-master": "2.0-dev" + } + } } diff --git a/src/bootstrap.php b/src/bootstrap.php index b081e35..c23113b 100644 --- a/src/bootstrap.php +++ b/src/bootstrap.php @@ -4,17 +4,8 @@ use React\EventLoop\StreamSelectLoop; use WyriHaximus\React\AsyncInteropLoop\ReactDriverFactory; -(function () { - $driver = ReactDriverFactory::createFactoryFromLoop(StreamSelectLoop::class); - Loop::setFactory($driver); +Loop::setFactory(ReactDriverFactory::createFactoryFromLoop(StreamSelectLoop::class)); - register_shutdown_function(function () use (&$hasBeenRun) { - if (!$hasBeenRun) { - Loop::get()->run(); - } - }); - - Loop::get()->defer(function () use (&$hasBeenRun) { - $hasBeenRun = true; - }); -})(); +register_shutdown_function(function () { + Loop::execute(function () {}, Loop::get()); +}); From 6b771c382d5bedd0b7b0fcb61baef0401dc2288d Mon Sep 17 00:00:00 2001 From: David Dan Date: Thu, 22 Dec 2016 19:54:27 -0500 Subject: [PATCH 05/12] Moved bootstrap to voryx/event-loop --- composer.json | 7 ++----- src/bootstrap.php | 11 ----------- 2 files changed, 2 insertions(+), 16 deletions(-) delete mode 100644 src/bootstrap.php diff --git a/composer.json b/composer.json index abc142a..1c929d2 100644 --- a/composer.json +++ b/composer.json @@ -25,16 +25,13 @@ "autoload": { "psr-4": { "Rx\\Thruway\\": "src/" - }, - "files": [ - "src/bootstrap.php" - ] + } }, "require": { "php": "^7.0", "voryx/thruway-common": "^1.0.0", "ratchet/pawl": "^0.2.2", - "wyrihaximus/react-async-interop-loop": "0.1.0", + "voryx/event-loop": "2.0.0", "reactivex/rxphp": "2.x-dev" }, "repositories": [ diff --git a/src/bootstrap.php b/src/bootstrap.php deleted file mode 100644 index c23113b..0000000 --- a/src/bootstrap.php +++ /dev/null @@ -1,11 +0,0 @@ - Date: Thu, 5 Jan 2017 13:56:23 -0500 Subject: [PATCH 06/12] Updated dependencies Fixed type issue in SessionReplaySubject --- composer.json | 6 +----- src/Subject/SessionReplaySubject.php | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/composer.json b/composer.json index 1c929d2..33319b2 100644 --- a/composer.json +++ b/composer.json @@ -31,17 +31,13 @@ "php": "^7.0", "voryx/thruway-common": "^1.0.0", "ratchet/pawl": "^0.2.2", - "voryx/event-loop": "2.0.0", + "voryx/event-loop": "^2.0.1", "reactivex/rxphp": "2.x-dev" }, "repositories": [ { "type": "vcs", "url": "git@github.com:voryx/ThruwayCommon.git" - }, - { - "type": "vcs", - "url": "https://github.com/davidwdan/RxPHP.git" } ], "extra": { diff --git a/src/Subject/SessionReplaySubject.php b/src/Subject/SessionReplaySubject.php index 66d49f7..6282598 100644 --- a/src/Subject/SessionReplaySubject.php +++ b/src/Subject/SessionReplaySubject.php @@ -73,7 +73,7 @@ public function onNext($value) } } - private function createRemovableDisposable(Subject $subject, ObserverInterface $observer): DisposableInterface + private function createRemovableDisposable(Subject $subject, ScheduledObserver $observer): DisposableInterface { return new CallbackDisposable(function () use ($observer, $subject) { $observer->dispose(); From d2ad7611e741288f825968f525506773eeda3ee3 Mon Sep 17 00:00:00 2001 From: David Dan Date: Mon, 9 Jan 2017 21:12:13 -0500 Subject: [PATCH 07/12] Fixed ping/pong Updated deps --- composer.json | 8 ++++++++ src/Subject/WebSocketSubject.php | 2 +- tests/bootstrap.php | 12 ------------ 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/composer.json b/composer.json index 33319b2..d6a7d7e 100644 --- a/composer.json +++ b/composer.json @@ -27,6 +27,14 @@ "Rx\\Thruway\\": "src/" } }, + "autoload-dev": { + "psr-4": { + "Rx\\": "vendor/reactivex/rxphp/test/Rx" + }, + "files": [ + "vendor/reactivex/rxphp/test/helper-functions.php" + ] + }, "require": { "php": "^7.0", "voryx/thruway-common": "^1.0.0", diff --git a/src/Subject/WebSocketSubject.php b/src/Subject/WebSocketSubject.php index 58fbe5e..1bda51c 100644 --- a/src/Subject/WebSocketSubject.php +++ b/src/Subject/WebSocketSubject.php @@ -66,7 +66,7 @@ function (WebSocket $ws) { $pingTimer = $this->loop->addPeriodicTimer(30, function (Timer $timer) use ($ws, &$lastReceivedPong) { static $sequence = 0; - if ($lastReceivedPong !== $sequence) { + if ((int)$lastReceivedPong !== (int)$sequence) { $timer->cancel(); $ws->close(); } diff --git a/tests/bootstrap.php b/tests/bootstrap.php index 78aa35f..063f2ba 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -21,18 +21,6 @@ } } - -//RxPHP test files -foreach ($locations as $location) { - - $file = $location . "vendor/reactivex/rxphp/test/helper-functions.php"; - if (file_exists($file)) { - $loader->add('Rx', $location . "vendor/reactivex/rxphp/test/"); - require_once $file; - break; - } -} - /** * The default scheduler is the EventLoopScheduler, which is asynchronous. * For testing we need to block at `subscribe`, so we need to switch the default to the ImmediateScheduler. From ab4e1cf97a9b240fb642f3b2fe4f7c32b402b813 Mon Sep 17 00:00:00 2001 From: David Dan Date: Mon, 13 Mar 2017 21:37:35 -0400 Subject: [PATCH 08/12] Use the Voryx event loop instead of the interop --- src/Client.php | 1 - src/Subject/WebSocketSubject.php | 2 +- tests/bootstrap.php | 4 +++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/Client.php b/src/Client.php index d95a3d3..531e4de 100644 --- a/src/Client.php +++ b/src/Client.php @@ -2,7 +2,6 @@ namespace Rx\Thruway; -use Rx\Observer\CallbackObserver; use Rx\Scheduler; use Rx\Thruway\Subject\SessionReplaySubject; use Rx\Thruway\Subject\WebSocketSubject; diff --git a/src/Subject/WebSocketSubject.php b/src/Subject/WebSocketSubject.php index 1bda51c..58a2c53 100644 --- a/src/Subject/WebSocketSubject.php +++ b/src/Subject/WebSocketSubject.php @@ -27,7 +27,7 @@ public function __construct(string $url, array $protocols = [], Subject $openObs $this->openObserver = $openObserver; $this->closeObserver = $closeObserver; $this->serializer = new JsonSerializer(); - $this->loop = new AsyncInteropLoop(); + $this->loop = \EventLoop\getLoop(); $this->output = new Subject(); } diff --git a/tests/bootstrap.php b/tests/bootstrap.php index 063f2ba..6b1e21b 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -25,4 +25,6 @@ * The default scheduler is the EventLoopScheduler, which is asynchronous. * For testing we need to block at `subscribe`, so we need to switch the default to the ImmediateScheduler. */ -\Rx\Scheduler::setDefault(new \Rx\Scheduler\ImmediateScheduler()); \ No newline at end of file +\Rx\Scheduler::setDefaultFactory(function () { + return new \Rx\Scheduler\ImmediateScheduler(); +}); \ No newline at end of file From a90630aa135cb36a1b0c8c80e62e950f4ac93b32 Mon Sep 17 00:00:00 2001 From: David Dan Date: Mon, 13 Mar 2017 21:53:25 -0400 Subject: [PATCH 09/12] Added phpunit to require-dev --- composer.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/composer.json b/composer.json index d6a7d7e..9829fd9 100644 --- a/composer.json +++ b/composer.json @@ -42,6 +42,9 @@ "voryx/event-loop": "^2.0.1", "reactivex/rxphp": "2.x-dev" }, + "require-dev": { + "phpunit/phpunit": "^5.5" + }, "repositories": [ { "type": "vcs", From f65397d2c8509ef427625c6070e298d20237c866 Mon Sep 17 00:00:00 2001 From: David Dan Date: Tue, 21 Mar 2017 15:30:14 -0400 Subject: [PATCH 10/12] Update to the latest version of RxPHP v2 --- .travis.yml | 2 +- composer.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 0cfb995..19fc548 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,4 +9,4 @@ install: - composer install script: - - phpunit + - vendor/bin/phpunit diff --git a/composer.json b/composer.json index 9829fd9..09a1f62 100644 --- a/composer.json +++ b/composer.json @@ -40,7 +40,7 @@ "voryx/thruway-common": "^1.0.0", "ratchet/pawl": "^0.2.2", "voryx/event-loop": "^2.0.1", - "reactivex/rxphp": "2.x-dev" + "reactivex/rxphp": "^2.0" }, "require-dev": { "phpunit/phpunit": "^5.5" From 5713dcd711b019930cd08651cae92abcc9c506e7 Mon Sep 17 00:00:00 2001 From: David Dan Date: Tue, 21 Mar 2017 15:34:05 -0400 Subject: [PATCH 11/12] Fix testing bootstrap --- tests/bootstrap.php | 23 ++++------------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/tests/bootstrap.php b/tests/bootstrap.php index 6b1e21b..7986156 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -1,24 +1,9 @@ addPsr4('Rx\\Thruway\\Tests\\', __DIR__); - break; - } +if (file_exists($file = __DIR__ . '/../vendor/autoload.php')) { + require $file; +} else { + throw new RuntimeException('Install dependencies to run test suite.'); } /** From bfa47d537cdc7e73b581f47fcfba595bd9ba80f6 Mon Sep 17 00:00:00 2001 From: David Dan Date: Tue, 21 Mar 2017 15:38:39 -0400 Subject: [PATCH 12/12] added dev autoloader for tests --- composer.json | 1 + 1 file changed, 1 insertion(+) diff --git a/composer.json b/composer.json index 09a1f62..9b78e74 100644 --- a/composer.json +++ b/composer.json @@ -29,6 +29,7 @@ }, "autoload-dev": { "psr-4": { + "Rx\\Thruway\\Tests\\": "tests/", "Rx\\": "vendor/reactivex/rxphp/test/Rx" }, "files": [