Skip to content

Commit

Permalink
Catch Throwable in register fixes #2
Browse files Browse the repository at this point in the history
Made some other RxPHP v2 updates
  • Loading branch information
davidwdan committed Mar 28, 2017
1 parent 71d4484 commit 079b4ea
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/Observable/CallObservable.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface
new ResultMessage($msg->getRequestId(), (object)['progress' => false])
], $this->scheduler);
}
return Observable::just($msg);
return Observable::of($msg);
})
->share();

Expand Down
12 changes: 6 additions & 6 deletions src/Observable/RegisterObservable.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ function () use (&$completed, $observer, $unregister) {
} else {
$result = call_user_func_array($this->callback, $msg->getArguments());
}
} catch (\Exception $e) {
} catch (\Throwable $e) {
$this->invocationErrors->onNext(new WampInvocationException($msg));
return Observable::emptyObservable($this->scheduler);
return Observable::empty($this->scheduler);
}

$resultObs = $result instanceof Observable ? $result : Observable::just($result, $this->scheduler);
$resultObs = $result instanceof Observable ? $result : Observable::of($result, $this->scheduler);

if (($this->options['progress'] ?? false) === false) {
$returnObs = $resultObs
Expand All @@ -125,7 +125,7 @@ function () use (&$completed, $observer, $unregister) {
->map(function ($value) use ($msg) {
return [$value, $msg, $this->options];
})
->concat(Observable::just([null, $msg, ["progress" => false]], $this->scheduler));
->concat(Observable::of([null, $msg, ["progress" => false]], $this->scheduler));
}

$interruptMsg = $this->messages
Expand All @@ -136,9 +136,9 @@ function () use (&$completed, $observer, $unregister) {

return $returnObs
->takeUntil($interruptMsg)
->catchError(function (\Exception $ex) use ($msg) {
->catch(function (\Throwable $ex) use ($msg) {
$this->invocationErrors->onNext(new WampInvocationException($msg));
return Observable::emptyObservable($this->scheduler);
return Observable::empty($this->scheduler);
});

})
Expand Down

0 comments on commit 079b4ea

Please sign in to comment.