diff --git a/.travis.yml b/.travis.yml index 6a16155..19fc548 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,9 +3,10 @@ sudo: required php: - 7 + - 7.1 install: - composer install script: - - phpunit + - vendor/bin/phpunit diff --git a/composer.json b/composer.json index ed046bb..9b78e74 100644 --- a/composer.json +++ b/composer.json @@ -27,17 +27,34 @@ "Rx\\Thruway\\": "src/" } }, + "autoload-dev": { + "psr-4": { + "Rx\\Thruway\\Tests\\": "tests/", + "Rx\\": "vendor/reactivex/rxphp/test/Rx" + }, + "files": [ + "vendor/reactivex/rxphp/test/helper-functions.php" + ] + }, "require": { - "reactivex/rxphp": "^1.4.1", - "voryx/thruway-common": "^1.0.0", "php": "^7.0", + "voryx/thruway-common": "^1.0.0", "ratchet/pawl": "^0.2.2", - "voryx/event-loop": "^0.2.0" + "voryx/event-loop": "^2.0.1", + "reactivex/rxphp": "^2.0" + }, + "require-dev": { + "phpunit/phpunit": "^5.5" }, "repositories": [ { "type": "vcs", "url": "git@github.com:voryx/ThruwayCommon.git" } - ] + ], + "extra": { + "branch-alias": { + "dev-master": "2.0-dev" + } + } } diff --git a/examples/authentication.php b/examples/authentication.php index b98fbf2..ced57f7 100644 --- a/examples/authentication.php +++ b/examples/authentication.php @@ -5,23 +5,27 @@ require __DIR__ . '/../vendor/autoload.php'; -$client = new Client('ws://127.0.0.1:9090', "somerealm", ["authmethods" => ["simplysimple"]]); +$client = new Client('ws://127.0.0.1:9090', 'somerealm', ['authmethods' => ['simplysimple']]); $client->onChallenge(function (Observable $challenge) { return $challenge->map(function ($args) { list($method, $extra) = $args; - return "letMeIn"; + return 'letMeIn'; }); }); -$client->register('com.myapp.example', function ($x) { - return $x; -})->subscribeCallback(function () { - echo "Registered ", PHP_EOL; -}); +$client + ->register('com.myapp.example', function ($x) { + return $x; + }) + ->subscribe(function () { + echo 'Registered ', PHP_EOL; + }); -$client->call('com.myapp.example', [1234])->subscribeCallback(function ($res) { - list($args, $argskw, $details) = $res; +$client + ->call('com.myapp.example', [1234]) + ->subscribe(function ($res) { + list($args, $argskw, $details) = $res; - echo "Call result: ", $args[0], PHP_EOL; -}); + echo 'Call result: ', $args[0], PHP_EOL; + }); diff --git a/examples/publish.php b/examples/publish.php index ee3070f..1b459af 100644 --- a/examples/publish.php +++ b/examples/publish.php @@ -5,8 +5,11 @@ require __DIR__ . '/../vendor/autoload.php'; -$client = new Client('ws://127.0.0.1:9090', "realm1"); +$driver = new \Amp\Loop\LoopFactory(); +\Interop\Async\Loop::setFactory($driver); + +$client = new Client('ws://127.0.0.1:9090', 'realm1'); $source = Observable::interval(1000); -$x = $client->publish('com.myapp.hello', $source); +$client->publish('com.myapp.hello', $source); diff --git a/examples/register-progress.php b/examples/register-progress.php index 1c9111a..485b07d 100644 --- a/examples/register-progress.php +++ b/examples/register-progress.php @@ -1,25 +1,25 @@ register('com.myapp.example', function ($x) { - return \Rx\Observable::interval(300, new \Rx\Scheduler\EventLoopScheduler(\EventLoop\getLoop()))->doOnNext(function () { - echo "."; - }); -}, ["progress" => true])->subscribeCallback( - function () { - echo "Registered ", PHP_EOL; - }, - function (Exception $e) { - echo "Register error: ", $e->getMessage(), PHP_EOL; - }, - function () { - echo "Register completed", PHP_EOL; - } -); +$client + ->register('com.myapp.example', function ($x) { + return \Rx\Observable::interval(300)->doOnNext(function () { + echo '.'; + }); + }, ['progress' => true]) + ->subscribe( + function () { + echo 'Registered ', PHP_EOL; + }, + function (Exception $e) { + echo 'Register error: ', $e->getMessage(), PHP_EOL; + }, + function () { + echo 'Register completed', PHP_EOL; + }); diff --git a/examples/rpc-extended.php b/examples/rpc-extended.php index 459d256..ca256f8 100644 --- a/examples/rpc-extended.php +++ b/examples/rpc-extended.php @@ -1,37 +1,37 @@ registerExtended('com.myapp.example', function ($args, $argskw, $details, $invocationMsg) { - return 1234567; -})->subscribeCallback( - function () { - echo "Registered ", PHP_EOL; - }, - function (Exception $e) { - echo "Register error: ", $e->getMessage(), PHP_EOL; - }, - function () { - echo "Register completed", PHP_EOL; - } -); +$client + ->registerExtended('com.myapp.example', function ($args, $argskw, $details, $invocationMsg) { + return 1234567; + }) + ->subscribe( + function () { + echo 'Registered ', PHP_EOL; + }, + function (Exception $e) { + echo 'Register error: ', $e->getMessage(), PHP_EOL; + }, + function () { + echo 'Register completed', PHP_EOL; + }); -$client->call('com.myapp.example', [123], ["foo" => "bar"]) - ->subscribe(new CallbackObserver( - function ($res) { - list($args, $argskw, $details) = $res; +$client + ->call('com.myapp.example', [123], ['foo' => 'bar']) + ->subscribe( + function ($res) { + list($args, $argskw, $details) = $res; - echo "Call result: ", $args[0], PHP_EOL; - }, - function (Exception $e) { - echo "Call error: ", $e->getMessage(), PHP_EOL; - }, - function () { - echo "Call completed", PHP_EOL; - }) - ); + echo 'Call result: ', $args[0], PHP_EOL; + }, + function (Exception $e) { + echo 'Call error: ', $e->getMessage(), PHP_EOL; + }, + function () { + echo 'Call completed', PHP_EOL; + }); diff --git a/examples/rpc-progress.php b/examples/rpc-progress.php index 77c69ea..aa5abc5 100644 --- a/examples/rpc-progress.php +++ b/examples/rpc-progress.php @@ -1,24 +1,23 @@ progressiveCall('com.myapp.example', [1234], []) -// ->take(10) - ->subscribe(new CallbackObserver( - function ($res) { - list($args, $argskw, $details) = $res; +$client + ->progressiveCall('com.myapp.example', [1234], []) + ->take(10) + ->subscribe( + function ($res) { + list($args, $argskw, $details) = $res; - echo "Call result: ", $args[0], PHP_EOL; - }, - function (Exception $e) { - echo "Call error: ", $e->getMessage(), PHP_EOL; - }, - function () { - echo "Call completed", PHP_EOL; - }) - ); + echo 'Call result: ', $args[0], PHP_EOL; + }, + function (Exception $e) { + echo 'Call error: ', $e->getMessage(), PHP_EOL; + }, + function () { + echo 'Call completed', PHP_EOL; + }); diff --git a/examples/rpc-publish.php b/examples/rpc-publish.php new file mode 100644 index 0000000..caeb3af --- /dev/null +++ b/examples/rpc-publish.php @@ -0,0 +1,34 @@ +publish('com.myapp.hello', $source); + + +$client->register('com.myapp.example', function ($x) { + return $x; +})->subscribe(new \Rx\Observer\CallbackObserver( + function () { + echo "Registered ", PHP_EOL; + }, + function (Exception $e) { + echo "Register error: ", $e->getMessage(), PHP_EOL; + }, + function () { + echo "Register completed", PHP_EOL; + }), new \Rx\Scheduler\EventLoopScheduler(\EventLoop\getLoop()) + +); + +\EventLoop\addTimer(1, function () use ($client) { + echo "a"; +// $client->publish('com.myapp.hello', "hi"); + $client->close(); +}); \ No newline at end of file diff --git a/examples/rpc-repeat-progress.php b/examples/rpc-repeat-progress.php index c20ee2c..018f731 100644 --- a/examples/rpc-repeat-progress.php +++ b/examples/rpc-repeat-progress.php @@ -1,37 +1,36 @@ progressiveRegister('com.myapp.example', function () { - return Observable::interval(500); -})->subscribe(new CallbackObserver(), $scheduler); +$client = new Client('ws://127.0.0.1:9090', 'realm1'); +$client + ->progressiveRegister('com.myapp.example', function () { + return Observable::interval(500); + }) + ->subscribe(); -$client->progressiveCall('com.myapp.example', [], []) +$client + ->progressiveCall('com.myapp.example', [], []) ->take(5) ->repeatWhen(function (Observable $attempts) { return $attempts->delay(1000)->take(1); }) - ->subscribe(new CallbackObserver( + ->subscribe( function ($res) { list($args, $argskw, $details) = $res; - echo "Call result: ", $args[0], PHP_EOL; + echo 'Call result: ', $args[0], PHP_EOL; }, function (Exception $e) { - echo "Call error: ", $e->getMessage(), PHP_EOL; + echo 'Call error: ', $e->getMessage(), PHP_EOL; }, function () { - echo "Call completed", PHP_EOL; - }), $scheduler); + echo 'Call completed', PHP_EOL; + }); //Output diff --git a/examples/rpc-repeat.php b/examples/rpc-repeat.php index b17f858..e1382bf 100644 --- a/examples/rpc-repeat.php +++ b/examples/rpc-repeat.php @@ -1,9 +1,12 @@ mapTo($client->call('com.myapp.example')) - ->switchLatest() + ->switch() ->take(1) ->timeout(2000) - ->repeatWhen(function (\Rx\Observable $attempts) { + ->repeatWhen(function (Observable $attempts) { return $attempts->delay(1000); }) - ->retryWhen(function (\Rx\Observable $errors) { + ->retryWhen(function (Observable $errors) { return $errors->delay(1000); }); -$source->subscribe(new \Rx\Observer\CallbackObserver( +$source->subscribe( function ($res) { list($args, $argskw, $details) = $res; - echo "Call result: ", $args[0], PHP_EOL; + echo 'Call result: ', $args[0], PHP_EOL; }, function (Exception $e) { - echo "Call error: ", $e->getMessage(), PHP_EOL; + echo 'Call error: ', $e->getMessage(), PHP_EOL; }, function () { - echo "Call completed", PHP_EOL; - }), $scheduler); + echo 'Call completed', PHP_EOL; + }); diff --git a/examples/rpc.php b/examples/rpc.php index 7d5e768..6a98272 100644 --- a/examples/rpc.php +++ b/examples/rpc.php @@ -1,37 +1,39 @@ register('com.myapp.example', function ($x) { - return $x; -})->subscribeCallback( +$client + ->register('com.myapp.example', function ($x) { + return $x; + }) + ->subscribe( function () { - echo "Registered ", PHP_EOL; + echo 'Registered ', PHP_EOL; }, function (Exception $e) { - echo "Register error: ", $e->getMessage(), PHP_EOL; + echo 'Register error: ', $e->getMessage(), PHP_EOL; }, function () { - echo "Register completed", PHP_EOL; + echo 'Register completed', PHP_EOL; } ); -$client->call('com.myapp.example', [1234]) - ->subscribe(new CallbackObserver( - function ($res) { - list($args, $argskw, $details) = $res; +$client + ->call('com.myapp.example', [1234]) + ->subscribe( + function ($res) { + list($args, $argskw, $details) = $res; - echo "Call result: ", $args[0], PHP_EOL; - }, - function (Exception $e) { - echo "Call error: ", $e->getMessage(), PHP_EOL; - }, - function () { - echo "Call completed", PHP_EOL; - }) + echo 'Call result: ', $args[0], PHP_EOL; + }, + function (Exception $e) { + echo 'Call error: ', $e->getMessage(), PHP_EOL; + }, + function () { + echo 'Call completed', PHP_EOL; + } ); diff --git a/examples/subscribe.php b/examples/subscribe.php index 37935b6..1c940fe 100644 --- a/examples/subscribe.php +++ b/examples/subscribe.php @@ -1,22 +1,22 @@ topic('com.myapp.hello')->subscribe(new CallbackObserver( +$client + ->topic('com.myapp.hello') + ->subscribe( function ($res) { list($args, $argskw, $details) = $res; - echo "Result: ", $args[0], PHP_EOL; + echo 'Result: ', $args[0], PHP_EOL; }, function (Exception $e) { - echo "Error: ", $e->getMessage(), PHP_EOL; + echo 'Error: ', $e->getMessage(), PHP_EOL; }, function () { - echo "Completed", PHP_EOL; - }) -); + echo 'Completed', PHP_EOL; + }); diff --git a/src/Client.php b/src/Client.php index 9f586b0..531e4de 100644 --- a/src/Client.php +++ b/src/Client.php @@ -2,11 +2,10 @@ namespace Rx\Thruway; +use Rx\Scheduler; use Rx\Thruway\Subject\SessionReplaySubject; use Rx\Thruway\Subject\WebSocketSubject; use Rx\Disposable\CompositeDisposable; -use Rx\Scheduler\EventLoopScheduler; -use React\EventLoop\LoopInterface; use Rx\DisposableInterface; use Thruway\Common\Utils; use Rx\Subject\Subject; @@ -20,38 +19,31 @@ final class Client { - - private $url, $loop, $realm, $session, $options, $messages, $webSocket, $scheduler, $disposable, $challengeCallback; + private $session, $messages, $webSocket, $disposable, $challengeCallback; private $currentRetryCount = 0; - public function __construct(string $url, string $realm, array $options = [], LoopInterface $loop = null, Subject $webSocket = null, Observable $messages = null, Observable $session = null) + public function __construct(string $url, string $realm, array $options = [], Subject $webSocket = null, Observable $messages = null, Observable $session = null) { - $this->url = $url; - $this->realm = $realm; - $this->options = $options; - $this->loop = $loop ?? \EventLoop\getLoop(); - $this->scheduler = new EventLoopScheduler($this->loop); - $this->disposable = new CompositeDisposable(); - $this->challengeCallback = function () { + $this->disposable = new CompositeDisposable(); + $this->challengeCallback = function () { }; $open = new Subject(); $close = new Subject(); $this->webSocket = $webSocket ?: new WebSocketSubject($url, ['wamp.2.json'], $open, $close); - - $this->messages = $messages ?: $this->webSocket->retryWhen([$this, '_reconnect'])->shareReplay(0); + $this->messages = $messages ?: $this->webSocket->retryWhen([$this, '_reconnect'])->shareReplay(0); $open - ->doOnNext(function(){ + ->doOnNext(function () { $this->currentRetryCount = 0; }) - ->map(function () { - $this->options['roles'] = $this->roles(); - return new HelloMessage($this->realm, (object)$this->options); - })->subscribe($this->webSocket, $this->scheduler); + ->map(function () use ($realm, $options) { + $options['roles'] = $this->roles(); + return new HelloMessage($realm, (object)$options); + })->subscribe($this->webSocket); $challengeMsg = $this->messages ->filter(function (Message $msg) { @@ -69,20 +61,20 @@ public function __construct(string $url, string $realm, array $options = [], Loo ->map(function ($signature) { return new AuthenticateMessage($signature); }) - ->catchError(function (\Exception $ex) { + ->catch(function (\Exception $ex) { if ($ex instanceof WampChallengeException) { return Observable::just($ex->getErrorMessage()); } return Observable::error($ex); }) - ->doOnEach($this->webSocket); + ->do($this->webSocket); $this->session = $session ?: $this->messages ->merge($challengeMsg) ->filter(function (Message $msg) { return $msg instanceof WelcomeMessage; }) - ->multicast(new SessionReplaySubject($close))->refCount(); + ->multicast(new SessionReplaySubject($close, Scheduler::getAsync()))->refCount(); $this->disposable->add($this->webSocket); } @@ -94,12 +86,12 @@ public function __construct(string $url, string $realm, array $options = [], Loo * @param array $options * @return Observable */ - public function call(string $uri, array $args = [], array $argskw = [], array $options = null) :Observable + public function call(string $uri, array $args = [], array $argskw = [], array $options = null): Observable { return $this->session ->take(1) ->mapTo(new CallObservable($uri, $this->messages, $this->webSocket, $args, $argskw, $options)) - ->switchLatest(); + ->switch(); } /** @@ -108,7 +100,7 @@ public function call(string $uri, array $args = [], array $argskw = [], array $o * @param array $options * @return Observable */ - public function register(string $uri, callable $callback, array $options = []) :Observable + public function register(string $uri, callable $callback, array $options = []): Observable { return $this->registerExtended($uri, $callback, $options, false); } @@ -123,7 +115,7 @@ public function register(string $uri, callable $callback, array $options = []) : * @param array $options * @return Observable */ - public function progressiveCall(string $uri, array $args = [], array $argskw = [], array $options = null) :Observable + public function progressiveCall(string $uri, array $args = [], array $argskw = [], array $options = null): Observable { $options['receive_progress'] = true; @@ -135,7 +127,7 @@ public function progressiveCall(string $uri, array $args = [], array $argskw = [ ->mapTo($callObs->doOnCompleted(function () use ($completed) { $completed->onNext(0); })) - ->switchLatest(); + ->switch(); } /** @@ -144,7 +136,7 @@ public function progressiveCall(string $uri, array $args = [], array $argskw = [ * @param array $options * @return Observable */ - public function progressiveRegister(string $uri, callable $callback, array $options = []) :Observable + public function progressiveRegister(string $uri, callable $callback, array $options = []): Observable { $options['progress'] = true; @@ -160,11 +152,11 @@ public function progressiveRegister(string $uri, callable $callback, array $opti * @param bool $extended * @return Observable */ - public function registerExtended(string $uri, callable $callback, array $options = [], bool $extended = true) :Observable + public function registerExtended(string $uri, callable $callback, array $options = [], bool $extended = true): Observable { return $this->session ->mapTo(new RegisterObservable($uri, $callback, $this->messages, $this->webSocket, $options, $extended)) - ->switchLatest(); + ->switch(); } /** @@ -172,12 +164,11 @@ public function registerExtended(string $uri, callable $callback, array $options * @param array $options * @return Observable */ - public function topic(string $uri, array $options = []) :Observable + public function topic(string $uri, array $options = []): Observable { return $this->session ->mapTo(new TopicObservable($uri, $options, $this->messages, $this->webSocket)) - ->switchLatest() - ->subscribeOn($this->scheduler); + ->switch(); } /** @@ -185,8 +176,9 @@ public function topic(string $uri, array $options = []) :Observable * @param mixed | Observable $obs * @param array $options * @return DisposableInterface + * @throws \InvalidArgumentException */ - public function publish(string $uri, $obs, array $options = []) : DisposableInterface + public function publish(string $uri, $obs, array $options = []): DisposableInterface { $obs = $obs instanceof Observable ? $obs : Observable::just($obs); @@ -197,13 +189,11 @@ public function publish(string $uri, $obs, array $options = []) : DisposableInte ->mapTo($obs->doOnCompleted(function () use ($completed) { $completed->onNext(0); })) - ->lift(function () { - return new SwitchFirstOperator(); - }) + ->switchFirst() ->map(function ($value) use ($uri, $options) { return new PublishMessage(Utils::getUniqueId(), (object)$options, $uri, [$value]); }) - ->subscribe($this->webSocket, $this->scheduler); + ->subscribe($this->webSocket); } public function onChallenge(callable $challengeCallback) @@ -227,8 +217,8 @@ public function _reconnect(Observable $attempts) ->flatMap(function (\Exception $ex) use ($maxRetryDelay, $retryDelayGrowth, $initialRetryDelay) { $delay = min($maxRetryDelay, pow($retryDelayGrowth, ++$this->currentRetryCount) + $initialRetryDelay); $seconds = number_format((float)$delay / 1000, 3, '.', '');; - echo "Error: ", $ex->getMessage(), PHP_EOL, "Reconnecting in ${seconds} seconds...", PHP_EOL; - return Observable::timer((int)$delay, $this->scheduler); + echo 'Error: ', $ex->getMessage(), PHP_EOL, "Reconnecting in ${seconds} seconds...", PHP_EOL; + return Observable::timer((int)$delay); }) ->take($maxRetries); } @@ -236,34 +226,34 @@ public function _reconnect(Observable $attempts) private function roles() { return [ - "caller" => [ - "features" => [ - "caller_identification" => true, - "progressive_call_results" => true + 'caller' => [ + 'features' => [ + 'caller_identification' => true, + 'progressive_call_results' => true ] ], - "callee" => [ - "features" => [ - "call_canceling" => true, - "caller_identification" => true, - "pattern_based_registration" => true, - "shared_registration" => true, - "progressive_call_results" => true, - "registration_revocation" => true + 'callee' => [ + 'features' => [ + 'call_canceling' => true, + 'caller_identification' => true, + 'pattern_based_registration' => true, + 'shared_registration' => true, + 'progressive_call_results' => true, + 'registration_revocation' => true ] ], - "publisher" => [ - "features" => [ - "publisher_identification" => true, - "subscriber_blackwhite_listing" => true, - "publisher_exclusion" => true + 'publisher' => [ + 'features' => [ + 'publisher_identification' => true, + 'subscriber_blackwhite_listing' => true, + 'publisher_exclusion' => true ] ], - "subscriber" => [ - "features" => [ - "publisher_identification" => true, - "pattern_based_subscription" => true, - "subscription_revocation" => true + 'subscriber' => [ + 'features' => [ + 'publisher_identification' => true, + 'pattern_based_subscription' => true, + 'subscription_revocation' => true ] ] ]; diff --git a/src/Observable/CallObservable.php b/src/Observable/CallObservable.php index 57b0f57..6f99871 100644 --- a/src/Observable/CallObservable.php +++ b/src/Observable/CallObservable.php @@ -5,6 +5,9 @@ use Rx\Disposable\CallbackDisposable; use Rx\Disposable\CompositeDisposable; use Rx\Disposable\EmptyDisposable; +use Rx\DisposableInterface; +use Rx\Scheduler; +use Rx\SchedulerInterface; use Thruway\WampErrorException; use Rx\ObserverInterface; use Thruway\Common\Utils; @@ -16,10 +19,18 @@ final class CallObservable extends Observable { - private $uri, $args, $argskw, $options, $messages, $webSocket, $timeout, $completed; - - function __construct(string $uri, Observable $messages, Subject $webSocket, array $args = null, array $argskw = null, array $options = null, int $timeout = 300000) - { + private $uri, $args, $argskw, $options, $messages, $webSocket, $timeout, $completed, $scheduler; + + public function __construct( + string $uri, + Observable $messages, + Subject $webSocket, + array $args = null, + array $argskw = null, + array $options = null, + int $timeout = 300000, + SchedulerInterface $scheduler = null + ) { $this->uri = $uri; $this->args = $args; $this->argskw = $argskw; @@ -28,9 +39,10 @@ function __construct(string $uri, Observable $messages, Subject $webSocket, arra $this->webSocket = $webSocket; $this->timeout = $timeout; $this->completed = false; + $this->scheduler = $scheduler ?: Scheduler::getDefault(); } - public function subscribe(ObserverInterface $observer, $scheduler = null) + public function _subscribe(ObserverInterface $observer): DisposableInterface { $requestId = Utils::getUniqueId(); $callMsg = new CallMessage($requestId, $this->options, $this->uri, $this->args, $this->argskw); @@ -52,8 +64,8 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) return Observable::fromArray([ new ResultMessage($msg->getRequestId(), $details, $msg->getArguments(), $msg->getArgumentsKw()), - new ResultMessage($msg->getRequestId(), (object)["progress" => false]) - ]); + new ResultMessage($msg->getRequestId(), (object)['progress' => false]) + ], $this->scheduler); } return Observable::just($msg); }) @@ -70,7 +82,7 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) return $msg instanceof ErrorMessage && $msg->getErrorRequestId() === $requestId; }) ->flatMap(function (ErrorMessage $msg) { - return Observable::error(new WampErrorException($msg->getErrorURI(), $msg->getArguments())); + return Observable::error(new WampErrorException($msg->getErrorURI(), $msg->getArguments()), $this->scheduler); }) ->takeUntil($resultMsg) ->take(1); @@ -101,7 +113,7 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) } }), - $result->subscribe($observer, $scheduler) + $result->subscribe($observer) ]); } } diff --git a/src/Observable/RegisterObservable.php b/src/Observable/RegisterObservable.php index 6528f93..acd1be8 100644 --- a/src/Observable/RegisterObservable.php +++ b/src/Observable/RegisterObservable.php @@ -2,8 +2,11 @@ namespace Rx\Thruway\Observable; +use Rx\DisposableInterface; use Rx\Observable; use Rx\ObserverInterface; +use Rx\Scheduler; +use Rx\SchedulerInterface; use Rx\Subject\Subject; use Thruway\Common\Utils; use Thruway\WampErrorException; @@ -16,9 +19,9 @@ final class RegisterObservable extends Observable { - private $uri, $options, $messages, $ws, $callback, $extended, $logSubject, $invocationErrors; + private $uri, $options, $messages, $ws, $callback, $extended, $logSubject, $invocationErrors, $scheduler; - function __construct(string $uri, callable $callback, Observable $messages, Subject $ws, array $options = [], bool $extended = false, Subject $logSubject = null) + public function __construct(string $uri, callable $callback, Observable $messages, Subject $ws, array $options = [], bool $extended = false, Subject $logSubject = null, SchedulerInterface $scheduler = null) { $this->uri = $uri; $this->options = $options; @@ -28,9 +31,10 @@ function __construct(string $uri, callable $callback, Observable $messages, Subj $this->extended = $extended; $this->logSubject = $logSubject ?: new Subject(); $this->invocationErrors = new Subject(); + $this->scheduler = $scheduler ?: Scheduler::getDefault(); } - public function subscribe(ObserverInterface $observer, $scheduler = null) + public function _subscribe(ObserverInterface $observer): DisposableInterface { $requestId = Utils::getUniqueId(); $disposable = new CompositeDisposable(); @@ -65,7 +69,7 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) return $msg instanceof ErrorMessage && $msg->getErrorRequestId() === $requestId; }) ->flatMap(function (ErrorMessage $msg) { - return Observable::error(new WampErrorException($msg->getErrorURI(), $msg->getArguments())); + return Observable::error(new WampErrorException($msg->getErrorURI(), $msg->getArguments()), $this->scheduler); }) ->takeUntil($registeredMsg) ->take(1); @@ -91,7 +95,7 @@ function () use (&$completed, $observer, $unregister) { $completed = true; $observer->onCompleted(); } - ), $scheduler); + )); $invocationSubscription = $invocationMsg ->flatMap(function (InvocationMessage $msg) { @@ -104,10 +108,10 @@ function () use (&$completed, $observer, $unregister) { } } catch (\Exception $e) { $this->invocationErrors->onNext(new WampInvocationException($msg)); - return Observable::emptyObservable(); + return Observable::emptyObservable($this->scheduler); } - $resultObs = $result instanceof Observable ? $result : Observable::just($result); + $resultObs = $result instanceof Observable ? $result : Observable::just($result, $this->scheduler); if (($this->options['progress'] ?? false) === false) { $returnObs = $resultObs @@ -121,7 +125,7 @@ function () use (&$completed, $observer, $unregister) { ->map(function ($value) use ($msg) { return [$value, $msg, $this->options]; }) - ->concat(Observable::just([null, $msg, ["progress" => false]])); + ->concat(Observable::just([null, $msg, ["progress" => false]], $this->scheduler)); } $interruptMsg = $this->messages @@ -134,7 +138,7 @@ function () use (&$completed, $observer, $unregister) { ->takeUntil($interruptMsg) ->catchError(function (\Exception $ex) use ($msg) { $this->invocationErrors->onNext(new WampInvocationException($msg)); - return Observable::emptyObservable(); + return Observable::emptyObservable($this->scheduler); }); }) @@ -144,13 +148,13 @@ function () use (&$completed, $observer, $unregister) { return new YieldMessage($invocationMsg->getRequestId(), $options, [$value]); }) - ->subscribe($this->ws, $scheduler); + ->subscribe($this->ws); $invocationErrors = $this->invocationErrors ->map(function (WampInvocationException $error) { return $error->getErrorMessage(); }) - ->subscribe($this->ws, $scheduler); + ->subscribe($this->ws); $disposable->add($invocationErrors); $disposable->add($invocationSubscription); diff --git a/src/Observable/TopicObservable.php b/src/Observable/TopicObservable.php index 2a607fe..87aafdd 100644 --- a/src/Observable/TopicObservable.php +++ b/src/Observable/TopicObservable.php @@ -2,6 +2,7 @@ namespace Rx\Thruway\Observable; +use Rx\DisposableInterface; use Rx\Observable; use Rx\Subject\Subject; use Thruway\Common\Utils; @@ -17,15 +18,15 @@ final class TopicObservable extends Observable { private $uri, $options, $messages, $webSocket; - function __construct(string $uri, array $options, Observable $messages, Subject $webSocket) + public function __construct(string $uri, array $options, Observable $messages, Subject $webSocket) { - $this->uri = $uri; - $this->options = (object)$options; - $this->messages = $messages; + $this->uri = $uri; + $this->options = (object)$options; + $this->messages = $messages; $this->webSocket = $webSocket; } - public function subscribe(ObserverInterface $observer, $scheduler = null) + public function _subscribe(ObserverInterface $observer): DisposableInterface { $requestId = Utils::getUniqueId(); $subscriptionId = null; @@ -60,13 +61,13 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) }); }) ->merge($errorMsg) - ->subscribe($observer, $scheduler); + ->subscribe($observer); $disposable = new CompositeDisposable(); $disposable->add($sub); - $disposable->add(new CallbackDisposable(function () use (&$subscriptionId, $scheduler) { + $disposable->add(new CallbackDisposable(function () use (&$subscriptionId) { if (!$subscriptionId) { return; } diff --git a/src/Subject/SessionReplaySubject.php b/src/Subject/SessionReplaySubject.php index a52a887..6282598 100644 --- a/src/Subject/SessionReplaySubject.php +++ b/src/Subject/SessionReplaySubject.php @@ -3,10 +3,10 @@ namespace Rx\Thruway\Subject; use Rx\Disposable\CallbackDisposable; +use Rx\DisposableInterface; use Rx\Observable; use Rx\Observer\ScheduledObserver; use Rx\ObserverInterface; -use Rx\Scheduler\ImmediateScheduler; use Rx\SchedulerInterface; use Rx\Subject\Subject; @@ -20,29 +20,26 @@ class SessionReplaySubject extends Subject private $value; - public function __construct(Observable $close) + public function __construct(Observable $close, SchedulerInterface $scheduler) { - $this->scheduler = new ImmediateScheduler(); + $this->scheduler = $scheduler; - $close->subscribeCallback(function () { + $close->subscribe(function () { $this->value = null; }); } - public function subscribe(ObserverInterface $observer, SchedulerInterface $scheduler = null) + public function _subscribe(ObserverInterface $observer): DisposableInterface { $this->assertNotDisposed(); - if (!$scheduler) { - $scheduler = $this->scheduler; - } - $so = new ScheduledObserver($scheduler, $observer); + $so = new ScheduledObserver($this->scheduler, $observer); $subscription = $this->createRemovableDisposable($this, $so); $this->observers[] = $so; - if ($this->value){ + if ($this->value) { $so->onNext($this->value); } @@ -76,12 +73,12 @@ public function onNext($value) } } - private function createRemovableDisposable($subject, $observer) + private function createRemovableDisposable(Subject $subject, ScheduledObserver $observer): DisposableInterface { return new CallbackDisposable(function () use ($observer, $subject) { $observer->dispose(); if (!$subject->isDisposed()) { - array_splice($subject->observers, array_search($observer, $subject->observers), 1); + array_splice($subject->observers, array_search($observer, $subject->observers, true), 1); } }); } diff --git a/src/Subject/WebSocketSubject.php b/src/Subject/WebSocketSubject.php index 54360f0..58a2c53 100644 --- a/src/Subject/WebSocketSubject.php +++ b/src/Subject/WebSocketSubject.php @@ -6,32 +6,32 @@ use React\EventLoop\Timer\Timer; use Rx\Disposable\CallbackDisposable; use Rx\Disposable\CompositeDisposable; -use Rx\React\RejectedPromiseException; +use Rx\DisposableInterface; use Thruway\Message\AbortMessage; use Thruway\Serializer\JsonSerializer; -use React\EventLoop\LoopInterface; use Ratchet\Client\Connector; use Ratchet\Client\WebSocket; use Rx\ObserverInterface; use Rx\Subject\Subject; +use WyriHaximus\React\AsyncInteropLoop\AsyncInteropLoop; final class WebSocketSubject extends Subject { private $url, $protocols, $socket, $openObserver, $closeObserver, $output, $loop, $serializer; - public function __construct(string $url, array $protocols = [], Subject $openObserver = null, Subject $closeObserver = null, WebSocket $socket = null, LoopInterface $loop = null) + public function __construct(string $url, array $protocols = [], Subject $openObserver = null, Subject $closeObserver = null, WebSocket $socket = null) { $this->url = $url; $this->protocols = $protocols; $this->socket = $socket; $this->openObserver = $openObserver; $this->closeObserver = $closeObserver; - $this->loop = $loop ?: \EventLoop\getLoop(); $this->serializer = new JsonSerializer(); + $this->loop = \EventLoop\getLoop(); $this->output = new Subject(); } - public function subscribe(ObserverInterface $observer, $scheduler = null) + public function _subscribe(ObserverInterface $observer): DisposableInterface { $this->output = new Subject(); @@ -41,10 +41,10 @@ public function subscribe(ObserverInterface $observer, $scheduler = null) $disposable = new CompositeDisposable(); - $disposable->add($this->output->subscribe($observer, $scheduler)); + $disposable->add($this->output->subscribe($observer)); $disposable->add(new CallbackDisposable(function () { - if (!$this->output->hasObservers() && $this->socket) { + if ($this->socket && !$this->output->hasObservers()) { $this->socket->close(); $this->socket = null; } @@ -66,7 +66,7 @@ function (WebSocket $ws) { $pingTimer = $this->loop->addPeriodicTimer(30, function (Timer $timer) use ($ws, &$lastReceivedPong) { static $sequence = 0; - if ($lastReceivedPong != $sequence) { + if ((int)$lastReceivedPong !== (int)$sequence) { $timer->cancel(); $ws->close(); } @@ -81,11 +81,11 @@ function (WebSocket $ws) { $lastReceivedPong = $frame->getPayload(); }); - $ws->on("error", function (\Exception $ex) use ($pingTimer) { + $ws->on('error', function (\Exception $ex) use ($pingTimer) { $pingTimer->cancel(); $this->output->onError($ex); }); - $ws->on("close", function ($reason) use ($pingTimer) { + $ws->on('close', function ($reason) use ($pingTimer) { $pingTimer->cancel(); if ($this->closeObserver) { $this->closeObserver->onNext($reason); @@ -98,7 +98,7 @@ function (WebSocket $ws) { $this->output->onError(new \Exception($reason)); }); - $ws->on("message", function ($message) { + $ws->on('message', function ($message) { $msg = $this->serializer->deserialize($message); @@ -115,7 +115,7 @@ function (WebSocket $ws) { } }, function ($error) { - $error = $error instanceof \Exception ? $error : new RejectedPromiseException($error); + $error = $error instanceof \Exception ? $error : new \Exception($error); $this->output->onError($error); }); diff --git a/src/SwitchFirstOperator.php b/src/SwitchFirstOperator.php deleted file mode 100644 index 857157a..0000000 --- a/src/SwitchFirstOperator.php +++ /dev/null @@ -1,65 +0,0 @@ -add($singleDisposable); - - $callbackObserver = new CallbackObserver( - function (Observable $x) use ($disposable, $scheduler, $observer) { - if ($this->hasCurrent) { - return; - } - $this->hasCurrent = true; - - $inner = new SingleAssignmentDisposable(); - $disposable->add($inner); - - $innerSub = $x->subscribe(new CallbackObserver( - [$observer, "onNext"], - [$observer, "onError"], - function () use ($disposable, $inner, $observer) { - $disposable->remove($inner); - $this->hasCurrent = false; - - if ($this->isStopped && $disposable->count() === 1) { - $observer->onCompleted(); - } - } - ), $scheduler); - - $inner->setDisposable($innerSub); - }, - [$observer, 'onError'], - function () use ($disposable, $observer) { - $this->isStopped = true; - if (!$this->hasCurrent && $disposable->count() === 1) { - $observer->onCompleted(); - } - } - ); - - $singleDisposable->setDisposable($observable->subscribe($callbackObserver, $scheduler)); - - return $disposable; - - } -} diff --git a/tests/Functional/Observable/CallObservableTest.php b/tests/Functional/Observable/CallObservableTest.php index 5f5a6eb..6bbecad 100644 --- a/tests/Functional/Observable/CallObservableTest.php +++ b/tests/Functional/Observable/CallObservableTest.php @@ -18,17 +18,17 @@ class CallObservableTest extends FunctionalTestCase /** * @test */ - function call_message_never() + public function call_message_never() { $messages = Observable::never(); $webSocket = new Subject(); - $webSocket->subscribeCallback(function ($msg) { + $webSocket->subscribe(function ($msg) { $this->recordWampMessage($msg); }); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([], $results->getMessages()); @@ -42,7 +42,7 @@ function call_message_never() /** * @test */ - function call_messages_empty() + public function call_messages_empty() { $messages = $this->createHotObservable([ onNext(150, 1), @@ -50,12 +50,12 @@ function call_messages_empty() ]); $webSocket = new Subject(); - $webSocket->subscribeCallback(function ($msg) { + $webSocket->subscribe(function ($msg) { $this->recordWampMessage($msg); }); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -66,13 +66,12 @@ function call_messages_empty() /** * @test */ - function call_one_no_args() + public function call_one_no_args() { - $resultMessage = new ResultMessage(null, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -85,7 +84,7 @@ function call_one_no_args() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -97,7 +96,7 @@ function call_one_no_args() /** * @test */ - function call_one_with_args() + public function call_one_with_args() { $args = ["testing"]; @@ -105,7 +104,7 @@ function call_one_with_args() $resultMessage = new ResultMessage(null, new \stdClass(), $args); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -118,7 +117,7 @@ function call_one_with_args() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -130,7 +129,7 @@ function call_one_with_args() /** * @test */ - function call_one_with_argskw() + public function call_one_with_argskw() { $args = ["testing"]; $argskw = (object)["foo" => "bar"]; @@ -138,7 +137,7 @@ function call_one_with_argskw() $resultMessage = new ResultMessage(null, new \stdClass(), $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -151,7 +150,7 @@ function call_one_with_argskw() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -164,9 +163,8 @@ function call_one_with_argskw() /** * @test */ - function call_one_with_details() + public function call_one_with_details() { - $args = ["testing"]; $argskw = (object)["foo" => "bar"]; $details = (object)["one" => "two"]; @@ -174,7 +172,7 @@ function call_one_with_details() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -187,7 +185,7 @@ function call_one_with_details() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -200,9 +198,8 @@ function call_one_with_details() /** * @test */ - function call_one_reconnect() + public function call_one_reconnect() { - $args = ["testing"]; $argskw = (object)["foo" => "bar"]; $details = (object)["one" => "two"]; @@ -210,7 +207,7 @@ function call_one_reconnect() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -226,7 +223,7 @@ function call_one_reconnect() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -239,9 +236,8 @@ function call_one_reconnect() /** * @test */ - function call_one_throw_error_before() + public function call_one_throw_error_before() { - $error = new \Exception("testing"); $args = ["testing"]; $argskw = (object)["foo" => "bar"]; @@ -250,7 +246,7 @@ function call_one_throw_error_before() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function ($msg) use ($resultMessage) { + $webSocket->subscribe(function ($msg) use ($resultMessage) { $this->recordWampMessage($msg); if ($msg instanceof CallMessage) { $requestId = $msg->getRequestId(); @@ -267,7 +263,7 @@ function call_one_throw_error_before() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -284,9 +280,8 @@ function call_one_throw_error_before() /** * @test */ - function call_one_throw_error_after_welcome() + public function call_one_throw_error_after_welcome() { - $error = new \Exception("testing"); $args = ["testing"]; $argskw = (object)["foo" => "bar"]; @@ -295,7 +290,7 @@ function call_one_throw_error_after_welcome() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function ($msg) use ($resultMessage) { + $webSocket->subscribe(function ($msg) use ($resultMessage) { $this->recordWampMessage($msg); if ($msg instanceof CallMessage) { $requestId = $msg->getRequestId(); @@ -312,7 +307,7 @@ function call_one_throw_error_after_welcome() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -328,9 +323,8 @@ function call_one_throw_error_after_welcome() /** * @test */ - function call_one_throw_error_after_result() + public function call_one_throw_error_after_result() { - $error = new \Exception("testing"); $args = ["testing"]; $argskw = (object)["foo" => "bar"]; @@ -339,7 +333,7 @@ function call_one_throw_error_after_result() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -353,7 +347,7 @@ function call_one_throw_error_after_result() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -365,7 +359,7 @@ function call_one_throw_error_after_result() /** * @test */ - function call_one_throw_sendMessage() + public function call_one_throw_sendMessage() { $error = new \Exception("testing"); @@ -376,7 +370,7 @@ function call_one_throw_sendMessage() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($error) { + $webSocket->subscribe(function (CallMessage $msg) use ($error) { throw $error; }); @@ -388,7 +382,7 @@ function call_one_throw_sendMessage() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -399,13 +393,12 @@ function call_one_throw_sendMessage() /** * @test */ - function call_one_error_message() + public function call_one_error_message() { - $errorMessage = new ErrorMessage(12345, null, new \stdClass(), 'some.server.error'); $webSocket = new Subject(); - $webSocket->subscribeCallback(function ($msg) use ($errorMessage) { + $webSocket->subscribe(function ($msg) use ($errorMessage) { if ($msg instanceof CallMessage) { $requestId = $msg->getRequestId(); $errorMessage->setErrorRequestId($requestId); @@ -421,7 +414,7 @@ function call_one_error_message() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }); $this->assertMessages([ @@ -437,7 +430,7 @@ function call_one_error_message() /** * @test */ - function call_one_dispose_before() + public function call_one_dispose_before() { $args = ["testing"]; $argskw = (object)["foo" => "bar"]; @@ -446,7 +439,7 @@ function call_one_dispose_before() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function ($msg) use ($resultMessage) { + $webSocket->subscribe(function ($msg) use ($resultMessage) { if ($msg instanceof CallMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); @@ -462,7 +455,7 @@ function call_one_dispose_before() ]); $results = $this->scheduler->startWithDispose(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }, 220); $this->assertMessages([], $results->getMessages()); @@ -477,7 +470,7 @@ function call_one_dispose_before() /** * @test */ - function call_one_dispose_after() + public function call_one_dispose_after() { $args = ["testing"]; $argskw = (object)["foo" => "bar"]; @@ -486,7 +479,7 @@ function call_one_dispose_after() $resultMessage = new ResultMessage(null, $details, $args, $argskw); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (CallMessage $msg) use ($resultMessage) { + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage) { $requestId = $msg->getRequestId(); $resultMessage->setRequestId($requestId); }); @@ -499,7 +492,7 @@ function call_one_dispose_after() ]); $results = $this->scheduler->startWithDispose(function () use ($messages, $webSocket) { - return new CallObservable('testing.uri', $messages, $webSocket); + return new CallObservable('testing.uri', $messages, $webSocket, null, null, null, 3000, $this->scheduler); }, 260); $this->assertMessages([ diff --git a/tests/Functional/Observable/RegisterObservableTest.php b/tests/Functional/Observable/RegisterObservableTest.php index 80327fc..bb29559 100644 --- a/tests/Functional/Observable/RegisterObservableTest.php +++ b/tests/Functional/Observable/RegisterObservableTest.php @@ -24,23 +24,23 @@ public function callable($first = 0, $second = 0) public function callableObs($first = 0, $second = 0) { - return Observable::just($first + $second); + return Observable::just($first + $second, $this->scheduler); } public function callableManyObs($first = 0, $second = 0) { - return Observable::fromArray([$first, $second]); + return Observable::fromArray([$first, $second], $this->scheduler); } /** * @test */ - function register_message_never() + public function register_message_never() { $messages = Observable::never(); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (RegisterMessage $msg) { + $webSocket->subscribe(function (RegisterMessage $msg) { $this->assertEquals($msg->getUri(), 'testing.uri'); }); @@ -54,7 +54,7 @@ function register_message_never() /** * @test */ - function register_messages_empty() + public function register_messages_empty() { $messages = $this->createHotObservable([ onNext(150, 1), @@ -62,7 +62,7 @@ function register_messages_empty() ]); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (RegisterMessage $msg) { + $webSocket->subscribe(function (RegisterMessage $msg) { $this->assertEquals($msg->getUri(), 'testing.uri'); }); @@ -82,12 +82,12 @@ function register_messages_empty() /** * @test */ - function register_with_no_invocation() + public function register_with_no_invocation() { $registeredMsg = new RegisteredMessage(null, 54321); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -122,12 +122,12 @@ function register_with_no_invocation() /** * @test */ - function register_with_no_invocation_no_complete() + public function register_with_no_invocation_no_complete() { $registeredMsg = new RegisteredMessage(null, 54321); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -159,13 +159,13 @@ function register_with_no_invocation_no_complete() /** * @test */ - function register_with_one_invocation_no_args() + public function register_with_one_invocation_no_args() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -182,7 +182,7 @@ function register_with_one_invocation_no_args() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -201,13 +201,13 @@ function register_with_one_invocation_no_args() /** * @test */ - function register_reconnect() + public function register_reconnect() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -226,7 +226,7 @@ function register_reconnect() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -245,13 +245,13 @@ function register_reconnect() /** * @test */ - function register_with_many_invocations_no_args() + public function register_with_many_invocations_no_args() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -271,7 +271,7 @@ function register_with_many_invocations_no_args() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -293,13 +293,13 @@ function register_with_many_invocations_no_args() /** * @test */ - function register_with_one_invocation_with_one_arg() + public function register_with_one_invocation_with_one_arg() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass(), [1]); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -316,7 +316,7 @@ function register_with_one_invocation_with_one_arg() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -335,13 +335,13 @@ function register_with_one_invocation_with_one_arg() /** * @test */ - function register_with_one_invocation_with_two_arg() + public function register_with_one_invocation_with_two_arg() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass(), [1, 2]); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -358,7 +358,7 @@ function register_with_one_invocation_with_two_arg() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -377,13 +377,13 @@ function register_with_one_invocation_with_two_arg() /** * @test */ - function register_with_one_invocation_with_two_arg_obs() + public function register_with_one_invocation_with_two_arg_obs() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass(), [1, 2]); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -400,7 +400,7 @@ function register_with_one_invocation_with_two_arg_obs() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callableObs'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callableObs'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -423,13 +423,13 @@ function register_with_one_invocation_with_two_arg_obs() /** * @test */ - function register_callback_throws() + public function register_callback_throws() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -448,7 +448,7 @@ function register_callback_throws() $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { return new RegisterObservable('testing.uri', function () { throw new \Exception('error'); - }, $messages, $webSocket); + }, $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -467,13 +467,13 @@ function register_callback_throws() /** * @test */ - function register_callback_throws_multiple() + public function register_callback_throws_multiple() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -495,7 +495,7 @@ function register_callback_throws_multiple() $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { return new RegisterObservable('testing.uri', function () { throw new \Exception('error'); - }, $messages, $webSocket); + }, $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -517,13 +517,13 @@ function register_callback_throws_multiple() /** * @test */ - function register_callback_errors() + public function register_callback_errors() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -541,8 +541,8 @@ function register_callback_errors() $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { return new RegisterObservable('testing.uri', function () { - return Observable::error(new \Exception('error')); - }, $messages, $webSocket); + return Observable::error(new \Exception('error'), $this->scheduler); + }, $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -561,13 +561,13 @@ function register_callback_errors() /** * @test */ - function register_callback_errors_multiple() + public function register_callback_errors_multiple() { $registeredMsg = new RegisteredMessage(null, 54321); $invocationMsg = new InvocationMessage(44444, 54321, new \stdClass()); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($registeredMsg) { + $webSocket->subscribe(function (Message $msg) use ($registeredMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $registeredMsg->setRequestId($requestId); @@ -588,8 +588,8 @@ function register_callback_errors_multiple() $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { return new RegisterObservable('testing.uri', function () { - return Observable::error(new \Exception('error')); - }, $messages, $webSocket); + return Observable::error(new \Exception('error'), $this->scheduler); + }, $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ @@ -611,12 +611,12 @@ function register_callback_errors_multiple() /** * @test */ - function register_with_registration_error() + public function register_with_registration_error() { $errorMsg = new ErrorMessage(null, 54321, new \stdClass(), "registration.error.uri"); $webSocket = new Subject(); - $webSocket->subscribeCallback(function (Message $msg) use ($errorMsg) { + $webSocket->subscribe(function (Message $msg) use ($errorMsg) { if ($msg instanceof RegisterMessage) { $requestId = $msg->getRequestId(); $errorMsg->setErrorRequestId($requestId); @@ -633,7 +633,7 @@ function register_with_registration_error() ]); $results = $this->scheduler->startWithCreate(function () use ($messages, $webSocket) { - return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket); + return new RegisterObservable('testing.uri', [$this, 'callable'], $messages, $webSocket, [], false, null, $this->scheduler); }); $this->assertMessages([ diff --git a/tests/bootstrap.php b/tests/bootstrap.php index a10c036..7986156 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -1,34 +1,15 @@ addPsr4('Rx\\Thruway\\Tests\\', __DIR__); - break; - } +if (file_exists($file = __DIR__ . '/../vendor/autoload.php')) { + require $file; +} else { + throw new RuntimeException('Install dependencies to run test suite.'); } - -//RxPHP test files -foreach ($locations as $location) { - - $file = $location . "vendor/reactivex/rxphp/test/helper-functions.php"; - if (file_exists($file)) { - $loader->add('Rx', $location . "vendor/reactivex/rxphp/test/"); - require_once $file; - break; - } -} \ No newline at end of file +/** + * The default scheduler is the EventLoopScheduler, which is asynchronous. + * For testing we need to block at `subscribe`, so we need to switch the default to the ImmediateScheduler. + */ +\Rx\Scheduler::setDefaultFactory(function () { + return new \Rx\Scheduler\ImmediateScheduler(); +}); \ No newline at end of file