Skip to content

Commit

Permalink
[WIP] Added progressiveCall and progressiveRegister
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwdan committed Oct 21, 2016
1 parent b2df7f3 commit e09a5e0
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 31 deletions.
4 changes: 2 additions & 2 deletions examples/rpc-progress.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

$client = new Client('ws://127.0.0.1:9090', "realm1");

$client->call('com.myapp.example', [1234], [], ["receive_progress" => true])
->take(10)
$client->progressiveCall('com.myapp.example', [1234], [])
// ->take(10)
->subscribe(new CallbackObserver(
function ($res) {
list($args, $argskw, $details) = $res;
Expand Down
69 changes: 44 additions & 25 deletions examples/rpc-repeat-progress.php
Original file line number Diff line number Diff line change
@@ -1,29 +1,48 @@
<?php

use Rx\Observable;
use Rx\Observer\CallbackObserver;
use Rx\Scheduler\EventLoopScheduler;
use Rx\Thruway\Client;

require __DIR__ . '/../vendor/autoload.php';

$client = new \Rx\Thruway\Client("ws://127.0.0.1:9090", "realm1");
$scheduler = new \Rx\Scheduler\EventLoopScheduler(\EventLoop\getLoop());

$source = $client
->register('com.myapp.example', function () {
return \Rx\Observable::fromArray([1, 2, 3, 4]);
}, ["progress" => true])
->flatMapTo($client->call('com.myapp.example', [], [], ["receive_progress" => true])
->repeatWhen(function (\Rx\Observable $attempts) {
return $attempts->delay(1000);
})
);

$source->subscribe(new \Rx\Observer\CallbackObserver(
function ($res) {
list($args, $argskw, $details) = $res;

echo "Call result: ", $args[0], PHP_EOL;
},
function (Exception $e) {
echo "Call error: ", $e->getMessage(), PHP_EOL;
},
function () {
echo "Call completed", PHP_EOL;
}), $scheduler);
$client = new Client("ws://127.0.0.1:9090", "realm1");
$scheduler = new EventLoopScheduler(\EventLoop\getLoop());

$client->progressiveRegister('com.myapp.example', function () {
return Observable::interval(500);
})->subscribe(new CallbackObserver(), $scheduler);


$client->progressiveCall('com.myapp.example', [], [])
->take(5)
->repeatWhen(function (Observable $attempts) {
return $attempts->delay(1000)->take(1);
})
->subscribe(new CallbackObserver(
function ($res) {
list($args, $argskw, $details) = $res;

echo "Call result: ", $args[0], PHP_EOL;
},
function (Exception $e) {
echo "Call error: ", $e->getMessage(), PHP_EOL;
},
function () {
echo "Call completed", PHP_EOL;
}), $scheduler);


//Output
//Call result: 0
//Call result: 1
//Call result: 2
//Call result: 3
//Call result: 4
//Call result: 0
//Call result: 1
//Call result: 2
//Call result: 3
//Call result: 4
//Call completed
45 changes: 41 additions & 4 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ final class Client
{
private $url, $loop, $realm, $session, $options, $messages, $webSocket, $scheduler, $disposable, $challengeCallback;

public function __construct(string $url, string $realm, array $options = [], LoopInterface $loop = null)
public function __construct(string $url, string $realm, array $options = [], LoopInterface $loop = null, Subject $webSocket = null, Observable $messages = null, Observable $session = null)
{
$this->url = $url;
$this->realm = $realm;
Expand All @@ -37,9 +37,9 @@ public function __construct(string $url, string $realm, array $options = [], Loo
$open = new Subject();
$close = new Subject();

$this->webSocket = new WebSocketSubject($url, ['wamp.2.json'], $open, $close);
$this->webSocket = $webSocket ?: new WebSocketSubject($url, ['wamp.2.json'], $open, $close);

$this->messages = $this->webSocket->retryWhen([$this, '_reconnect'])->shareReplay(0);
$this->messages = $messages ?: $this->webSocket->retryWhen([$this, '_reconnect'])->shareReplay(0);

$open->map(function () {
echo "Connected", PHP_EOL;
Expand Down Expand Up @@ -75,7 +75,7 @@ public function __construct(string $url, string $realm, array $options = [], Loo
})
->doOnEach($this->webSocket);

$this->session = $this->messages
$this->session = $session ?: $this->messages
->merge($challengeMsg)
->filter(function (Message $msg) {
return $msg instanceof WelcomeMessage;
Expand Down Expand Up @@ -111,6 +111,43 @@ public function register(string $uri, callable $callback, array $options = []) :
return $this->registerExtended($uri, $callback, $options, false);
}

/**
* This is a variant of call, that expects the far end to emit more than one result. It will also repeat the call,
* if the websocket connection resets and the observable has not completed or errored.
*
* @param string $uri
* @param array $args
* @param array $argskw
* @param array $options
* @return Observable
*/
public function progressiveCall(string $uri, array $args = [], array $argskw = [], array $options = null) :Observable
{
$options['receive_progress'] = true;

$completed = new Subject();
$callObs = new CallObservable($uri, $this->messages, $this->webSocket, $args, $argskw, $options);

return $this->session
->takeUntil($completed)
->mapTo($callObs->doOnCompleted(function () use ($completed) {
$completed->onNext(0);
}))
->switchLatest();
}

/**
* @param string $uri
* @param callable $callback
* @param array $options
* @return Observable
*/
public function progressiveRegister(string $uri, callable $callback, array $options = []) :Observable
{
$options['progress'] = true;
return $this->registerExtended($uri, $callback, $options);
}

/**
* @param string $uri
* @param callable $callback
Expand Down

0 comments on commit e09a5e0

Please sign in to comment.