Skip to content

Commit

Permalink
Adding option to ignore local messages.
Browse files Browse the repository at this point in the history
Signed-off-by: Carlos Agüero <[email protected]>
  • Loading branch information
caguero committed Jun 4, 2024
1 parent 5ceee93 commit 7b90652
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 0 deletions.
12 changes: 12 additions & 0 deletions include/gz/transport/SubscribeOptions.hh
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ namespace gz
/// \return The maximum number of messages per second.
public: uint64_t MsgsPerSec() const;

/// \brief Set the value to ignore local messages or not.
/// \param[in] _ignore True when ignoring local messages
/// or false otherwise.
/// \sa IgnoreLocalMessages
public: void SetIgnoreLocalMessages(bool _ignore);

/// \brief Whether the local messages should be ignored.
/// \return true when the local messages should be ignored or
/// false otherwise.
/// \sa SetIgnoreLocalMessages
public: bool IgnoreLocalMessages() const;

#ifdef _WIN32
// Disable warning C4251 which is triggered by
// std::unique_ptr
Expand Down
4 changes: 4 additions & 0 deletions include/gz/transport/SubscriptionHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ namespace gz
/// \return A string representation of the handler UUID.
public: std::string HandlerUuid() const;

/// \brief Return whether local messages are ignored or not.
/// \return True when local messages are ignored or false otherwise.
public: bool IgnoreLocalMessages() const;

/// \brief Check if message subscription is throttled. If so, verify
/// whether the callback should be executed or not.
/// \return true if the callback should be executed or false otherwise.
Expand Down
2 changes: 2 additions & 0 deletions src/Node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ bool Node::Publisher::Publish(const ProtoMsg &_msg)
pubMsgDetails->msgCopy.reset(_msg.New());
pubMsgDetails->msgCopy->CopyFrom(_msg);

pubMsgDetails->publisherNodeUUID = this->dataPtr->publisher.NUuid();

if (subscribers.haveLocal)
{
for (const auto &node : subscribers.localHandlers)
Expand Down
8 changes: 8 additions & 0 deletions src/NodeShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1849,6 +1849,14 @@ void NodeSharedPrivate::PublishThread()
// Send the message to all the local handlers.
for (auto &handler : msgDetails->localHandlers)
{

// Check here if we want to ignore local publications.
if (handler->IgnoreLocalMessages() &&
msgDetails->publisherNodeUUID == handler->NodeUuid())
{
continue;
}

try
{
handler->RunLocalCallback(*(msgDetails->msgCopy.get()),
Expand Down
3 changes: 3 additions & 0 deletions src/NodeSharedPrivate.hh
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ namespace gz

/// \brief Information about the topic and type.
public: MessageInfo info;

/// \brief Publisher's node UUID.
public: std::string publisherNodeUUID;
};

/// \brief Publish thread used to process the pubQueue.
Expand Down
35 changes: 35 additions & 0 deletions src/Node_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2020,6 +2020,41 @@ TEST(NodeTest, PubThrottled)
reset();
}

//////////////////////////////////////////////////
/// \brief This test creates one local publisher and subscriber and
/// checks that no messages are received when using SetIgnoreLocalMessages
/// is set to true.
TEST(NodeTest, IgnoreLocalMessages)
{
reset();

msgs::Int32 msg;
msg.set_data(data);

transport::Node node;

auto pub = node.Advertise<msgs::Int32>(g_topic);
EXPECT_TRUE(pub);

transport::SubscribeOptions opts;
EXPECT_FALSE(opts.IgnoreLocalMessages())
opts.SetIgnoreLocalMessages(true);
EXPECT_TRUE(opts.IgnoreLocalMessages())
EXPECT_TRUE(node.Subscribe(g_topic, cb, opts));

// Should be true the first time
for (auto i = 0; i < 3; ++i)
{
EXPECT_TRUE(pub.Publish(msg));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

// No messages should be received.
EXPECT_EQ(0, counter);

reset();
}

//////////////////////////////////////////////////
/// \brief This test spawns a service responser and a service requester. The
/// requester uses a wrong type for the request argument. The test should verify
Expand Down
13 changes: 13 additions & 0 deletions src/SubscribeOptions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ SubscribeOptions::SubscribeOptions(const SubscribeOptions &_otherSubscribeOpts)
: dataPtr(new SubscribeOptionsPrivate())
{
this->SetMsgsPerSec(_otherSubscribeOpts.MsgsPerSec());
this->SetIgnoreLocalMessages(_otherSubscribeOpts.IgnoreLocalMessages());
}

//////////////////////////////////////////////////
Expand All @@ -60,3 +61,15 @@ void SubscribeOptions::SetMsgsPerSec(const uint64_t _newMsgsPerSec)
{
this->dataPtr->msgsPerSec = _newMsgsPerSec;
}

//////////////////////////////////////////////////
bool SubscribeOptions::IgnoreLocalMessages() const
{
return this->dataPtr->ignoreLocalMessages;
}

//////////////////////////////////////////////////
void SubscribeOptions::SetIgnoreLocalMessages(bool _ignore)
{
this->dataPtr->ignoreLocalMessages = _ignore;
}
3 changes: 3 additions & 0 deletions src/SubscribeOptionsPrivate.hh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ namespace gz

/// \brief Default message subscription rate.
public: uint64_t msgsPerSec = kUnthrottled;

/// \brief Whether local messages should be ignored or not.
public: bool ignoreLocalMessages = false;
};
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/SubscriptionHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ namespace gz
return this->hUuid;
}

/////////////////////////////////////////////////
bool SubscriptionHandlerBase::IgnoreLocalMessages() const
{
return this->opts.IgnoreLocalMessages();
}

/////////////////////////////////////////////////
bool SubscriptionHandlerBase::UpdateThrottling()
{
Expand Down

0 comments on commit 7b90652

Please sign in to comment.