Skip to content

Commit

Permalink
Overlay improvements
Browse files Browse the repository at this point in the history
Co-authored-by: MonsieurNicolas <[email protected]>
Co-authored-by: Graydon Hoare <[email protected]>
  • Loading branch information
3 people committed Sep 19, 2023
1 parent c2599d6 commit 5664eff
Show file tree
Hide file tree
Showing 12 changed files with 254 additions and 71 deletions.
10 changes: 10 additions & 0 deletions src/main/ApplicationImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "util/Fs.h"
#include "work/ConditionalWork.h"
#include "work/WorkWithCallback.h"
#include "xdr/Stellar-ledger-entries.h"
Expand Down Expand Up @@ -1211,6 +1212,15 @@ ApplicationImpl::syncOwnMetrics()
TracyPlot("process.action.queue", qsize);
mMetrics->NewCounter({"process", "action", "overloaded"})
.set_count(static_cast<int64_t>(getClock().actionQueueIsOverloaded()));

// Update overlay inbound-connections and file-handle metrics.
if (mOverlayManager)
{
mMetrics->NewCounter({"overlay", "inbound", "live"})
.set_count(*mOverlayManager->getLiveInboundPeersCounter());
}
mMetrics->NewCounter({"process", "file", "handles"})
.set_count(fs::getOpenHandleCount());
}

void
Expand Down
2 changes: 1 addition & 1 deletion src/main/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1634,7 +1634,7 @@ Config::adjust()
auto const originalMaxPendingConnections = MAX_PENDING_CONNECTIONS;

int maxFsConnections = std::min<int>(
std::numeric_limits<unsigned short>::max(), fs::getMaxConnections());
std::numeric_limits<unsigned short>::max(), fs::getMaxHandles());

auto totalAuthenticatedConnections =
TARGET_PEER_CONNECTIONS + MAX_ADDITIONAL_PEER_CONNECTIONS;
Expand Down
7 changes: 6 additions & 1 deletion src/overlay/OverlayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class OverlayManager
virtual Peer::pointer getConnectedPeer(PeerBareAddress const& address) = 0;

// Add new pending inbound connection.
virtual void addInboundConnection(Peer::pointer peer) = 0;
virtual void maybeAddInboundConnection(Peer::pointer peer) = 0;

// Add new pending outbound connection. Return true if connection was added.
virtual bool addOutboundConnection(Peer::pointer peer) = 0;
Expand All @@ -136,6 +136,8 @@ class OverlayManager
virtual bool acceptAuthenticatedPeer(Peer::pointer peer) = 0;

virtual bool isPreferred(Peer* peer) const = 0;
virtual bool isPossiblyPreferred(std::string const& ip) const = 0;
virtual bool haveSpaceForConnection(std::string const& ip) const = 0;

virtual bool isFloodMessage(StellarMessage const& msg) = 0;

Expand All @@ -150,6 +152,9 @@ class OverlayManager
// Return the current in-memory set of pending peers.
virtual std::vector<Peer::pointer> getPendingPeers() const = 0;

// return the counter of live inbound peers (shared with TCPPeer)
virtual std::shared_ptr<int> getLiveInboundPeersCounter() const = 0;

// Return number of pending peers
virtual int getPendingPeersCount() const = 0;

Expand Down
104 changes: 69 additions & 35 deletions src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ OverlayManagerImpl::OverlayManagerImpl(Application& app)
mApp.getConfig().MAX_ADDITIONAL_PEER_CONNECTIONS)
, mOutboundPeers(*this, mApp.getMetrics(), "outbound", "cancel",
mApp.getConfig().TARGET_PEER_CONNECTIONS)
, mLiveInboundPeersCounter(make_shared<int>(0))
, mPeerManager(app)
, mDoor(mApp)
, mAuth(mApp)
Expand All @@ -283,7 +284,6 @@ OverlayManagerImpl::OverlayManagerImpl(Application& app)
, mDemandTimer(app)
, mResolvingPeersWithBackoff(true)
, mResolvingPeersRetryCount(0)

