diff --git a/src/main/ApplicationImpl.cpp b/src/main/ApplicationImpl.cpp index 393b8f67bc..926dcd00f0 100644 --- a/src/main/ApplicationImpl.cpp +++ b/src/main/ApplicationImpl.cpp @@ -2,6 +2,7 @@ // under the Apache License, Version 2.0. See the COPYING file at the root // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 +#include "util/Fs.h" #include "work/ConditionalWork.h" #include "work/WorkWithCallback.h" #include "xdr/Stellar-ledger-entries.h" @@ -1211,6 +1212,15 @@ ApplicationImpl::syncOwnMetrics() TracyPlot("process.action.queue", qsize); mMetrics->NewCounter({"process", "action", "overloaded"}) .set_count(static_cast(getClock().actionQueueIsOverloaded())); + + // Update overlay inbound-connections and file-handle metrics. + if (mOverlayManager) + { + mMetrics->NewCounter({"overlay", "inbound", "live"}) + .set_count(*mOverlayManager->getLiveInboundPeersCounter()); + } + mMetrics->NewCounter({"process", "file", "handles"}) + .set_count(fs::getOpenHandleCount()); } void diff --git a/src/main/Config.cpp b/src/main/Config.cpp index 67b3d586fb..3a0705e440 100644 --- a/src/main/Config.cpp +++ b/src/main/Config.cpp @@ -1634,7 +1634,7 @@ Config::adjust() auto const originalMaxPendingConnections = MAX_PENDING_CONNECTIONS; int maxFsConnections = std::min( - std::numeric_limits::max(), fs::getMaxConnections()); + std::numeric_limits::max(), fs::getMaxHandles()); auto totalAuthenticatedConnections = TARGET_PEER_CONNECTIONS + MAX_ADDITIONAL_PEER_CONNECTIONS; diff --git a/src/overlay/OverlayManager.h b/src/overlay/OverlayManager.h index 755ebe15ea..4278318fa3 100644 --- a/src/overlay/OverlayManager.h +++ b/src/overlay/OverlayManager.h @@ -118,7 +118,7 @@ class OverlayManager virtual Peer::pointer getConnectedPeer(PeerBareAddress const& address) = 0; // Add new pending inbound connection. - virtual void addInboundConnection(Peer::pointer peer) = 0; + virtual void maybeAddInboundConnection(Peer::pointer peer) = 0; // Add new pending outbound connection. 
Return true if connection was added. virtual bool addOutboundConnection(Peer::pointer peer) = 0; @@ -136,6 +136,8 @@ class OverlayManager virtual bool acceptAuthenticatedPeer(Peer::pointer peer) = 0; virtual bool isPreferred(Peer* peer) const = 0; + virtual bool isPossiblyPreferred(std::string const& ip) const = 0; + virtual bool haveSpaceForConnection(std::string const& ip) const = 0; virtual bool isFloodMessage(StellarMessage const& msg) = 0; @@ -150,6 +152,9 @@ class OverlayManager // Return the current in-memory set of pending peers. virtual std::vector getPendingPeers() const = 0; + // return the counter of live inbound peers (shared with TCPPeer) + virtual std::shared_ptr getLiveInboundPeersCounter() const = 0; + // Return number of pending peers virtual int getPendingPeersCount() const = 0; diff --git a/src/overlay/OverlayManagerImpl.cpp b/src/overlay/OverlayManagerImpl.cpp index 88b2927eb2..6685588f73 100644 --- a/src/overlay/OverlayManagerImpl.cpp +++ b/src/overlay/OverlayManagerImpl.cpp @@ -270,6 +270,7 @@ OverlayManagerImpl::OverlayManagerImpl(Application& app) mApp.getConfig().MAX_ADDITIONAL_PEER_CONNECTIONS) , mOutboundPeers(*this, mApp.getMetrics(), "outbound", "cancel", mApp.getConfig().TARGET_PEER_CONNECTIONS) + , mLiveInboundPeersCounter(make_shared(0)) , mPeerManager(app) , mDoor(mApp) , mAuth(mApp) @@ -283,7 +284,6 @@ OverlayManagerImpl::OverlayManagerImpl(Application& app) , mDemandTimer(app) , mResolvingPeersWithBackoff(true) , mResolvingPeersRetryCount(0) - { mPeerSources[PeerType::INBOUND] = std::make_unique( mPeerManager, RandomPeerSource::nextAttemptCutoff(PeerType::INBOUND)); @@ -801,53 +801,38 @@ OverlayManagerImpl::updateSizeCounters() } void -OverlayManagerImpl::addInboundConnection(Peer::pointer peer) +OverlayManagerImpl::maybeAddInboundConnection(Peer::pointer peer) { ZoneScoped; - releaseAssert(peer->getRole() == Peer::REMOTE_CALLED_US); mInboundPeers.mConnectionsAttempted.Mark(); - auto haveSpace = mInboundPeers.mPending.size() < - 
mApp.getConfig().MAX_INBOUND_PENDING_CONNECTIONS; - if (!haveSpace && mInboundPeers.mPending.size() < - mApp.getConfig().MAX_INBOUND_PENDING_CONNECTIONS + - Config::POSSIBLY_PREFERRED_EXTRA) + if (peer) { - // for peers that are possibly preferred (they have the same IP as some - // preferred peer we enocuntered in past), we allow an extra - // Config::POSSIBLY_PREFERRED_EXTRA incoming pending connections, that - // are not available for non-preferred peers - haveSpace = isPossiblyPreferred(peer->getIP()); - } + releaseAssert(peer->getRole() == Peer::REMOTE_CALLED_US); + bool haveSpace = haveSpaceForConnection(peer->getIP()); - if (mShuttingDown || !haveSpace) - { - if (!mShuttingDown) + if (mShuttingDown || !haveSpace) { - CLOG_DEBUG( - Overlay, - "Peer rejected - all pending inbound connections are taken: {}", - peer->toString()); - CLOG_DEBUG(Overlay, "If you wish to allow for more pending " - "inbound connections, please update your " - "MAX_PENDING_CONNECTIONS setting in " - "configuration file."); + mInboundPeers.mConnectionsCancelled.Mark(); + peer->drop("all pending inbound connections are taken", + Peer::DropDirection::WE_DROPPED_REMOTE, + Peer::DropMode::IGNORE_WRITE_QUEUE); + return; } - + CLOG_DEBUG(Overlay, "New (inbound) connected peer {}", + peer->toString()); + mInboundPeers.mConnectionsEstablished.Mark(); + mInboundPeers.mPending.push_back(peer); + updateSizeCounters(); + } + else + { mInboundPeers.mConnectionsCancelled.Mark(); - peer->drop("all pending inbound connections are taken", - Peer::DropDirection::WE_DROPPED_REMOTE, - Peer::DropMode::IGNORE_WRITE_QUEUE); - return; } - CLOG_DEBUG(Overlay, "New (inbound) connected peer {}", peer->toString()); - mInboundPeers.mConnectionsEstablished.Mark(); - mInboundPeers.mPending.push_back(peer); - updateSizeCounters(); } bool -OverlayManagerImpl::isPossiblyPreferred(std::string const& ip) +OverlayManagerImpl::isPossiblyPreferred(std::string const& ip) const { return std::any_of( 
std::begin(mConfigurationPreferredPeers), @@ -855,6 +840,49 @@ OverlayManagerImpl::isPossiblyPreferred(std::string const& ip) [&](PeerBareAddress const& address) { return address.getIP() == ip; }); } +bool +OverlayManagerImpl::haveSpaceForConnection(std::string const& ip) const +{ + auto totalAuthenticated = getInboundAuthenticatedPeers().size(); + auto totalTracked = *getLiveInboundPeersCounter(); + + size_t totalPendingCount = 0; + if (totalTracked > totalAuthenticated) + { + totalPendingCount = totalTracked - totalAuthenticated; + } + auto adjustedInCount = + std::max(mInboundPeers.mPending.size(), totalPendingCount); + + auto haveSpace = + adjustedInCount < mApp.getConfig().MAX_INBOUND_PENDING_CONNECTIONS; + + if (!haveSpace && + adjustedInCount < mApp.getConfig().MAX_INBOUND_PENDING_CONNECTIONS + + Config::POSSIBLY_PREFERRED_EXTRA) + { + // for peers that are possibly preferred (they have the same IP as some + // preferred peer we encountered in the past), we allow an extra + // Config::POSSIBLY_PREFERRED_EXTRA incoming pending connections, that + // are not available for non-preferred peers + haveSpace = isPossiblyPreferred(ip); + } + + if (!haveSpace) + { + CLOG_DEBUG( + Overlay, + "Peer rejected - all pending inbound connections are taken: {}", + ip); + CLOG_DEBUG(Overlay, "If you wish to allow for more pending " + "inbound connections, please update your " + "MAX_PENDING_CONNECTIONS setting in " + "configuration file."); + } + + return haveSpace; +} + bool OverlayManagerImpl::addOutboundConnection(Peer::pointer peer) { @@ -955,6 +983,12 @@ OverlayManagerImpl::getAuthenticatedPeers() const return result; } +std::shared_ptr +OverlayManagerImpl::getLiveInboundPeersCounter() const +{ + return mLiveInboundPeersCounter; +} + int OverlayManagerImpl::getPendingPeersCount() const { diff --git a/src/overlay/OverlayManagerImpl.h b/src/overlay/OverlayManagerImpl.h index 1f2a6c5e68..058db03a11 100644 --- a/src/overlay/OverlayManagerImpl.h +++ 
b/src/overlay/OverlayManagerImpl.h @@ -74,6 +74,8 @@ class OverlayManagerImpl : public OverlayManager PeersList mInboundPeers; PeersList mOutboundPeers; + std::shared_ptr mLiveInboundPeersCounter; + PeersList& getPeersList(Peer* peer); PeerManager mPeerManager; @@ -141,7 +143,7 @@ class OverlayManagerImpl : public OverlayManager std::optional const hash = std::nullopt) override; void connectTo(PeerBareAddress const& address) override; - void addInboundConnection(Peer::pointer peer) override; + void maybeAddInboundConnection(Peer::pointer peer) override; bool addOutboundConnection(Peer::pointer peer) override; void removePeer(Peer* peer) override; void storeConfigPeers(); @@ -153,6 +155,9 @@ class OverlayManagerImpl : public OverlayManager std::vector const& getInboundPendingPeers() const override; std::vector const& getOutboundPendingPeers() const override; std::vector getPendingPeers() const override; + + virtual std::shared_ptr getLiveInboundPeersCounter() const override; + int getPendingPeersCount() const override; std::map const& getInboundAuthenticatedPeers() const override; @@ -221,7 +226,8 @@ class OverlayManagerImpl : public OverlayManager int availableOutboundAuthenticatedSlots() const; int nonPreferredAuthenticatedCount() const; - bool isPossiblyPreferred(std::string const& ip); + virtual bool isPossiblyPreferred(std::string const& ip) const override; + virtual bool haveSpaceForConnection(std::string const& ip) const override; void updateSizeCounters(); diff --git a/src/overlay/Peer.cpp b/src/overlay/Peer.cpp index f4118d6a10..f1c1f49afa 100644 --- a/src/overlay/Peer.cpp +++ b/src/overlay/Peer.cpp @@ -596,13 +596,13 @@ Peer::sendMessage(std::shared_ptr msg, bool log) CLOG_TRACE(Overlay, "send: {} to : {}", msgSummary(*msg), mApp.getConfig().toShortString(mPeerID)); - // There are really _two_ layers of queues, one in Scheduler for actions and - // one in Peer (and its subclasses) for outgoing writes. 
We enforce a - // similar load-shedding discipline here as in Scheduler: if there is more - // than the scheduler latency-window worth of material in the write queue, - // and we're being asked to add messages that are being generated _from_ a - // droppable action, we drop the message rather than enqueue it. This avoids - // growing our queues indefinitely. + // There are really _two_ layers of queues, one in Scheduler for actions + // and one in Peer (and its subclasses) for outgoing writes. We enforce + // a similar load-shedding discipline here as in Scheduler: if there is + // more than the scheduler latency-window worth of material in the write + // queue, and we're being asked to add messages that are being generated + // _from_ a droppable action, we drop the message rather than enqueue + // it. This avoids growing our queues indefinitely. if (mApp.getClock().currentSchedulerActionType() == Scheduler::ActionType::DROPPABLE_ACTION && sendQueueIsOverloaded()) @@ -854,8 +854,8 @@ Peer::recvMessage(StellarMessage const& stellarMsg) if (!mApp.getLedgerManager().isSynced() && ignoreIfOutOfSync) { - // For transactions, exit early during the state rebuild, as we can't - // properly verify them + // For transactions, exit early during the state rebuild, as we + // can't properly verify them return; } @@ -1090,9 +1090,9 @@ Peer::recvGetTxSet(StellarMessage const& msg) if (mRemoteOverlayVersion < Peer::FIRST_VERSION_SUPPORTING_GENERALIZED_TX_SET) { - // The peer wouldn't be able to accept the generalized tx set, - // but it wouldn't be correct to say we don't have it. So we - // just let the request to timeout. + // The peer wouldn't be able to accept the generalized tx + // set, but it wouldn't be correct to say we don't have it. + // So we just let the request to timeout. 
return; } newMsg.type(GENERALIZED_TX_SET); @@ -1122,8 +1122,9 @@ Peer::recvGetTxSet(StellarMessage const& msg) : GENERALIZED_TX_SET; // If peer is not aware of generalized tx sets and we don't have the // requested hash, then it probably requests an old-style tx set we - // don't have. Another option is that the peer is in incorrect state, - // but it's also ok to say we don't have the requested old-style tx set. + // don't have. Another option is that the peer is in incorrect + // state, but it's also ok to say we don't have the requested + // old-style tx set. if (messageType == GENERALIZED_TX_SET && mRemoteOverlayVersion < Peer::FIRST_VERSION_SUPPORTING_GENERALIZED_TX_SET) @@ -1594,8 +1595,8 @@ Peer::recvAuth(StellarMessage const& msg) return; } - // Subtle: after successful auth, must send sendMore message first to tell - // the other peer about the local node's reading capacity. + // Subtle: after successful auth, must send sendMore message first to + // tell the other peer about the local node's reading capacity. 
auto weakSelf = std::weak_ptr(self); auto sendCb = [weakSelf](StellarMessage const& msg) { auto self = weakSelf.lock(); diff --git a/src/overlay/PeerDoor.cpp b/src/overlay/PeerDoor.cpp index 1da5b70da0..4c2aacfda5 100644 --- a/src/overlay/PeerDoor.cpp +++ b/src/overlay/PeerDoor.cpp @@ -13,6 +13,7 @@ namespace stellar { +constexpr uint32 const LISTEN_QUEUE_LIMIT = 100; using asio::ip::tcp; using namespace std; @@ -33,7 +34,7 @@ PeerDoor::start() mAcceptor.open(endpoint.protocol()); mAcceptor.set_option(asio::ip::tcp::acceptor::reuse_address(true)); mAcceptor.bind(endpoint); - mAcceptor.listen(); + mAcceptor.listen(LISTEN_QUEUE_LIMIT); acceptNextPeer(); } } @@ -74,9 +75,19 @@ PeerDoor::handleKnock(shared_ptr socket) { CLOG_DEBUG(Overlay, "PeerDoor handleKnock()"); Peer::pointer peer = TCPPeer::accept(mApp, socket); - if (peer) + + // Still call maybeAddInboundConnection to update metrics + mApp.getOverlayManager().maybeAddInboundConnection(peer); + + if (!peer) { - mApp.getOverlayManager().addInboundConnection(peer); + asio::error_code ec; + socket->close(ec); + if (ec) + { + CLOG_WARNING(Overlay, "TCPPeer: close socket failed: {}", + ec.message()); + } } acceptNextPeer(); } diff --git a/src/overlay/TCPPeer.cpp b/src/overlay/TCPPeer.cpp index 16f90c390c..bed780eb2e 100644 --- a/src/overlay/TCPPeer.cpp +++ b/src/overlay/TCPPeer.cpp @@ -34,8 +34,15 @@ using namespace std; TCPPeer::TCPPeer(Application& app, Peer::PeerRole role, std::shared_ptr socket) - : Peer(app, role), mSocket(socket) + : Peer(app, role) + , mSocket(socket) + , mLiveInboundPeersCounter( + app.getOverlayManager().getLiveInboundPeersCounter()) { + if (mRole == REMOTE_CALLED_US) + { + (*mLiveInboundPeersCounter)++; + } } TCPPeer::pointer @@ -54,17 +61,21 @@ TCPPeer::initiate(Application& app, PeerBareAddress const& address) socket->next_layer().async_connect( endpoint, [result](asio::error_code const& error) { asio::error_code ec; + asio::error_code lingerEc; if (!error) { asio::ip::tcp::no_delay 
nodelay(true); + asio::ip::tcp::socket::linger linger(false, 0); result->mSocket->next_layer().set_option(nodelay, ec); + result->mSocket->next_layer().set_option(linger, lingerEc); } else { ec = error; } - result->connectHandler(ec); + auto finalEc = ec ? ec : lingerEc; + result->connectHandler(finalEc); }); return result; } @@ -73,22 +84,36 @@ TCPPeer::pointer TCPPeer::accept(Application& app, shared_ptr socket) { assertThreadIsMain(); + + // First check if there's enough space to accept peer + // If not, do not even create a peer instance as to not trigger any + // additional reads and memory allocations + if (!app.getOverlayManager().haveSpaceForConnection(TCPPeer::getIP(socket))) + { + return nullptr; + } + shared_ptr result; asio::error_code ec; + asio::error_code lingerEc; asio::ip::tcp::no_delay nodelay(true); + asio::ip::tcp::socket::linger linger(false, 0); socket->next_layer().set_option(nodelay, ec); + socket->next_layer().set_option(linger, lingerEc); - if (!ec) + if (!ec && !lingerEc) { CLOG_DEBUG(Overlay, "TCPPeer:accept"); result = make_shared(app, REMOTE_CALLED_US, socket); + result->mAddress = PeerBareAddress{result->getIP(), 0}; result->startRecurrentTimer(); result->startRead(); } else { - CLOG_DEBUG(Overlay, "TCPPeer:accept error {}", ec.message()); + CLOG_DEBUG(Overlay, "TCPPeer:accept error {}", + ec ? 
ec.message() : lingerEc.message()); } return result; @@ -98,6 +123,10 @@ TCPPeer::~TCPPeer() { assertThreadIsMain(); Peer::shutdown(); + if (mRole == REMOTE_CALLED_US) + { + (*mLiveInboundPeersCounter)--; + } if (mSocket) { // Ignore: this indicates an attempt to cancel events @@ -115,11 +144,17 @@ TCPPeer::~TCPPeer() std::string TCPPeer::getIP() const +{ + return getIP(mSocket); +} + +std::string +TCPPeer::getIP(std::shared_ptr socket) { std::string result; asio::error_code ec; - auto ep = mSocket->next_layer().remote_endpoint(ec); + auto ep = socket->next_layer().remote_endpoint(ec); if (ec) { CLOG_ERROR(Overlay, "Could not determine remote endpoint: {}", @@ -680,13 +715,14 @@ TCPPeer::recvMessage() } catch (xdr::xdr_runtime_error& e) { - CLOG_ERROR(Overlay, "recvMessage got a corrupt xdr: {}", e.what()); + CLOG_ERROR(Overlay, "{} - recvMessage got a corrupt xdr: {}", + toString(), e.what()); sendErrorAndDrop(ERR_DATA, "received corrupt XDR", Peer::DropMode::IGNORE_WRITE_QUEUE); } catch (CryptoError const& e) { - CLOG_ERROR(Overlay, "Crypto error: {}", e.what()); + CLOG_ERROR(Overlay, "{} - Crypto error: {}", toString(), e.what()); sendErrorAndDrop(ERR_DATA, "crypto error", Peer::DropMode::IGNORE_WRITE_QUEUE); } diff --git a/src/overlay/TCPPeer.h b/src/overlay/TCPPeer.h index acc4496ee6..8e07ab640f 100644 --- a/src/overlay/TCPPeer.h +++ b/src/overlay/TCPPeer.h @@ -65,6 +65,29 @@ class TCPPeer : public Peer std::size_t expected_length); void shutdown(); + // This tracks the count of TCPPeers that are live and originate in + // inbound connections. + // + // This is subtle: TCPPeers can be kept alive by shared references stored + // in ASIO completion events, because TCPPeers own buffers that ASIO + // operations write into. If we stored weak references in ASIO completion + // events, it would be possible for a TCPPeer to be destructed and + // write-buffers freed during the ASIO write into those buffers, which + // would cause memory corruption. 
+ // + // As a result, the lifetime of a TCPPeer is _not_ the same as the time it + // is known to the OverlayManager. We can drop a TCPPeer from the + // OverlayManager's registration a while before it's actually destroyed. + // To properly manage load, therefore, we have to separately track the + // number of actually-live TCPPeers. Since we're really only concerned + // with load-shedding inbound connections (we make our own outbound ones), + // we only track the inbound-live number. + // + // Furthermore the counter _itself_ has to be accessed as a shared pointer + // because any other central place we might track the live count (overlay + // manager or metrics) may be dead before the TCPPeer destructor runs. + std::shared_ptr mLiveInboundPeersCounter; + public: typedef std::shared_ptr pointer; @@ -76,6 +99,7 @@ class TCPPeer : public Peer static pointer initiate(Application& app, PeerBareAddress const& address); static pointer accept(Application& app, std::shared_ptr socket); + static std::string getIP(std::shared_ptr socket); virtual ~TCPPeer(); diff --git a/src/overlay/test/LoopbackPeer.cpp b/src/overlay/test/LoopbackPeer.cpp index 7b6269e305..9ecd7cbd61 100644 --- a/src/overlay/test/LoopbackPeer.cpp +++ b/src/overlay/test/LoopbackPeer.cpp @@ -52,7 +52,7 @@ LoopbackPeer::initiate(Application& app, Application& otherApp) PeerBareAddress{peer->getIP(), peer->getApp().getConfig().PEER_PORT}; app.getOverlayManager().addOutboundConnection(peer); - otherApp.getOverlayManager().addInboundConnection(otherPeer); + otherApp.getOverlayManager().maybeAddInboundConnection(otherPeer); // if connection was dropped during addPendingPeer, we don't want do call // connectHandler if (peer->mState != Peer::CONNECTED || otherPeer->mState != Peer::CONNECTED) diff --git a/src/util/Fs.cpp b/src/util/Fs.cpp index d0e5ad5f7a..f21799269a 100644 --- a/src/util/Fs.cpp +++ b/src/util/Fs.cpp @@ -8,6 +8,7 @@ #include "util/GlobalChecks.h" #include "util/Logging.h" #include +#include 
#include #include @@ -427,8 +428,8 @@ size(std::string const& filename) #ifdef _WIN32 -int -getMaxConnections() +int64_t +getMaxHandles() { // on Windows, there is no limit on handles // only limits based on ephemeral ports, etc @@ -436,8 +437,8 @@ getMaxConnections() } #else -int -getMaxConnections() +int64_t +getMaxHandles() { struct rlimit rl; if (getrlimit(RLIMIT_NOFILE, &rl) == 0) @@ -449,5 +450,55 @@ getMaxConnections() return 64; } #endif + +#if defined(_WIN32) +int64_t +getOpenHandleCount() +{ + HANDLE proc = + OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, GetCurrentProcessId()); + if (proc) + { + DWORD count{0}; + if (GetProcessHandleCount(proc, &count)) + { + return static_cast(count); + } + CloseHandle(proc); + } + return 0; +} +#elif defined(__APPLE__) +int64_t +getOpenHandleCount() +{ + int64_t n{0}; + for (auto const& _fd : std::filesystem::directory_iterator("/dev/fd")) + { + std::ignore = _fd; + ++n; + } + return n; +} +#elif defined(__linux__) +int64_t +getOpenHandleCount() +{ + int64_t n{0}; + for (auto const& _fd : std::filesystem::directory_iterator("/proc/self/fd")) + { + std::ignore = _fd; + ++n; + } + return n; +} +#else +int64_t +getOpenHandleCount() +{ + return 0; +} +#endif + } } diff --git a/src/util/Fs.h b/src/util/Fs.h index a87514b365..7af9a4697c 100644 --- a/src/util/Fs.h +++ b/src/util/Fs.h @@ -109,6 +109,11 @@ void checkGzipSuffix(std::string const& filename); void checkNoGzipSuffix(std::string const& filename); // returns the maximum number of connections that can be done at the same time -int getMaxConnections(); +int64_t getMaxHandles(); + +// On linux, count the number of fds in /proc/self/fd; on macOS, in /dev/fd. +// On windows, get the process handle count. On other platforms return 0. +int64_t getOpenHandleCount(); + } }