From 95cf55a8804499b65d31395f086aef5a87b0a730 Mon Sep 17 00:00:00 2001 From: Joy Date: Mon, 16 Dec 2024 22:57:45 +0000 Subject: [PATCH] remove singleton --- orchagent/orch.cpp | 40 +++---- orchagent/orch.h | 21 +--- orchagent/orchdaemon.cpp | 5 +- orchagent/orchdaemon.h | 2 +- tests/mock_tests/orchdaemon_ut.cpp | 167 ++++++++++++----------------- 5 files changed, 86 insertions(+), 149 deletions(-) diff --git a/orchagent/orch.cpp b/orchagent/orch.cpp index e9a87c9920..3bfcbee538 100644 --- a/orchagent/orch.cpp +++ b/orchagent/orch.cpp @@ -17,8 +17,15 @@ using namespace swss; int gBatchSize = 0; -RingBuffer* Orch::gRingBuffer = nullptr; -RingBuffer* Executor::gRingBuffer = nullptr; +std::shared_ptr Orch::gRingBuffer = nullptr; +std::shared_ptr Executor::gRingBuffer = nullptr; + +RingBuffer::RingBuffer(int size): buffer(size) +{ + if (size <= 1) { + throw std::invalid_argument("Buffer size must be greater than 1"); + } +} void RingBuffer::pauseThread() { @@ -35,17 +42,6 @@ void RingBuffer::notify() cv.notify_all(); } -RingBuffer* RingBuffer::instance = nullptr; - -RingBuffer* RingBuffer::get() -{ - if (instance == nullptr) { - instance = new RingBuffer(); - SWSS_LOG_NOTICE("Orchagent RingBuffer created at %p!", (void *)instance); - } - return instance; -} - void RingBuffer::setIdle(bool idle) { idle_status = idle; @@ -58,7 +54,7 @@ bool RingBuffer::IsIdle() const bool RingBuffer::IsFull() const { - return (tail + 1) % RING_SIZE == head; + return (tail + 1) % static_cast(buffer.size()) == head; } bool RingBuffer::IsEmpty() const @@ -71,7 +67,7 @@ bool RingBuffer::push(AnyTask ringEntry) if (IsFull()) return false; buffer[tail] = std::move(ringEntry); - tail = (tail + 1) % RING_SIZE; + tail = (tail + 1) % static_cast(buffer.size()); return true; } @@ -80,7 +76,7 @@ bool RingBuffer::pop(AnyTask& ringEntry) if (IsEmpty()) return false; ringEntry = std::move(buffer[head]); - head = (head + 1) % RING_SIZE; + head = (head + 1) % static_cast(buffer.size()); return true; } @@ -94,18 +90,6 @@ bool RingBuffer::serves(const std::string& tableName) return m_consumerSet.find(tableName) != m_consumerSet.end(); } -void RingBuffer::release() -{ - if (instance) - delete instance; - instance = nullptr; -} -RingBuffer* RingBuffer::reset() -{ - release(); - return get(); -} - Orch::Orch(DBConnector *db, const string tableName, int pri) { addConsumer(db, tableName, pri); diff --git a/orchagent/orch.h b/orchagent/orch.h index 78d6a6579e..1023ee5fda 100644 --- a/orchagent/orch.h +++ b/orchagent/orch.h @@ -134,7 +134,7 @@ class Executor : public swss::Selectable } Orch *getOrch() const { return m_orch; } - static RingBuffer* gRingBuffer; + static std::shared_ptr gRingBuffer; void pushRingBuffer(AnyTask&& func); protected: @@ -187,7 +187,6 @@ class ConsumerBase : public Executor { class RingBuffer { private: - static RingBuffer* instance; std::vector buffer; int head = 0; int tail = 0; @@ -197,22 +196,8 @@ class RingBuffer std::mutex mtx; bool idle_status = true; -protected: - RingBuffer(): buffer(RING_SIZE) {} - ~RingBuffer() { - instance = nullptr; - } - public: - RingBuffer(const RingBuffer&) = delete; - RingBuffer(RingBuffer&&) = delete; - RingBuffer& operator= (const RingBuffer&) = delete; - RingBuffer& operator= (RingBuffer&&) = delete; - - static void release(); - static RingBuffer* reset(); - static RingBuffer* get(); - + RingBuffer(int size=RING_SIZE); bool thread_created = false; std::atomic thread_exited{false}; @@ -290,7 +275,7 @@ class Orch Orch(const std::vector& tables); virtual ~Orch() = default; - static RingBuffer* gRingBuffer; + static std::shared_ptr gRingBuffer; std::vector getSelectables(); diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp index ae44d0fa95..7518732b5d 100644 --- a/orchagent/orchdaemon.cpp +++ b/orchagent/orchdaemon.cpp @@ -148,13 +148,14 @@ void OrchDaemon::popRingBuffer() * This function initializes gRingBuffer, otherwise it's nullptr. */ void OrchDaemon::enableRingBuffer() { - gRingBuffer = RingBuffer::get(); + gRingBuffer = std::make_shared(); Executor::gRingBuffer = gRingBuffer; Orch::gRingBuffer = gRingBuffer; + SWSS_LOG_NOTICE("RingBuffer created at %p!", (void *)gRingBuffer.get()); } void OrchDaemon::disableRingBuffer() { - RingBuffer::release(); + gRingBuffer = nullptr; Executor::gRingBuffer = nullptr; Orch::gRingBuffer = nullptr; } diff --git a/orchagent/orchdaemon.h b/orchagent/orchdaemon.h index d561c47dbf..ed778347b2 100644 --- a/orchagent/orchdaemon.h +++ b/orchagent/orchdaemon.h @@ -98,7 +98,7 @@ class OrchDaemon */ void popRingBuffer(); - RingBuffer* gRingBuffer = nullptr; + std::shared_ptr gRingBuffer = nullptr; std::thread ring_thread; diff --git a/tests/mock_tests/orchdaemon_ut.cpp b/tests/mock_tests/orchdaemon_ut.cpp index 29edaa1b68..b684cc01aa 100644 --- a/tests/mock_tests/orchdaemon_ut.cpp +++ b/tests/mock_tests/orchdaemon_ut.cpp @@ -1,4 +1,7 @@ +#define protected public +#include "orch.h" #include "orchdaemon.h" +#undef protected #include "dbconnector.h" #include #include @@ -26,11 +29,6 @@ namespace orchdaemon_test OrchDaemon* orchd; - RingBuffer* gRingBuffer = RingBuffer::get(); - - std::shared_ptr consumer; - - std::shared_ptr orch; OrchDaemonTest() { mock_sai_switch = &mock_sai_switch_; @@ -48,14 +46,6 @@ namespace orchdaemon_test delete orchd; }; - void SetUp() override { - gRingBuffer = RingBuffer::reset(); - } - - void TearDown() override - { - RingBuffer::release(); - } }; TEST_F(OrchDaemonTest, logRotate) @@ -65,137 +55,114 @@ namespace orchdaemon_test orchd->logRotate(); } + TEST_F(OrchDaemonTest, ringBuffer) + { + int test_ring_size = 2; + + auto ring = new RingBuffer(test_ring_size); + + for (int i = 0; i < test_ring_size - 1; i++) + { + EXPECT_TRUE(ring->push([](){})); + } + EXPECT_FALSE(ring->push([](){})); + + AnyTask task; + for (int i = 0; i < test_ring_size - 1; i++) + { + EXPECT_TRUE(ring->pop(task)); + } + + EXPECT_FALSE(ring->pop(task)); + + ring->setIdle(true); + EXPECT_TRUE(ring->IsIdle()); + delete ring; + } + TEST_F(OrchDaemonTest, RingThread) { orchd->enableRingBuffer(); + // verify ring buffer is created EXPECT_TRUE(Executor::gRingBuffer != nullptr); EXPECT_TRUE(Executor::gRingBuffer == Orch::gRingBuffer); orchd->ring_thread = std::thread(&OrchDaemon::popRingBuffer, orchd); + auto gRingBuffer = orchd->gRingBuffer; - while (!RingBuffer::get()->thread_created) + // verify ring_thread is created + while (!gRingBuffer->thread_created) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } bool task_executed = false; AnyTask task = [&task_executed]() { task_executed = true;}; - RingBuffer::get()->push(task); + gRingBuffer->push(task); - EXPECT_TRUE(RingBuffer::get()->IsIdle()); + // verify ring thread is conditional locked + EXPECT_TRUE(gRingBuffer->IsIdle()); + EXPECT_FALSE(task_executed); - RingBuffer::get()->notify(); + gRingBuffer->notify(); - while (!RingBuffer::get()->IsEmpty() || !RingBuffer::get()->IsIdle()) + // verify notify() would activate the ring thread when buffer is not empty + while (!gRingBuffer->IsEmpty() || !gRingBuffer->IsIdle()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } EXPECT_TRUE(task_executed); - EXPECT_TRUE(orchd->ring_thread.joinable()); - delete orchd; + // verify the destructor of orchdaemon will stop the ring thread EXPECT_FALSE(orchd->ring_thread.joinable()); + // verify the destructor of orchdaemon also resets ring buffer + EXPECT_TRUE(Executor::gRingBuffer == nullptr); - // reset the orchd + // reset the orchd for other testcases orchd = new OrchDaemon(&appl_db, &config_db, &state_db, &counters_db, nullptr); } - TEST_F(OrchDaemonTest, PushAnyTask) + TEST_F(OrchDaemonTest, PushRingBuffer) { orchd->enableRingBuffer(); - orch = make_shared(&appl_db, "ROUTE_TABLE", 0); - consumer = make_shared(new swss::ConsumerStateTable(&appl_db, "ROUTE_TABLE", 128, 1), orch.get(), "ROUTE_TABLE"); + auto gRingBuffer = orchd->gRingBuffer; + + std::vector tables = {"ROUTE_TABLE", "OTHER_TABLE"}; + auto orch = make_shared(&appl_db, tables); + auto route_consumer = dynamic_cast(orch->getExecutor("ROUTE_TABLE")); + auto other_consumer = dynamic_cast(orch->getExecutor("OTHER_TABLE")); EXPECT_TRUE(gRingBuffer->serves("ROUTE_TABLE")); EXPECT_FALSE(gRingBuffer->serves("OTHER_TABLE")); - EXPECT_TRUE(gRingBuffer->IsEmpty()); - - int x = 1; - int y = 3; - AnyTask t1 = [&](){x=2;}; - AnyTask t2 = [](){}; - AnyTask t3 = [&](){x=3;y=2;}; - - gRingBuffer->push(t1); - gRingBuffer->push(t2); - EXPECT_FALSE(gRingBuffer->IsEmpty()); - gRingBuffer->pop(t3); - t3(); - EXPECT_TRUE(x==2); - EXPECT_TRUE(y==3); + int x = 0; + route_consumer->pushRingBuffer([&](){x=3;}); + // verify `pushRingBuffer` is equivalent to executing the task immediately + EXPECT_TRUE(gRingBuffer->IsEmpty() && gRingBuffer->IsIdle() && !gRingBuffer->thread_created && x==3); - EXPECT_TRUE(gRingBuffer->pop(t3)); - EXPECT_FALSE(gRingBuffer->pop(t3)); + gRingBuffer->thread_created = true; // set the flag to assume the ring thread is created (actually not) - consumer->pushRingBuffer([&](){x=3;}); - EXPECT_TRUE(x==3); + // verify `pushRingBuffer` is equivalent to executing the task immediately when ring is empty and idle + other_consumer->pushRingBuffer([&](){x=4;}); + EXPECT_TRUE(gRingBuffer->IsEmpty() && gRingBuffer->IsIdle() && x==4); - gRingBuffer->thread_created = true; - consumer->pushRingBuffer([&](){x=4;}); - EXPECT_TRUE(x==3); + route_consumer->pushRingBuffer([&](){x=5;}); + // verify `pushRingBuffer` would not execute the task if thread_created is true + // it only pushes the task to the ring buffer, without executing it + EXPECT_TRUE(!gRingBuffer->IsEmpty() && x==4); - gRingBuffer->pop(t3); - t3(); - EXPECT_TRUE(x==4); + AnyTask task; + gRingBuffer->pop(task); + task(); + // hence the task needs to be popped and explicitly executed + EXPECT_TRUE(gRingBuffer->IsEmpty() && x==5); orchd->disableRingBuffer(); } - TEST_F(OrchDaemonTest, ThreadPauseAndNotify) { - - bool thread_finished = false; - std::thread t([this, &thread_finished]() { - gRingBuffer->setIdle(true); - gRingBuffer->pauseThread(); - thread_finished = true; - }); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - - AnyTask task = []() { }; - EXPECT_TRUE(gRingBuffer->push(task)); - gRingBuffer->notify(); - - t.join(); - EXPECT_TRUE(thread_finished); - } - - TEST_F(OrchDaemonTest, MultiThread) { - std::vector producers; - std::vector consumers; - - for (int i = 0; i < 3; i++) { - producers.emplace_back([this]() { - AnyTask task = []() { }; - for (int j = 0; j < 10; j++) { - gRingBuffer->push(task); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - }); - } - - for (int i = 0; i < 3; i++) { - consumers.emplace_back([this]() { - for (int j = 0; j < 10; j++) { - AnyTask task; - while (!gRingBuffer->pop(task)) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - } - }); - } - - for (auto& t : producers) { - t.join(); - } - for (auto& t : consumers) { - t.join(); - } - } - }