Skip to content

Commit

Permalink
Merge pull request #3664 from marta-lokhova/outbound_queue_traffic
Browse files Browse the repository at this point in the history
Improvements to overlay outbound queues

Reviewed-by: MonsieurNicolas
  • Loading branch information
latobarita authored Feb 16, 2023
2 parents e6564bb + e1e4ae4 commit 040a29c
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 33 deletions.
27 changes: 24 additions & 3 deletions src/overlay/Peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ namespace stellar
{

constexpr uint32 const ADVERT_CACHE_SIZE = 50000;
constexpr std::chrono::seconds const OUTBOUND_QUEUE_TIMEOUT =
std::chrono::seconds(30);

using namespace std;
using namespace soci;
Expand Down Expand Up @@ -975,6 +977,16 @@ Peer::hasReadingCapacity() const
return mCapacity.mTotalCapacity > 0;
}

bool
dropMessageAfterTimeout(Peer::QueuedOutboundMessage const& queuedMsg,
VirtualClock::time_point now)
{
auto const& msg = *(queuedMsg.mMessage);
bool dropType = msg.type() == TRANSACTION || msg.type() == FLOOD_ADVERT ||
msg.type() == FLOOD_DEMAND;
return dropType && (now - queuedMsg.mTimeEmplaced > OUTBOUND_QUEUE_TIMEOUT);
}

