From 7b90652e3ade93d8221e31094f1c5325f5740e63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ag=C3=BCero?= Date: Tue, 4 Jun 2024 23:30:55 +0200 Subject: [PATCH] Adding option to ignore local messages. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Carlos Agüero --- include/gz/transport/SubscribeOptions.hh | 12 +++++++ include/gz/transport/SubscriptionHandler.hh | 4 +++ src/Node.cc | 2 ++ src/NodeShared.cc | 8 +++++ src/NodeSharedPrivate.hh | 3 ++ src/Node_TEST.cc | 35 +++++++++++++++++++++ src/SubscribeOptions.cc | 13 ++++++++ src/SubscribeOptionsPrivate.hh | 3 ++ src/SubscriptionHandler.cc | 6 ++++ 9 files changed, 86 insertions(+) diff --git a/include/gz/transport/SubscribeOptions.hh b/include/gz/transport/SubscribeOptions.hh index 72cd78e66..1c1f0ef49 100644 --- a/include/gz/transport/SubscribeOptions.hh +++ b/include/gz/transport/SubscribeOptions.hh @@ -66,6 +66,18 @@ namespace gz /// \return The maximum number of messages per second. public: uint64_t MsgsPerSec() const; + /// \brief Set the value to ignore local messages or not. + /// \param[in] _ignore True when ignoring local messages + /// or false otherwise. + /// \sa IgnoreLocalMessages + public: void SetIgnoreLocalMessages(bool _ignore); + + /// \brief Whether the local messages should be ignored. + /// \return true when the local messages should be ignored or + /// false otherwise. + /// \sa SetIgnoreLocalMessages + public: bool IgnoreLocalMessages() const; + #ifdef _WIN32 // Disable warning C4251 which is triggered by // std::unique_ptr diff --git a/include/gz/transport/SubscriptionHandler.hh b/include/gz/transport/SubscriptionHandler.hh index 9e4d789b8..6119e100e 100644 --- a/include/gz/transport/SubscriptionHandler.hh +++ b/include/gz/transport/SubscriptionHandler.hh @@ -80,6 +80,10 @@ namespace gz /// \return A string representation of the handler UUID. public: std::string HandlerUuid() const; + /// \brief Return whether local messages are ignored or not. + /// \return True when local messages are ignored or false otherwise. + public: bool IgnoreLocalMessages() const; + /// \brief Check if message subscription is throttled. If so, verify /// whether the callback should be executed or not. /// \return true if the callback should be executed or false otherwise. diff --git a/src/Node.cc b/src/Node.cc index 4dbc29598..91820042b 100644 --- a/src/Node.cc +++ b/src/Node.cc @@ -338,6 +338,8 @@ bool Node::Publisher::Publish(const ProtoMsg &_msg) pubMsgDetails->msgCopy.reset(_msg.New()); pubMsgDetails->msgCopy->CopyFrom(_msg); + pubMsgDetails->publisherNodeUUID = this->dataPtr->publisher.NUuid(); + if (subscribers.haveLocal) { for (const auto &node : subscribers.localHandlers) diff --git a/src/NodeShared.cc b/src/NodeShared.cc index ce9e8a0c8..bfa1b08d2 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -1849,6 +1849,14 @@ void NodeSharedPrivate::PublishThread() // Send the message to all the local handlers. for (auto &handler : msgDetails->localHandlers) { + + // Check here if we want to ignore local publications. + if (handler->IgnoreLocalMessages() && + msgDetails->publisherNodeUUID == handler->NodeUuid()) + { + continue; + } + try { handler->RunLocalCallback(*(msgDetails->msgCopy.get()), diff --git a/src/NodeSharedPrivate.hh b/src/NodeSharedPrivate.hh index ccb9ba9e2..812bacf31 100644 --- a/src/NodeSharedPrivate.hh +++ b/src/NodeSharedPrivate.hh @@ -167,6 +167,9 @@ namespace gz /// \brief Information about the topic and type. public: MessageInfo info; + + /// \brief Publisher's node UUID. + public: std::string publisherNodeUUID; }; /// \brief Publish thread used to process the pubQueue. diff --git a/src/Node_TEST.cc b/src/Node_TEST.cc index 6c0e2435a..b08439269 100644 --- a/src/Node_TEST.cc +++ b/src/Node_TEST.cc @@ -2020,6 +2020,41 @@ TEST(NodeTest, PubThrottled) reset(); } +////////////////////////////////////////////////// +/// \brief This test creates one local publisher and subscriber and +/// checks that no messages are received when using SetIgnoreLocalMessages +/// is set to true. +TEST(NodeTest, IgnoreLocalMessages) +{ + reset(); + + msgs::Int32 msg; + msg.set_data(data); + + transport::Node node; + + auto pub = node.Advertise(g_topic); + EXPECT_TRUE(pub); + + transport::SubscribeOptions opts; + EXPECT_FALSE(opts.IgnoreLocalMessages()) + opts.SetIgnoreLocalMessages(true); + EXPECT_TRUE(opts.IgnoreLocalMessages()) + EXPECT_TRUE(node.Subscribe(g_topic, cb, opts)); + + // Should be true the first time + for (auto i = 0; i < 3; ++i) + { + EXPECT_TRUE(pub.Publish(msg)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + // No messages should be received. + EXPECT_EQ(0, counter); + + reset(); +} + ////////////////////////////////////////////////// /// \brief This test spawns a service responser and a service requester. The /// requester uses a wrong type for the request argument. The test should verify diff --git a/src/SubscribeOptions.cc b/src/SubscribeOptions.cc index 6a0c1171b..761402b00 100644 --- a/src/SubscribeOptions.cc +++ b/src/SubscribeOptions.cc @@ -36,6 +36,7 @@ SubscribeOptions::SubscribeOptions(const SubscribeOptions &_otherSubscribeOpts) : dataPtr(new SubscribeOptionsPrivate()) { this->SetMsgsPerSec(_otherSubscribeOpts.MsgsPerSec()); + this->SetIgnoreLocalMessages(_otherSubscribeOpts.IgnoreLocalMessages()); } ////////////////////////////////////////////////// @@ -60,3 +61,15 @@ void SubscribeOptions::SetMsgsPerSec(const uint64_t _newMsgsPerSec) { this->dataPtr->msgsPerSec = _newMsgsPerSec; } + +////////////////////////////////////////////////// +bool SubscribeOptions::IgnoreLocalMessages() const +{ + return this->dataPtr->ignoreLocalMessages; +} + +////////////////////////////////////////////////// +void SubscribeOptions::SetIgnoreLocalMessages(bool _ignore) +{ + this->dataPtr->ignoreLocalMessages = _ignore; +} diff --git a/src/SubscribeOptionsPrivate.hh b/src/SubscribeOptionsPrivate.hh index 030ed63b8..0920b1ddc 100644 --- a/src/SubscribeOptionsPrivate.hh +++ b/src/SubscribeOptionsPrivate.hh @@ -41,6 +41,9 @@ namespace gz /// \brief Default message subscription rate. public: uint64_t msgsPerSec = kUnthrottled; + + /// \brief Whether local messages should be ignored or not. + public: bool ignoreLocalMessages = false; }; } } diff --git a/src/SubscriptionHandler.cc b/src/SubscriptionHandler.cc index abefad85e..f41fe6952 100644 --- a/src/SubscriptionHandler.cc +++ b/src/SubscriptionHandler.cc @@ -49,6 +49,12 @@ namespace gz return this->hUuid; } + ///////////////////////////////////////////////// + bool SubscriptionHandlerBase::IgnoreLocalMessages() const + { + return this->opts.IgnoreLocalMessages(); + } + ///////////////////////////////////////////////// bool SubscriptionHandlerBase::UpdateThrottling() {