diff --git a/examples/rpc-progress.php b/examples/rpc-progress.php index a3eaade..77c69ea 100644 --- a/examples/rpc-progress.php +++ b/examples/rpc-progress.php @@ -7,8 +7,8 @@ $client = new Client('ws://127.0.0.1:9090', "realm1"); -$client->call('com.myapp.example', [1234], [], ["receive_progress" => true]) - ->take(10) +$client->progressiveCall('com.myapp.example', [1234], []) +// ->take(10) ->subscribe(new CallbackObserver( function ($res) { list($args, $argskw, $details) = $res; diff --git a/examples/rpc-repeat-progress.php b/examples/rpc-repeat-progress.php index a8f974f..c20ee2c 100644 --- a/examples/rpc-repeat-progress.php +++ b/examples/rpc-repeat-progress.php @@ -1,29 +1,48 @@ register('com.myapp.example', function () { - return \Rx\Observable::fromArray([1, 2, 3, 4]); - }, ["progress" => true]) - ->flatMapTo($client->call('com.myapp.example', [], [], ["receive_progress" => true]) - ->repeatWhen(function (\Rx\Observable $attempts) { - return $attempts->delay(1000); - }) - ); - -$source->subscribe(new \Rx\Observer\CallbackObserver( - function ($res) { - list($args, $argskw, $details) = $res; - - echo "Call result: ", $args[0], PHP_EOL; - }, - function (Exception $e) { - echo "Call error: ", $e->getMessage(), PHP_EOL; - }, - function () { - echo "Call completed", PHP_EOL; - }), $scheduler); +$client = new Client("ws://127.0.0.1:9090", "realm1"); +$scheduler = new EventLoopScheduler(\EventLoop\getLoop()); + +$client->progressiveRegister('com.myapp.example', function () { + return Observable::interval(500); +})->subscribe(new CallbackObserver(), $scheduler); + + +$client->progressiveCall('com.myapp.example', [], []) + ->take(5) + ->repeatWhen(function (Observable $attempts) { + return $attempts->delay(1000)->take(1); + }) + ->subscribe(new CallbackObserver( + function ($res) { + list($args, $argskw, $details) = $res; + + echo "Call result: ", $args[0], PHP_EOL; + }, + function (Exception $e) { + echo "Call error: ", $e->getMessage(), PHP_EOL; + }, + function () { + echo "Call completed", PHP_EOL; + }), $scheduler); + + +//Output +//Call result: 0 +//Call result: 1 +//Call result: 2 +//Call result: 3 +//Call result: 4 +//Call result: 0 +//Call result: 1 +//Call result: 2 +//Call result: 3 +//Call result: 4 +//Call completed diff --git a/src/Client.php b/src/Client.php index 88d768e..95b26f0 100644 --- a/src/Client.php +++ b/src/Client.php @@ -22,7 +22,7 @@ final class Client { private $url, $loop, $realm, $session, $options, $messages, $webSocket, $scheduler, $disposable, $challengeCallback; - public function __construct(string $url, string $realm, array $options = [], LoopInterface $loop = null) + public function __construct(string $url, string $realm, array $options = [], LoopInterface $loop = null, Subject $webSocket = null, Observable $messages = null, Observable $session = null) { $this->url = $url; $this->realm = $realm; @@ -37,9 +37,9 @@ public function __construct(string $url, string $realm, array $options = [], Loo $open = new Subject(); $close = new Subject(); - $this->webSocket = new WebSocketSubject($url, ['wamp.2.json'], $open, $close); + $this->webSocket = $webSocket ?: new WebSocketSubject($url, ['wamp.2.json'], $open, $close); - $this->messages = $this->webSocket->retryWhen([$this, '_reconnect'])->shareReplay(0); + $this->messages = $messages ?: $this->webSocket->retryWhen([$this, '_reconnect'])->shareReplay(0); $open->map(function () { echo "Connected", PHP_EOL; @@ -75,7 +75,7 @@ public function __construct(string $url, string $realm, array $options = [], Loo }) ->doOnEach($this->webSocket); - $this->session = $this->messages + $this->session = $session ?: $this->messages ->merge($challengeMsg) ->filter(function (Message $msg) { return $msg instanceof WelcomeMessage; @@ -111,6 +111,43 @@ public function register(string $uri, callable $callback, array $options = []) : return $this->registerExtended($uri, $callback, $options, false); } + /** + * This is a variant of call, that expects the far end to emit more than one result. It will also repeat the call, + * if the websocket connection resets and the observable has not completed or errored. + * + * @param string $uri + * @param array $args + * @param array $argskw + * @param array $options + * @return Observable + */ + public function progressiveCall(string $uri, array $args = [], array $argskw = [], array $options = null) :Observable + { + $options['receive_progress'] = true; + + $completed = new Subject(); + $callObs = new CallObservable($uri, $this->messages, $this->webSocket, $args, $argskw, $options); + + return $this->session + ->takeUntil($completed) + ->mapTo($callObs->doOnCompleted(function () use ($completed) { + $completed->onNext(0); + })) + ->switchLatest(); + } + + /** + * @param string $uri + * @param callable $callback + * @param array $options + * @return Observable + */ + public function progressiveRegister(string $uri, callable $callback, array $options = []) :Observable + { + $options['progress'] = true; + return $this->registerExtended($uri, $callback, $options); + } + /** * @param string $uri * @param callable $callback