From e1e4ae4e9df67283fe38e97c97feb82e46af082f Mon Sep 17 00:00:00 2001 From: marta-lokhova Date: Tue, 3 Jan 2023 18:14:21 -0800 Subject: [PATCH] Expire messages in outbound queues --- src/overlay/Peer.cpp | 27 +++++++- src/overlay/test/OverlayTests.cpp | 101 +++++++++++++++++++++--------- 2 files changed, 95 insertions(+), 33 deletions(-) diff --git a/src/overlay/Peer.cpp b/src/overlay/Peer.cpp index 64f8d5366f..41cfcf4418 100644 --- a/src/overlay/Peer.cpp +++ b/src/overlay/Peer.cpp @@ -44,6 +44,8 @@ namespace stellar { constexpr uint32 const ADVERT_CACHE_SIZE = 50000; +constexpr std::chrono::seconds const OUTBOUND_QUEUE_TIMEOUT = + std::chrono::seconds(30); using namespace std; using namespace soci; @@ -975,6 +977,16 @@ Peer::hasReadingCapacity() const return mCapacity.mTotalCapacity > 0; } +bool +dropMessageAfterTimeout(Peer::QueuedOutboundMessage const& queuedMsg, + VirtualClock::time_point now) +{ + auto const& msg = *(queuedMsg.mMessage); + bool dropType = msg.type() == TRANSACTION || msg.type() == FLOOD_ADVERT || + msg.type() == FLOOD_DEMAND; + return dropType && (now - queuedMsg.mTimeEmplaced > OUTBOUND_QUEUE_TIMEOUT); +} + void Peer::addMsgAndMaybeTrimQueue(std::shared_ptr msg) { @@ -983,6 +995,8 @@ Peer::addMsgAndMaybeTrimQueue(std::shared_ptr msg) releaseAssert(msg); auto type = msg->type(); size_t msgQInd = 0; + auto now = mApp.getClock().now(); + switch (type) { case SCP_MESSAGE: @@ -1025,8 +1039,13 @@ Peer::addMsgAndMaybeTrimQueue(std::shared_ptr msg) { dropped = queue.size() - limit; queue.erase(queue.begin(), queue.begin() + dropped); - getOverlayMetrics().mOutboundQueueDropTxs.Mark(dropped); } + while (!queue.empty() && dropMessageAfterTimeout(queue.front(), now)) + { + ++dropped; + queue.pop_front(); + } + getOverlayMetrics().mOutboundQueueDropTxs.Mark(dropped); } else if (type == SCP_MESSAGE) { @@ -1065,7 +1084,8 @@ Peer::addMsgAndMaybeTrimQueue(std::shared_ptr msg) } else if (type == FLOOD_ADVERT) { - while (mAdvertQueueTxHashCount > limit) + while (mAdvertQueueTxHashCount > limit || + (!queue.empty() && dropMessageAfterTimeout(queue.front(), now))) { dropped++; size_t s = queue.front().mMessage->floodAdvert().txHashes.size(); @@ -1077,7 +1097,8 @@ Peer::addMsgAndMaybeTrimQueue(std::shared_ptr msg) } else if (type == FLOOD_DEMAND) { - while (mDemandQueueTxHashCount > limit) + while (mDemandQueueTxHashCount > limit || + (!queue.empty() && dropMessageAfterTimeout(queue.front(), now))) { dropped++; size_t s = queue.front().mMessage->floodDemand().txHashes.size(); diff --git a/src/overlay/test/OverlayTests.cpp b/src/overlay/test/OverlayTests.cpp index 439268242a..e9c711f3a9 100644 --- a/src/overlay/test/OverlayTests.cpp +++ b/src/overlay/test/OverlayTests.cpp @@ -445,6 +445,16 @@ TEST_CASE("outbound queue filtering", "[overlay][connections]") return std::make_shared(msg); }; + auto testTimeBasedTrimming = + [&](std::deque const& queue, + StellarMessage const& msg) { + peer->addMsgAndMaybeTrimQueue( + std::make_shared(msg)); + REQUIRE(queue.size() == 1); + simulation->setCurrentVirtualTime(node->getClock().now() + + std::chrono::minutes(2)); + }; + SECTION("SCP messages, slot too old") { for (auto& env : envs) @@ -458,16 +468,27 @@ TEST_CASE("outbound queue filtering", "[overlay][connections]") } SECTION("txs, limit reached") { - uint32_t limit = node->getLedgerManager().getLastMaxTxSetSizeOps(); - for (uint32_t i = 0; i < limit + 10; ++i) + SECTION("count-based") { - StellarMessage msg; - msg.type(TRANSACTION); - peer->addMsgAndMaybeTrimQueue( - std::make_shared(msg)); + uint32_t limit = node->getLedgerManager().getLastMaxTxSetSizeOps(); + for (uint32_t i = 0; i < limit + 10; ++i) + { + StellarMessage msg; + msg.type(TRANSACTION); + peer->addMsgAndMaybeTrimQueue( + std::make_shared(msg)); + } + REQUIRE(txQueue.size() == limit); + } + SECTION("time-based") + { + for (uint32_t i = 0; i < 10; ++i) + { + StellarMessage msg; + msg.type(TRANSACTION); + testTimeBasedTrimming(txQueue, msg); + } } - - REQUIRE(txQueue.size() == limit); } SECTION("obsolete SCP messages") { @@ -531,39 +552,59 @@ TEST_CASE("outbound queue filtering", "[overlay][connections]") } SECTION("advert demand limit reached") { - uint32_t limit = node->getLedgerManager().getLastMaxTxSetSizeOps(); - for (uint32_t i = 0; i < limit + 10; ++i) + SECTION("count-based") { + uint32_t limit = node->getLedgerManager().getLastMaxTxSetSizeOps(); + for (uint32_t i = 0; i < limit + 10; ++i) + { + StellarMessage adv, dem, txn; + adv.type(FLOOD_ADVERT); + dem.type(FLOOD_DEMAND); + adv.floodAdvert().txHashes.push_back(xdrSha256(txn)); + dem.floodDemand().txHashes.push_back(xdrSha256(txn)); + peer->addMsgAndMaybeTrimQueue( + std::make_shared(adv)); + peer->addMsgAndMaybeTrimQueue( + std::make_shared(dem)); + } + + REQUIRE(advertQueue.size() == limit); + REQUIRE(demandQueue.size() == limit); + StellarMessage adv, dem, txn; adv.type(FLOOD_ADVERT); dem.type(FLOOD_DEMAND); - adv.floodAdvert().txHashes.push_back(xdrSha256(txn)); - dem.floodDemand().txHashes.push_back(xdrSha256(txn)); + for (auto i = 0; i < 2; i++) + { + adv.floodAdvert().txHashes.push_back(xdrSha256(txn)); + dem.floodDemand().txHashes.push_back(xdrSha256(txn)); + } + peer->addMsgAndMaybeTrimQueue( std::make_shared(adv)); peer->addMsgAndMaybeTrimQueue( std::make_shared(dem)); - } - REQUIRE(advertQueue.size() == limit); - REQUIRE(demandQueue.size() == limit); - - StellarMessage adv, dem, txn; - adv.type(FLOOD_ADVERT); - dem.type(FLOOD_DEMAND); - for (auto i = 0; i < 2; i++) + REQUIRE(advertQueue.size() == limit - 1); + REQUIRE(demandQueue.size() == limit - 1); + } + SECTION("time-based") { - adv.floodAdvert().txHashes.push_back(xdrSha256(txn)); - dem.floodDemand().txHashes.push_back(xdrSha256(txn)); + Hash hash; + advertQueue.clear(); + demandQueue.clear(); + for (uint32_t i = 0; i < 10; ++i) + { + StellarMessage adv, dem; + adv.type(FLOOD_ADVERT); + adv.floodAdvert().txHashes.push_back(hash); + testTimeBasedTrimming(advertQueue, adv); + + dem.type(FLOOD_DEMAND); + dem.floodDemand().txHashes.push_back(hash); + testTimeBasedTrimming(demandQueue, dem); + } } - - peer->addMsgAndMaybeTrimQueue( - std::make_shared(adv)); - peer->addMsgAndMaybeTrimQueue( - std::make_shared(dem)); - - REQUIRE(advertQueue.size() == limit - 1); - REQUIRE(demandQueue.size() == limit - 1); } }