void
Peer::addMsgAndMaybeTrimQueue(std::shared_ptr<StellarMessage const> msg)
{
Expand All @@ -983,6 +995,8 @@ Peer::addMsgAndMaybeTrimQueue(std::shared_ptr<StellarMessage const> msg)
releaseAssert(msg);
auto type = msg->type();
size_t msgQInd = 0;
auto now = mApp.getClock().now();

switch (type)
{
case SCP_MESSAGE:
Expand Down Expand Up @@ -1025,8 +1039,13 @@ Peer::addMsgAndMaybeTrimQueue(std::shared_ptr<StellarMessage const> msg)
{
dropped = queue.size() - limit;
queue.erase(queue.begin(), queue.begin() + dropped);
getOverlayMetrics().mOutboundQueueDropTxs.Mark(dropped);
}
while (!queue.empty() && dropMessageAfterTimeout(queue.front(), now))
{
++dropped;
queue.pop_front();
}
getOverlayMetrics().mOutboundQueueDropTxs.Mark(dropped);
}
else if (type == SCP_MESSAGE)
{
Expand Down Expand Up @@ -1065,7 +1084,8 @@ Peer::addMsgAndMaybeTrimQueue(std::shared_ptr<StellarMessage const> msg)
}
else if (type == FLOOD_ADVERT)
{
while (mAdvertQueueTxHashCount > limit)
while (mAdvertQueueTxHashCount > limit ||
(!queue.empty() && dropMessageAfterTimeout(queue.front(), now)))
{
dropped++;
size_t s = queue.front().mMessage->floodAdvert().txHashes.size();
Expand All @@ -1077,7 +1097,8 @@ Peer::addMsgAndMaybeTrimQueue(std::shared_ptr<StellarMessage const> msg)
}
else if (type == FLOOD_DEMAND)
{
while (mDemandQueueTxHashCount > limit)
while (mDemandQueueTxHashCount > limit ||
(!queue.empty() && dropMessageAfterTimeout(queue.front(), now)))
{
dropped++;
size_t s = queue.front().mMessage->floodDemand().txHashes.size();
Expand Down
101 changes: 71 additions & 30 deletions src/overlay/test/OverlayTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,16 @@ TEST_CASE("outbound queue filtering", "[overlay][connections]")
return std::make_shared<StellarMessage const>(msg);
};

auto testTimeBasedTrimming =
[&](std::deque<Peer::QueuedOutboundMessage> const& queue,
StellarMessage const& msg) {
peer->addMsgAndMaybeTrimQueue(
std::make_shared<StellarMessage const>(msg));
REQUIRE(queue.size() == 1);
simulation->setCurrentVirtualTime(node->getClock().now() +
std::chrono::minutes(2));
};

SECTION("SCP messages, slot too old")
{
for (auto& env : envs)
Expand All @@ -458,16 +468,27 @@ TEST_CASE("outbound queue filtering", "[overlay][connections]")
}
SECTION("txs, limit reached")
{
uint32_t limit = node->getLedgerManager().getLastMaxTxSetSizeOps();
for (uint32_t i = 0; i < limit + 10; ++i)
SECTION("count-based")
{
StellarMessage msg;
msg.type(TRANSACTION);
peer->addMsgAndMaybeTrimQueue(
std::make_shared<StellarMessage const>(msg));
uint32_t limit = node->getLedgerManager().getLastMaxTxSetSizeOps();
for (uint32_t i = 0; i < limit + 10; ++i)
{
StellarMessage msg;
msg.type(TRANSACTION);
peer->addMsgAndMaybeTrimQueue(
std::make_shared<StellarMessage const>(msg));
}
REQUIRE(txQueue.size() == limit);
}
SECTION("time-based")
{
for (uint32_t i = 0; i < 10; ++i)
{
StellarMessage msg;
msg.type(TRANSACTION);
testTimeBasedTrimming(txQueue, msg);
}
}

REQUIRE(txQueue.size() == limit);
}
SECTION("obsolete SCP messages")
{
Expand Down Expand Up @@ -531,39 +552,59 @@ TEST_CASE("outbound queue filtering", "[overlay][connections]")
}
SECTION("advert demand limit reached")
{
uint32_t limit = node->getLedgerManager().getLastMaxTxSetSizeOps();
for (uint32_t i = 0; i < limit + 10; ++i)
SECTION("count-based")
{
uint32_t limit = node->getLedgerManager().getLastMaxTxSetSizeOps();
for (uint32_t i = 0; i < limit + 10; ++i)
{
StellarMessage adv, dem, txn;
adv.type(FLOOD_ADVERT);
dem.type(FLOOD_DEMAND);
adv.floodAdvert().txHashes.push_back(xdrSha256(txn));
dem.floodDemand().txHashes.push_back(xdrSha256(txn));
peer->addMsgAndMaybeTrimQueue(
std::make_shared<StellarMessage const>(adv));
peer->addMsgAndMaybeTrimQueue(
std::make_shared<StellarMessage const>(dem));
}

REQUIRE(advertQueue.size() == limit);
REQUIRE(demandQueue.size() == limit);

StellarMessage adv, dem, txn;
adv.type(FLOOD_ADVERT);
dem.type(FLOOD_DEMAND);
adv.floodAdvert().txHashes.push_back(xdrSha256(txn));
dem.floodDemand().txHashes.push_back(xdrSha256(txn));
for (auto i = 0; i < 2; i++)
{
adv.floodAdvert().txHashes.push_back(xdrSha256(txn));
dem.floodDemand().txHashes.push_back(xdrSha256(txn));
}

peer->addMsgAndMaybeTrimQueue(
std::make_shared<StellarMessage const>(adv));
peer->addMsgAndMaybeTrimQueue(
std::make_shared<StellarMessage const>(dem));
}

REQUIRE(advertQueue.size() == limit);
REQUIRE(demandQueue.size() == limit);

StellarMessage adv, dem, txn;
adv.type(FLOOD_ADVERT);
dem.type(FLOOD_DEMAND);
for (auto i = 0; i < 2; i++)
REQUIRE(advertQueue.size() == limit - 1);
REQUIRE(demandQueue.size() == limit - 1);
}
SECTION("time-based")
{
adv.floodAdvert().txHashes.push_back(xdrSha256(txn));
dem.floodDemand().txHashes.push_back(xdrSha256(txn));
Hash hash;
advertQueue.clear();
demandQueue.clear();
for (uint32_t i = 0; i < 10; ++i)
{
StellarMessage adv, dem;
adv.type(FLOOD_ADVERT);
adv.floodAdvert().txHashes.push_back(hash);
testTimeBasedTrimming(advertQueue, adv);

dem.type(FLOOD_DEMAND);
dem.floodDemand().txHashes.push_back(hash);
testTimeBasedTrimming(demandQueue, dem);
}
}

peer->addMsgAndMaybeTrimQueue(
std::make_shared<StellarMessage const>(adv));
peer->addMsgAndMaybeTrimQueue(
std::make_shared<StellarMessage const>(dem));

REQUIRE(advertQueue.size() == limit - 1);
REQUIRE(demandQueue.size() == limit - 1);
}
}

Expand Down

0 comments on commit 040a29c

Please sign in to comment.