From 1f5efd383f85657a64eaca6ce37aecb360788293 Mon Sep 17 00:00:00 2001 From: shipmints Date: Wed, 1 Apr 2020 15:09:09 -0400 Subject: [PATCH] Fixed critical bugs e.g., incorrectly retained messages on qos0, race conditions on detached sessions, eliminated set_exception on client disconnect tasks, a few debug log message isEnabledFor wrappers. --- hbmqtt/broker.py | 18 +++++++++++------- hbmqtt/client.py | 2 +- hbmqtt/mqtt/protocol/handler.py | 10 ++++++---- hbmqtt/plugins/manager.py | 3 ++- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 7bb8bd6..55beb95 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -707,21 +707,25 @@ def _broadcast_loop(self): if 'qos' in broadcast: qos = broadcast['qos'] if target_session.transitions.state == 'connected': - self.logger.debug("broadcasting application message from %s on topic '%s' to %s" % - (format_client_message(session=broadcast['session']), - broadcast['topic'], format_client_message(session=target_session))) + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug("broadcasting application message from %s on topic '%s' to %s" % + (format_client_message(session=broadcast['session']), + broadcast['topic'], format_client_message(session=target_session))) handler = self._get_handler(target_session) task = asyncio.ensure_future( handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False), loop=self._loop) running_tasks.append(task) - else: - self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" % - (format_client_message(session=broadcast['session']), - broadcast['topic'], format_client_message(session=target_session))) + elif qos is not None and qos > 0: + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" % + (format_client_message(session=broadcast['session']), + broadcast['topic'], format_client_message(session=target_session))) retained_message = RetainedApplicationMessage( broadcast['session'], broadcast['topic'], broadcast['data'], qos) yield from target_session.retained_messages.put(retained_message) + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug(f'target_session.retained_messages={target_session.retained_messages.qsize()}') except CancelledError: # Wait until current broadcasting tasks end if running_tasks: diff --git a/hbmqtt/client.py b/hbmqtt/client.py index 4df0526..6846878 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -456,7 +456,7 @@ def cancel_tasks(): while self.client_tasks: task = self.client_tasks.popleft() if not task.done(): - task.set_exception(ClientException("Connection lost")) + task.cancel() self.logger.debug("Watch broker disconnection") # Wait for disconnection from broker (like connection lost) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 3c294bc..f231a14 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -417,7 +417,7 @@ def _reader_loop(self): if task: running_tasks.append(task) else: - self.logger.debug("%s No more data (EOF received), stopping reader coro" % self.session.client_id) + self.logger.debug("No more data (EOF received), stopping reader coro") break except MQTTException: self.logger.debug("Message discarded") @@ -425,10 +425,10 @@ def _reader_loop(self): self.logger.debug("Task cancelled, reader loop ending") break except asyncio.TimeoutError: - self.logger.debug("%s Input stream read timeout" % self.session.client_id) + self.logger.debug("Input stream read timeout") self.handle_read_timeout() except NoDataException: - self.logger.debug("%s No data available" % self.session.client_id) + self.logger.debug("No data available") except BaseException as e: self.logger.warning("%s Unhandled exception in reader coro: %r" % (type(self).__name__, e)) break @@ -436,7 +436,7 @@ def _reader_loop(self): running_tasks.popleft().cancel() yield from self.handle_connection_closed() self._reader_stopped.set() - self.logger.debug("%s Reader coro stopped" % self.session.client_id) + self.logger.debug("Reader coro stopped") yield from self.stop() @asyncio.coroutine @@ -457,6 +457,8 @@ def _send_packet(self, packet): @asyncio.coroutine def mqtt_deliver_next_message(self): + if not self._is_attached(): + return None if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("%d message(s) available for delivery" % self.session.delivered_message_queue.qsize()) try: diff --git a/hbmqtt/plugins/manager.py b/hbmqtt/plugins/manager.py index 7a923a8..c96c5f1 100644 --- a/hbmqtt/plugins/manager.py +++ b/hbmqtt/plugins/manager.py @@ -149,7 +149,8 @@ def clean_fired_events(future): if wait: if tasks: yield from asyncio.wait(tasks, loop=self._loop) - self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events))) + if self.logger.isEnabledFor(logging.DEBUG): + self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events))) @asyncio.coroutine def map(self, coro, *args, **kwargs):