From bf7428041b584551280b0736a039fda407dfc457 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 20 Feb 2024 13:07:38 +0100 Subject: [PATCH] CpuBoundWork#CpuBoundWork(): don't spin on atomic int to acquire slot This is inefficient and involves unfair scheduling. The latter implies possible bad surprises regarding waiting durations on busy nodes. Instead, use AsioConditionVariable#Wait() if there are no free slots. It's notified by others' CpuBoundWork#~CpuBoundWork() once finished. --- lib/base/io-engine.cpp | 59 ++++++++++++++++++++++++++++++++++-------- lib/base/io-engine.hpp | 14 ++++++++-- 2 files changed, 60 insertions(+), 13 deletions(-) diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index 871981a82ad..eb3ca105256 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -16,28 +16,61 @@ using namespace icinga; -CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&) +CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand) : m_Done(false) { auto& ioEngine (IoEngine::Get()); + auto& sem (ioEngine.m_CpuBoundSemaphore); + std::unique_lock lock (sem.Mutex); - for (;;) { - auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1)); + if (sem.FreeSlots) { + --sem.FreeSlots; + return; + } - if (availableSlots < 1) { - ioEngine.m_CpuBoundSemaphore.fetch_add(1); - IoEngine::YieldCurrentCoroutine(yc); - continue; - } + AsioConditionVariable cv (ioEngine.GetIoContext()); + + sem.Waiting.emplace(&strand, &cv); + lock.unlock(); - break; + try { + cv.Wait(yc); + } catch (...) 
{ + Done(); + throw; } } +class SetAsioCV +{ +public: + inline SetAsioCV(AsioConditionVariable& cv) : m_CV(&cv) + { + } + + inline void operator()() + { + m_CV->Set(); + } + +private: + AsioConditionVariable* m_CV; +}; + void CpuBoundWork::Done() { if (!m_Done) { - IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); + auto& sem (IoEngine::Get().m_CpuBoundSemaphore); + std::unique_lock lock (sem.Mutex); + + if (sem.Waiting.empty()) { + ++sem.FreeSlots; + } else { + auto next (sem.Waiting.front()); + sem.Waiting.pop(); + + boost::asio::post(*next.first, SetAsioCV(*next.second)); + } m_Done = true; } @@ -58,7 +91,11 @@ boost::asio::io_context& IoEngine::GetIoContext() IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext) { m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin); - m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u); + + { + std::unique_lock lock (m_CpuBoundSemaphore.Mutex); + m_CpuBoundSemaphore.FreeSlots = Configuration::Concurrency * 3u / 2u; + } for (auto& thread : m_Threads) { thread = std::thread(&IoEngine::RunEventLoop, this); diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 622a92dd00c..c714450f5de 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -8,8 +8,11 @@ #include "base/logger.hpp" #include "base/shared-object.hpp" #include +#include #include #include +#include +#include #include #include #include @@ -31,7 +34,7 @@ namespace icinga class CpuBoundWork { public: - CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&); + CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand); CpuBoundWork(const CpuBoundWork&) = delete; CpuBoundWork(CpuBoundWork&&) = delete; CpuBoundWork& operator=(const CpuBoundWork&) = delete; @@ -48,6 +51,8 @@ class CpuBoundWork bool m_Done; }; +class 
AsioConditionVariable; + /** * Async I/O engine * @@ -120,7 +125,12 @@ class IoEngine boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive; std::vector<std::thread> m_Threads; boost::asio::deadline_timer m_AlreadyExpiredTimer; - std::atomic_int_fast32_t m_CpuBoundSemaphore; + + struct { + std::mutex Mutex; + uint_fast32_t FreeSlots; + std::queue<std::pair<boost::asio::io_context::strand*, AsioConditionVariable*>> Waiting; + } m_CpuBoundSemaphore; }; class TerminateIoThread : public std::exception