Fixed critical bugs e.g., incorrectly retained messages on qos0, race conditions on detached sessions, eliminated set_exception on client disconnect tasks, a few debug log message isEnabledFor wrappers.
shipmints committed Apr 1, 2020
1 parent 31165fb commit 1f5efd3
Showing 4 changed files with 20 additions and 13 deletions.
18 changes: 11 additions & 7 deletions hbmqtt/broker.py
@@ -707,21 +707,25 @@ def _broadcast_loop(self):
if 'qos' in broadcast:
qos = broadcast['qos']
if target_session.transitions.state == 'connected':
- self.logger.debug("broadcasting application message from %s on topic '%s' to %s" %
-                   (format_client_message(session=broadcast['session']),
-                    broadcast['topic'], format_client_message(session=target_session)))
+ if self.logger.isEnabledFor(logging.DEBUG):
+     self.logger.debug("broadcasting application message from %s on topic '%s' to %s" %
+                       (format_client_message(session=broadcast['session']),
+                        broadcast['topic'], format_client_message(session=target_session)))
handler = self._get_handler(target_session)
task = asyncio.ensure_future(
handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False),
loop=self._loop)
running_tasks.append(task)
- else:
-     self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" %
-                       (format_client_message(session=broadcast['session']),
-                        broadcast['topic'], format_client_message(session=target_session)))
+ elif qos is not None and qos > 0:

HerrMuellerluedenscheid Jan 10, 2021

@shipmints I think this is wrong. It means that only messages with qos>0 can be retained. As far as I understand the retained flag, it has no effect on qos and vice versa. That's how I understand this resource: http://www.steves-internet-guide.com/mqtt-retained-messages-example/
Pay special attention to the 'common questions and answers' section.

Also, the test test_broker.BrokerTest.test_client_publish_retain_subscribe was designed to publish with retained=1 and qos=0. So either fix the code or fix the test.
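
To make the point about the retain flag concrete, here is a minimal sketch of a per-topic retained-message store as the MQTT 3.1.1 specification describes it. It is not hbmqtt code: the class RetainedStore and its methods are hypothetical names used only for illustration. The assumption is that a retained publish replaces the stored message for its topic regardless of QoS, and that a retained publish with an empty payload clears it.

```python
from typing import Dict, Optional, Tuple


class RetainedStore:
    """Hypothetical per-topic store for retained messages (not hbmqtt's API)."""

    def __init__(self) -> None:
        # topic -> (payload, qos) of the last retained publish
        self._by_topic: Dict[str, Tuple[bytes, int]] = {}

    def on_publish(self, topic: str, payload: bytes, qos: int, retain: bool) -> None:
        if not retain:
            return  # non-retained publishes never touch the store
        if payload == b"":
            # A retained publish with an empty payload clears the topic.
            self._by_topic.pop(topic, None)
        else:
            # Stored whatever the QoS is, including QoS 0.
            self._by_topic[topic] = (payload, qos)

    def get(self, topic: str) -> Optional[Tuple[bytes, int]]:
        return self._by_topic.get(topic)


store = RetainedStore()
store.on_publish("sensors/temp", b"21.5", qos=0, retain=True)   # retained at QoS 0
store.on_publish("sensors/temp", b"22.0", qos=0, retain=False)  # does not replace it
```

In this sketch the QoS value is only stored alongside the payload; it plays no part in the decision to retain.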

shipmints Jan 11, 2021

Author Contributor

There is a larger design issue with how hbmqtt handles this case, which this commit did not address. If I recall correctly, this was mentioned in the issue. The major problem this simple change addressed was that hbmqtt consumed memory ad infinitum retaining messages, which was clearly not correct and definitely worse than this solution, which stands until @njouanin (or someone else) can take a look.
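
As a rough illustration of what the changed branch does, the sketch below mirrors the if/elif structure from the diff as a standalone function. The function name route and its string results are hypothetical. It only models the decision, not the actual publishing or queueing in hbmqtt.

```python
from typing import Optional


def route(connected: bool, qos: Optional[int]) -> str:
    """Mirror the branch structure of the changed _broadcast_loop (sketch only)."""
    if connected:
        return "publish"  # deliver to the connected session right away
    elif qos is not None and qos > 0:
        return "queue"    # keep the message for the disconnected session
    return "drop"         # disconnected and QoS 0: nothing is stored


decisions = [route(True, 0), route(False, 0), route(False, 1), route(False, 2)]
```

Before the change, the else branch queued every message for a disconnected session, including QoS 0 traffic, which is where the unbounded growth came from.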

HerrMuellerluedenscheid Jan 11, 2021

I see. Still, the tests were failing after this commit/change and need to be adjusted to match the changed logic even if it is only a temporary workaround. Do you have resources at the moment to fix the test (stated above)?

shipmints Jan 12, 2021

Author Contributor

@HerrMuellerluedenscheid please use @ mentions when commenting, because GitHub does not notify participants of comment updates such as these (or I don't know how to enable them).

shipmints Jan 12, 2021

Author Contributor

@HerrMuellerluedenscheid I will take a look. Not sure how many cycles I have for this but if easy, I'll see if I can branch from the old commit and address the test.

shipmints Jan 12, 2021

Author Contributor

Seems I do get email notifications. As spam. I unspammed them.

shipmints Jan 19, 2021

Author Contributor

Test fixed. See here #236

shipmints Jan 19, 2021

Author Contributor

The comments in the fixes need revision in light of the further details I documented here #237

+ if self.logger.isEnabledFor(logging.DEBUG):
+     self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" %
+                       (format_client_message(session=broadcast['session']),
+                        broadcast['topic'], format_client_message(session=target_session)))
retained_message = RetainedApplicationMessage(
broadcast['session'], broadcast['topic'], broadcast['data'], qos)
yield from target_session.retained_messages.put(retained_message)
+ if self.logger.isEnabledFor(logging.DEBUG):

HerrMuellerluedenscheid Jan 10, 2021

@shipmints Is this self.logger.isEnabledFor(logging.DEBUG) needed? Isn't self.logger.debug(...) implicitly doing the same? Same on lines 720 and 710.

shipmints Jan 11, 2021

Author Contributor

The isEnabledFor test avoids the cost of f-string log message construction. In high-pressure areas of the code, this kind of test is sensible.
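
A small sketch of the cost being discussed, using only the standard logging module. The Expensive class is a hypothetical stand-in for something costly to render, and the logger name guard_demo is arbitrary. It compares eager formatting, the isEnabledFor guard, and passing arguments to the logger for deferred formatting.

```python
import logging


class Expensive:
    """Stand-in for a costly formatter; counts how often it is rendered."""

    def __init__(self):
        self.renders = 0

    def __str__(self):
        self.renders += 1
        return "expensive"


logger = logging.getLogger("guard_demo")
logger.setLevel(logging.INFO)  # DEBUG is disabled for this logger

eager, guarded, lazy = Expensive(), Expensive(), Expensive()

# 1. Eager: the % operator (like an f-string) runs before debug() is called.
logger.debug("value=%s" % eager)

# 2. Guarded: the formatting is skipped entirely when DEBUG is off.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("value=%s" % guarded)

# 3. Deferred: logging formats the arguments only if the record is emitted.
logger.debug("value=%s", lazy)

counts = (eager.renders, guarded.renders, lazy.renders)
```

Note that the deferred form still evaluates the argument expressions themselves, so a guard can remain worthwhile when computing the arguments (for example a helper call) is the expensive part.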

HerrMuellerluedenscheid Jan 11, 2021

The f-string was breaking py35 support and should not have been used here, as it led to breaking unit tests.
If this is really a resource-critical issue, it might better be replaced with explicit formatting. But the if check also does not come for free. Is f-string formatting really that expensive? Do you have a benchmark for that?

shipmints Jan 12, 2021

Author Contributor

Considering you're working on py39 support, it seems increasingly likely that py35 is less relevant. Nobody, so far, has complained about py35 breakage. Perhaps it should be deprecated in the test infrastructure as part of your py39 work (which I'm thankful for). We use hbmqtt under py38, which is very stable for us, and we have no plans to upgrade to py39 in the near future.

HerrMuellerluedenscheid Jan 12, 2021

@shipmints Well, I and the tests complain about py35 breakage. Sorry for being the bearer of bad news 😊 But I'm totally in for removing py35! Feedback from @njouanin would be warmly welcomed!

I also use hbmqtt under py38 and py37, mostly fine. But there are known (and maybe unknown?) situations that break at least py38/py39, such as #223.

+     self.logger.debug(f'target_session.retained_messages={target_session.retained_messages.qsize()}')
except CancelledError:
# Wait until current broadcasting tasks end
if running_tasks:
2 changes: 1 addition & 1 deletion hbmqtt/client.py
@@ -456,7 +456,7 @@ def cancel_tasks():
while self.client_tasks:
task = self.client_tasks.popleft()
if not task.done():
- task.set_exception(ClientException("Connection lost"))
+ task.cancel()

HerrMuellerluedenscheid Jan 10, 2021

I think the task shouldn't be cancelled silently. At least a log message should be introduced. What's wrong with raising the exception?

shipmints Jan 11, 2021

Author Contributor

The set_exception API had been deprecated and is now no longer supported. Cancellation is the best policy here and the higher-level handler can process a CancelledError as it sees fit.
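
The behavior described above can be checked with a short standalone sketch. It assumes a recent Python (3.8 or later, as far as I know), where calling set_exception on an asyncio Task raises RuntimeError. The worker coroutine and the demo function are hypothetical stand-ins for a pending client task and are not taken from hbmqtt.

```python
import asyncio


async def demo():
    async def worker():
        await asyncio.sleep(3600)  # stands in for a pending client task

    task = asyncio.ensure_future(worker())
    await asyncio.sleep(0)  # give the worker a chance to start

    try:
        # Tasks reject externally injected exceptions on recent Pythons.
        task.set_exception(ConnectionError("Connection lost"))
        outcome = "accepted"
    except RuntimeError:
        outcome = "rejected"

    # Cancellation is the supported way to stop the task from outside.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # a higher-level handler would decide what to do here
    return outcome, task.cancelled()


result = asyncio.run(demo())
```

Whoever awaits the task sees CancelledError and can log or translate it there, which speaks to the earlier concern about cancelling silently.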

HerrMuellerluedenscheid Jan 11, 2021

Legit 👍


self.logger.debug("Watch broker disconnection")
# Wait for disconnection from broker (like connection lost)
10 changes: 6 additions & 4 deletions hbmqtt/mqtt/protocol/handler.py
@@ -417,26 +417,26 @@ def _reader_loop(self):
if task:
running_tasks.append(task)
else:
- self.logger.debug("%s No more data (EOF received), stopping reader coro" % self.session.client_id)
+ self.logger.debug("No more data (EOF received), stopping reader coro")
break
except MQTTException:
self.logger.debug("Message discarded")
except asyncio.CancelledError:
self.logger.debug("Task cancelled, reader loop ending")
break
except asyncio.TimeoutError:
- self.logger.debug("%s Input stream read timeout" % self.session.client_id)
+ self.logger.debug("Input stream read timeout")
self.handle_read_timeout()
except NoDataException:
- self.logger.debug("%s No data available" % self.session.client_id)
+ self.logger.debug("No data available")
except BaseException as e:
self.logger.warning("%s Unhandled exception in reader coro: %r" % (type(self).__name__, e))
break
while running_tasks:
running_tasks.popleft().cancel()
yield from self.handle_connection_closed()
self._reader_stopped.set()
- self.logger.debug("%s Reader coro stopped" % self.session.client_id)
+ self.logger.debug("Reader coro stopped")
yield from self.stop()

@asyncio.coroutine
@@ -457,6 +457,8 @@ def _send_packet(self, packet):

@asyncio.coroutine
def mqtt_deliver_next_message(self):
+ if not self._is_attached():
+     return None
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("%d message(s) available for delivery" % self.session.delivered_message_queue.qsize())
try:
3 changes: 2 additions & 1 deletion hbmqtt/plugins/manager.py
@@ -149,7 +149,8 @@ def clean_fired_events(future):
if wait:
if tasks:
yield from asyncio.wait(tasks, loop=self._loop)
- self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events)))
+ if self.logger.isEnabledFor(logging.DEBUG):
+     self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events)))

@asyncio.coroutine
def map(self, coro, *args, **kwargs):