diff --git a/src/Thruway/ClientSession.php b/src/Thruway/ClientSession.php index 7939a779..aecbc295 100644 --- a/src/Thruway/ClientSession.php +++ b/src/Thruway/ClientSession.php @@ -45,7 +45,18 @@ public function subscribe($topicName, callable $callback, $options = null) { return $this->peer->getSubscriber()->subscribe($this, $topicName, $callback, $options); } - + + /** + * Unsubscribe + * + * @param string $subscriptionId + * @return Promise + */ + public function unsubscribe($subscriptionId) + { + return $this->peer->getSubscriber()->unsubscribe($this, $subscriptionId); + } + /** * Publish * @@ -129,7 +140,6 @@ public function close() */ public function onClose() { - + $this->state = static::STATE_DOWN; } - } diff --git a/src/Thruway/Role/Subscriber.php b/src/Thruway/Role/Subscriber.php index cecdfe55..f3cbf87c 100644 --- a/src/Thruway/Role/Subscriber.php +++ b/src/Thruway/Role/Subscriber.php @@ -15,6 +15,7 @@ use Thruway\Message\SubscribedMessage; use Thruway\Message\SubscribeMessage; use Thruway\Message\UnsubscribedMessage; +use Thruway\Message\UnsubscribeMessage; /** * Class Subscriber @@ -28,13 +29,12 @@ class Subscriber extends AbstractRole * @var array */ private $subscriptions; - + /** * Constructor */ public function __construct() { - $this->subscriptions = []; } @@ -86,7 +86,7 @@ protected function processError(AbstractSession $session, ErrorMessage $msg) $this->processSubscribeError($session, $msg); break; case Message::MSG_UNSUBSCRIBE: - // TODO + $this->processUnsubscribeError($session, $msg); break; default: Logger::critical($this, "Unhandled error"); @@ -128,6 +128,23 @@ protected function processSubscribed(ClientSession $session, SubscribedMessage $ } } } + + /** + * Process unsubscribe error + * + * @param \Thruway\AbstractSession $session + * @param \Thruway\Message\ErrorMessage $msg + */ + protected function processUnsubscribeError(AbstractSession $session, ErrorMessage $msg) + { + foreach ($this->subscriptions as $key => $subscription) { + if ($subscription["unsubscribed_request_id"] === $msg->getErrorRequestId()) { + // reject the promise + $subscription['unsubscribed_deferred']->reject($msg); + return; + } + } + } /** * process unsubscribed @@ -219,13 +236,46 @@ public function subscribe(ClientSession $session, $topicName, callable $callback "options" => $options, "deferred" => $deferred ]; - + array_push($this->subscriptions, $subscription); - + $subscribeMsg = new SubscribeMessage($requestId, $options, $topicName); $session->sendMessage($subscribeMsg); return $deferred->promise(); } - + + /** + * process unsubscribe + * @param ClientSession $session + * @param string $subscriptionId + * @return Promise + */ + public function unsubscribe(ClientSession $session, $subscriptionId) + { + $requestId = Utils::getUniqueId(); + $subscriptionExists = false; + $deferred = new Deferred(); + + foreach ($this->subscriptions as $i => $subscription) { + if ($subscription["subscription_id"] == $subscriptionId) { + $subscriptionExists = true; + + $this->subscriptions[$i]["unsubscribed_request_id"] = $requestId; + $this->subscriptions[$i]["unsubscribed_deferred"] = $deferred; + } + } + + // In case the client never subscribed to this topic before + if ($subscriptionExists === false) { + $errorMsg = new ErrorMessage(Message::MSG_UNSUBSCRIBE, $requestId, new \stdClass, "wamp.error.no_such_subscription"); + $deferred->reject($errorMsg); + return $deferred->promise(); + } + + $unsubscribeMessage = new UnsubscribeMessage($requestId, $subscriptionId); + $session->sendMessage($unsubscribeMessage); + + return $deferred->promise(); + } } diff --git a/src/Thruway/Subscription/SubscriptionGroup.php b/src/Thruway/Subscription/SubscriptionGroup.php index 3abfcd37..1431ff32 100644 --- a/src/Thruway/Subscription/SubscriptionGroup.php +++ b/src/Thruway/Subscription/SubscriptionGroup.php @@ -275,6 +275,8 @@ public function processUnsubscribe(Session $session, UnsubscribeMessage $msg) } $this->removeSubscription($subscription); + + Logger::debug($this, "Removed subscription to \"" . $this->getMatchType() . "\":\"" . $this->getUri() . "\""); $session->sendMessage(new UnsubscribedMessage($msg->getRequestId())); return $subscription;