diff --git a/src/Observable/CallObservable.php b/src/Observable/CallObservable.php index 6f99871..c2e8e98 100644 --- a/src/Observable/CallObservable.php +++ b/src/Observable/CallObservable.php @@ -67,7 +67,7 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface new ResultMessage($msg->getRequestId(), (object)['progress' => false]) ], $this->scheduler); } - return Observable::just($msg); + return Observable::of($msg); }) ->share(); diff --git a/src/Observable/RegisterObservable.php b/src/Observable/RegisterObservable.php index acd1be8..fb49d2a 100644 --- a/src/Observable/RegisterObservable.php +++ b/src/Observable/RegisterObservable.php @@ -106,12 +106,12 @@ function () use (&$completed, $observer, $unregister) { } else { $result = call_user_func_array($this->callback, $msg->getArguments()); } - } catch (\Exception $e) { + } catch (\Throwable $e) { $this->invocationErrors->onNext(new WampInvocationException($msg)); - return Observable::emptyObservable($this->scheduler); + return Observable::empty($this->scheduler); } - $resultObs = $result instanceof Observable ? $result : Observable::just($result, $this->scheduler); + $resultObs = $result instanceof Observable ? $result : Observable::of($result, $this->scheduler); if (($this->options['progress'] ?? false) === false) { $returnObs = $resultObs @@ -125,7 +125,7 @@ function () use (&$completed, $observer, $unregister) { ->map(function ($value) use ($msg) { return [$value, $msg, $this->options]; }) - ->concat(Observable::just([null, $msg, ["progress" => false]], $this->scheduler)); + ->concat(Observable::of([null, $msg, ["progress" => false]], $this->scheduler)); } $interruptMsg = $this->messages @@ -136,9 +136,9 @@ function () use (&$completed, $observer, $unregister) { return $returnObs ->takeUntil($interruptMsg) - ->catchError(function (\Exception $ex) use ($msg) { + ->catch(function (\Throwable $ex) use ($msg) { $this->invocationErrors->onNext(new WampInvocationException($msg)); - return Observable::emptyObservable($this->scheduler); + return Observable::empty($this->scheduler); }); })