{
mPeerSources[PeerType::INBOUND] = std::make_unique<RandomPeerSource>(
mPeerManager, RandomPeerSource::nextAttemptCutoff(PeerType::INBOUND));
Expand Down Expand Up @@ -801,60 +801,88 @@ OverlayManagerImpl::updateSizeCounters()
}

void
OverlayManagerImpl::addInboundConnection(Peer::pointer peer)
OverlayManagerImpl::maybeAddInboundConnection(Peer::pointer peer)
{
ZoneScoped;
releaseAssert(peer->getRole() == Peer::REMOTE_CALLED_US);
mInboundPeers.mConnectionsAttempted.Mark();

auto haveSpace = mInboundPeers.mPending.size() <
mApp.getConfig().MAX_INBOUND_PENDING_CONNECTIONS;
if (!haveSpace && mInboundPeers.mPending.size() <
mApp.getConfig().MAX_INBOUND_PENDING_CONNECTIONS +
Config::POSSIBLY_PREFERRED_EXTRA)
if (peer)
{
// for peers that are possibly preferred (they have the same IP as some
// preferred peer we enocuntered in past), we allow an extra
// Config::POSSIBLY_PREFERRED_EXTRA incoming pending connections, that
// are not available for non-preferred peers
haveSpace = isPossiblyPreferred(peer->getIP());
}
releaseAssert(peer->getRole() == Peer::REMOTE_CALLED_US);
bool haveSpace = haveSpaceForConnection(peer->getIP());

if (mShuttingDown || !haveSpace)
{
if (!mShuttingDown)
if (mShuttingDown || !haveSpace)
{
CLOG_DEBUG(
Overlay,
"Peer rejected - all pending inbound connections are taken: {}",
peer->toString());
CLOG_DEBUG(Overlay, "If you wish to allow for more pending "
"inbound connections, please update your "
"MAX_PENDING_CONNECTIONS setting in "
"configuration file.");
mInboundPeers.mConnectionsCancelled.Mark();
peer->drop("all pending inbound connections are taken",
Peer::DropDirection::WE_DROPPED_REMOTE,
Peer::DropMode::IGNORE_WRITE_QUEUE);
return;
}

CLOG_DEBUG(Overlay, "New (inbound) connected peer {}",
peer->toString());
mInboundPeers.mConnectionsEstablished.Mark();
mInboundPeers.mPending.push_back(peer);
updateSizeCounters();
}
else
{
mInboundPeers.mConnectionsCancelled.Mark();
peer->drop("all pending inbound connections are taken",
Peer::DropDirection::WE_DROPPED_REMOTE,
Peer::DropMode::IGNORE_WRITE_QUEUE);
return;
}
CLOG_DEBUG(Overlay, "New (inbound) connected peer {}", peer->toString());
mInboundPeers.mConnectionsEstablished.Mark();
mInboundPeers.mPending.push_back(peer);
updateSizeCounters();
}

bool
OverlayManagerImpl::isPossiblyPreferred(std::string const& ip)
OverlayManagerImpl::isPossiblyPreferred(std::string const& ip) const
{
return std::any_of(
std::begin(mConfigurationPreferredPeers),
std::end(mConfigurationPreferredPeers),
[&](PeerBareAddress const& address) { return address.getIP() == ip; });
}

bool
OverlayManagerImpl::haveSpaceForConnection(std::string const& ip) const
{
auto totalAuthenticated = getInboundAuthenticatedPeers().size();
auto totalTracked = *getLiveInboundPeersCounter();

size_t totalPendingCount = 0;
if (totalTracked > totalAuthenticated)
{
totalPendingCount = totalTracked - totalAuthenticated;
}
auto adjustedInCount =
std::max<size_t>(mInboundPeers.mPending.size(), totalPendingCount);

auto haveSpace =
adjustedInCount < mApp.getConfig().MAX_INBOUND_PENDING_CONNECTIONS;

if (!haveSpace &&
adjustedInCount < mApp.getConfig().MAX_INBOUND_PENDING_CONNECTIONS +
Config::POSSIBLY_PREFERRED_EXTRA)
{
// for peers that are possibly preferred (they have the same IP as some
// preferred peer we enocuntered in past), we allow an extra
// Config::POSSIBLY_PREFERRED_EXTRA incoming pending connections, that
// are not available for non-preferred peers
haveSpace = isPossiblyPreferred(ip);
}

if (!haveSpace)
{
CLOG_DEBUG(
Overlay,
"Peer rejected - all pending inbound connections are taken: {}",
ip);
CLOG_DEBUG(Overlay, "If you wish to allow for more pending "
"inbound connections, please update your "
"MAX_PENDING_CONNECTIONS setting in "
"configuration file.");
}

return haveSpace;
}

bool
OverlayManagerImpl::addOutboundConnection(Peer::pointer peer)
{
Expand Down Expand Up @@ -955,6 +983,12 @@ OverlayManagerImpl::getAuthenticatedPeers() const
return result;
}

std::shared_ptr<int>
OverlayManagerImpl::getLiveInboundPeersCounter() const
{
return mLiveInboundPeersCounter;
}

int
OverlayManagerImpl::getPendingPeersCount() const
{
Expand Down
10 changes: 8 additions & 2 deletions src/overlay/OverlayManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ class OverlayManagerImpl : public OverlayManager
PeersList mInboundPeers;
PeersList mOutboundPeers;

std::shared_ptr<int> mLiveInboundPeersCounter;

PeersList& getPeersList(Peer* peer);

PeerManager mPeerManager;
Expand Down Expand Up @@ -141,7 +143,7 @@ class OverlayManagerImpl : public OverlayManager
std::optional<Hash> const hash = std::nullopt) override;
void connectTo(PeerBareAddress const& address) override;

void addInboundConnection(Peer::pointer peer) override;
void maybeAddInboundConnection(Peer::pointer peer) override;
bool addOutboundConnection(Peer::pointer peer) override;
void removePeer(Peer* peer) override;
void storeConfigPeers();
Expand All @@ -153,6 +155,9 @@ class OverlayManagerImpl : public OverlayManager
std::vector<Peer::pointer> const& getInboundPendingPeers() const override;
std::vector<Peer::pointer> const& getOutboundPendingPeers() const override;
std::vector<Peer::pointer> getPendingPeers() const override;

virtual std::shared_ptr<int> getLiveInboundPeersCounter() const override;

int getPendingPeersCount() const override;
std::map<NodeID, Peer::pointer> const&
getInboundAuthenticatedPeers() const override;
Expand Down Expand Up @@ -221,7 +226,8 @@ class OverlayManagerImpl : public OverlayManager
int availableOutboundAuthenticatedSlots() const;
int nonPreferredAuthenticatedCount() const;

bool isPossiblyPreferred(std::string const& ip);
virtual bool isPossiblyPreferred(std::string const& ip) const override;
virtual bool haveSpaceForConnection(std::string const& ip) const override;

void updateSizeCounters();

Expand Down
33 changes: 17 additions & 16 deletions src/overlay/Peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -596,13 +596,13 @@ Peer::sendMessage(std::shared_ptr<StellarMessage const> msg, bool log)
CLOG_TRACE(Overlay, "send: {} to : {}", msgSummary(*msg),
mApp.getConfig().toShortString(mPeerID));

// There are really _two_ layers of queues, one in Scheduler for actions and
// one in Peer (and its subclasses) for outgoing writes. We enforce a
// similar load-shedding discipline here as in Scheduler: if there is more
// than the scheduler latency-window worth of material in the write queue,
// and we're being asked to add messages that are being generated _from_ a
// droppable action, we drop the message rather than enqueue it. This avoids
// growing our queues indefinitely.
// There are really _two_ layers of queues, one in Scheduler for actions
// and one in Peer (and its subclasses) for outgoing writes. We enforce
// a similar load-shedding discipline here as in Scheduler: if there is
// more than the scheduler latency-window worth of material in the write
// queue, and we're being asked to add messages that are being generated
// _from_ a droppable action, we drop the message rather than enqueue
// it. This avoids growing our queues indefinitely.
if (mApp.getClock().currentSchedulerActionType() ==
Scheduler::ActionType::DROPPABLE_ACTION &&
sendQueueIsOverloaded())
Expand Down Expand Up @@ -854,8 +854,8 @@ Peer::recvMessage(StellarMessage const& stellarMsg)

if (!mApp.getLedgerManager().isSynced() && ignoreIfOutOfSync)
{
// For transactions, exit early during the state rebuild, as we can't
// properly verify them
// For transactions, exit early during the state rebuild, as we
// can't properly verify them
return;
}

Expand Down Expand Up @@ -1090,9 +1090,9 @@ Peer::recvGetTxSet(StellarMessage const& msg)
if (mRemoteOverlayVersion <
Peer::FIRST_VERSION_SUPPORTING_GENERALIZED_TX_SET)
{
// The peer wouldn't be able to accept the generalized tx set,
// but it wouldn't be correct to say we don't have it. So we
// just let the request to timeout.
// The peer wouldn't be able to accept the generalized tx
// set, but it wouldn't be correct to say we don't have it.
// So we just let the request to timeout.
return;
}
newMsg.type(GENERALIZED_TX_SET);
Expand Down Expand Up @@ -1122,8 +1122,9 @@ Peer::recvGetTxSet(StellarMessage const& msg)
: GENERALIZED_TX_SET;
// If peer is not aware of generalized tx sets and we don't have the
// requested hash, then it probably requests an old-style tx set we
// don't have. Another option is that the peer is in incorrect state,
// but it's also ok to say we don't have the requested old-style tx set.
// don't have. Another option is that the peer is in incorrect
// state, but it's also ok to say we don't have the requested
// old-style tx set.
if (messageType == GENERALIZED_TX_SET &&
mRemoteOverlayVersion <
Peer::FIRST_VERSION_SUPPORTING_GENERALIZED_TX_SET)
Expand Down Expand Up @@ -1594,8 +1595,8 @@ Peer::recvAuth(StellarMessage const& msg)
return;
}

// Subtle: after successful auth, must send sendMore message first to tell
// the other peer about the local node's reading capacity.
// Subtle: after successful auth, must send sendMore message first to
// tell the other peer about the local node's reading capacity.
auto weakSelf = std::weak_ptr<Peer>(self);
auto sendCb = [weakSelf](StellarMessage const& msg) {
auto self = weakSelf.lock();
Expand Down
17 changes: 14 additions & 3 deletions src/overlay/PeerDoor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace stellar
{
constexpr uint32 const LISTEN_QUEUE_LIMIT = 100;

using asio::ip::tcp;
using namespace std;
Expand All @@ -33,7 +34,7 @@ PeerDoor::start()
mAcceptor.open(endpoint.protocol());
mAcceptor.set_option(asio::ip::tcp::acceptor::reuse_address(true));
mAcceptor.bind(endpoint);
mAcceptor.listen();
mAcceptor.listen(LISTEN_QUEUE_LIMIT);
acceptNextPeer();
}
}
Expand Down Expand Up @@ -74,9 +75,19 @@ PeerDoor::handleKnock(shared_ptr<TCPPeer::SocketType> socket)
{
CLOG_DEBUG(Overlay, "PeerDoor handleKnock()");
Peer::pointer peer = TCPPeer::accept(mApp, socket);
if (peer)

// Still call addInboundConnection to update metrics
mApp.getOverlayManager().maybeAddInboundConnection(peer);

if (!peer)
{
mApp.getOverlayManager().addInboundConnection(peer);
asio::error_code ec;
socket->close(ec);
if (ec)
{
CLOG_WARNING(Overlay, "TCPPeer: close socket failed: {}",
ec.message());
}
}
acceptNextPeer();
}
Expand Down
Loading

0 comments on commit 5664eff

Please sign in to comment.