Skip to content

Commit

Permalink
remove singleton
Browse files Browse the repository at this point in the history
  • Loading branch information
a114j0y committed Jan 21, 2025
1 parent 729350c commit 95cf55a
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 149 deletions.
40 changes: 12 additions & 28 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@ using namespace swss;

int gBatchSize = 0;

RingBuffer* Orch::gRingBuffer = nullptr;
RingBuffer* Executor::gRingBuffer = nullptr;
std::shared_ptr<RingBuffer> Orch::gRingBuffer = nullptr;
std::shared_ptr<RingBuffer> Executor::gRingBuffer = nullptr;

RingBuffer::RingBuffer(int size): buffer(size)
{
if (size <= 1) {
throw std::invalid_argument("Buffer size must be greater than 1");
}
}

void RingBuffer::pauseThread()
{
Expand All @@ -35,17 +42,6 @@ void RingBuffer::notify()
cv.notify_all();
}

RingBuffer* RingBuffer::instance = nullptr;

RingBuffer* RingBuffer::get()
{
if (instance == nullptr) {
instance = new RingBuffer();
SWSS_LOG_NOTICE("Orchagent RingBuffer created at %p!", (void *)instance);
}
return instance;
}

void RingBuffer::setIdle(bool idle)
{
idle_status = idle;
Expand All @@ -58,7 +54,7 @@ bool RingBuffer::IsIdle() const

bool RingBuffer::IsFull() const
{
return (tail + 1) % RING_SIZE == head;
return (tail + 1) % static_cast<int>(buffer.size()) == head;
}

bool RingBuffer::IsEmpty() const
Expand All @@ -71,7 +67,7 @@ bool RingBuffer::push(AnyTask ringEntry)
if (IsFull())
return false;
buffer[tail] = std::move(ringEntry);
tail = (tail + 1) % RING_SIZE;
tail = (tail + 1) % static_cast<int>(buffer.size());
return true;
}

Expand All @@ -80,7 +76,7 @@ bool RingBuffer::pop(AnyTask& ringEntry)
if (IsEmpty())
return false;
ringEntry = std::move(buffer[head]);
head = (head + 1) % RING_SIZE;
head = (head + 1) % static_cast<int>(buffer.size());
return true;
}

Expand All @@ -94,18 +90,6 @@ bool RingBuffer::serves(const std::string& tableName)
return m_consumerSet.find(tableName) != m_consumerSet.end();
}

void RingBuffer::release()
{
if (instance)
delete instance;
instance = nullptr;
}
RingBuffer* RingBuffer::reset()
{
release();
return get();
}

Orch::Orch(DBConnector *db, const string tableName, int pri)
{
addConsumer(db, tableName, pri);
Expand Down
21 changes: 3 additions & 18 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class Executor : public swss::Selectable
}

Orch *getOrch() const { return m_orch; }
static RingBuffer* gRingBuffer;
static std::shared_ptr<RingBuffer> gRingBuffer;
void pushRingBuffer(AnyTask&& func);

protected:
Expand Down Expand Up @@ -187,7 +187,6 @@ class ConsumerBase : public Executor {
class RingBuffer
{
private:
static RingBuffer* instance;
std::vector<AnyTask> buffer;
int head = 0;
int tail = 0;
Expand All @@ -197,22 +196,8 @@ class RingBuffer
std::mutex mtx;
bool idle_status = true;

protected:
RingBuffer(): buffer(RING_SIZE) {}
~RingBuffer() {
instance = nullptr;
}

public:
RingBuffer(const RingBuffer&) = delete;
RingBuffer(RingBuffer&&) = delete;
RingBuffer& operator= (const RingBuffer&) = delete;
RingBuffer& operator= (RingBuffer&&) = delete;

static void release();
static RingBuffer* reset();
static RingBuffer* get();

RingBuffer(int size=RING_SIZE);
bool thread_created = false;
std::atomic<bool> thread_exited{false};

Expand Down Expand Up @@ -290,7 +275,7 @@ class Orch
Orch(const std::vector<TableConnector>& tables);
virtual ~Orch() = default;

static RingBuffer* gRingBuffer;
static std::shared_ptr<RingBuffer> gRingBuffer;

std::vector<swss::Selectable*> getSelectables();

Expand Down
5 changes: 3 additions & 2 deletions orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,14 @@ void OrchDaemon::popRingBuffer()
* This function initializes gRingBuffer, otherwise it's nullptr.
*/
void OrchDaemon::enableRingBuffer() {
gRingBuffer = RingBuffer::get();
gRingBuffer = std::make_shared<RingBuffer>();
Executor::gRingBuffer = gRingBuffer;
Orch::gRingBuffer = gRingBuffer;
SWSS_LOG_NOTICE("RingBuffer created at %p!", (void *)gRingBuffer.get());
}

void OrchDaemon::disableRingBuffer() {
RingBuffer::release();
gRingBuffer = nullptr;
Executor::gRingBuffer = nullptr;
Orch::gRingBuffer = nullptr;
}
Expand Down
2 changes: 1 addition & 1 deletion orchagent/orchdaemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class OrchDaemon
*/
void popRingBuffer();

RingBuffer* gRingBuffer = nullptr;
std::shared_ptr<RingBuffer> gRingBuffer = nullptr;

std::thread ring_thread;

Expand Down
167 changes: 67 additions & 100 deletions tests/mock_tests/orchdaemon_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#define protected public
#include "orch.h"
#include "orchdaemon.h"
#undef protected
#include "dbconnector.h"
#include <gtest/gtest.h>
#include <gmock/gmock.h>
Expand Down Expand Up @@ -26,11 +29,6 @@ namespace orchdaemon_test

OrchDaemon* orchd;

RingBuffer* gRingBuffer = RingBuffer::get();

std::shared_ptr<Consumer> consumer;

std::shared_ptr<Orch> orch;
OrchDaemonTest()
{
mock_sai_switch = &mock_sai_switch_;
Expand All @@ -48,14 +46,6 @@ namespace orchdaemon_test
delete orchd;
};

void SetUp() override {
gRingBuffer = RingBuffer::reset();
}

void TearDown() override
{
RingBuffer::release();
}
};

TEST_F(OrchDaemonTest, logRotate)
Expand All @@ -65,137 +55,114 @@ namespace orchdaemon_test
orchd->logRotate();
}

TEST_F(OrchDaemonTest, ringBuffer)
{
int test_ring_size = 2;

auto ring = new RingBuffer(test_ring_size);

for (int i = 0; i < test_ring_size - 1; i++)
{
EXPECT_TRUE(ring->push([](){}));
}
EXPECT_FALSE(ring->push([](){}));

AnyTask task;
for (int i = 0; i < test_ring_size - 1; i++)
{
EXPECT_TRUE(ring->pop(task));
}

EXPECT_FALSE(ring->pop(task));

ring->setIdle(true);
EXPECT_TRUE(ring->IsIdle());
delete ring;
}

TEST_F(OrchDaemonTest, RingThread)
{
orchd->enableRingBuffer();

// verify ring buffer is created
EXPECT_TRUE(Executor::gRingBuffer != nullptr);
EXPECT_TRUE(Executor::gRingBuffer == Orch::gRingBuffer);

orchd->ring_thread = std::thread(&OrchDaemon::popRingBuffer, orchd);
auto gRingBuffer = orchd->gRingBuffer;

while (!RingBuffer::get()->thread_created)
// verify ring_thread is created
while (!gRingBuffer->thread_created)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

bool task_executed = false;
AnyTask task = [&task_executed]() { task_executed = true;};
RingBuffer::get()->push(task);
gRingBuffer->push(task);

EXPECT_TRUE(RingBuffer::get()->IsIdle());
// verify ring thread is conditional locked
EXPECT_TRUE(gRingBuffer->IsIdle());
EXPECT_FALSE(task_executed);

RingBuffer::get()->notify();
gRingBuffer->notify();

while (!RingBuffer::get()->IsEmpty() || !RingBuffer::get()->IsIdle())
// verify notify() would activate the ring thread when buffer is not empty
while (!gRingBuffer->IsEmpty() || !gRingBuffer->IsIdle())
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

EXPECT_TRUE(task_executed);

EXPECT_TRUE(orchd->ring_thread.joinable());

delete orchd;

// verify the destructor of orchdaemon will stop the ring thread
EXPECT_FALSE(orchd->ring_thread.joinable());
// verify the destructor of orchdaemon also resets ring buffer
EXPECT_TRUE(Executor::gRingBuffer == nullptr);

// reset the orchd
// reset the orchd for other testcases
orchd = new OrchDaemon(&appl_db, &config_db, &state_db, &counters_db, nullptr);
}

TEST_F(OrchDaemonTest, PushAnyTask)
TEST_F(OrchDaemonTest, PushRingBuffer)
{
orchd->enableRingBuffer();

orch = make_shared<Orch>(&appl_db, "ROUTE_TABLE", 0);
consumer = make_shared<Consumer>(new swss::ConsumerStateTable(&appl_db, "ROUTE_TABLE", 128, 1), orch.get(), "ROUTE_TABLE");
auto gRingBuffer = orchd->gRingBuffer;

std::vector<std::string> tables = {"ROUTE_TABLE", "OTHER_TABLE"};
auto orch = make_shared<Orch>(&appl_db, tables);
auto route_consumer = dynamic_cast<Consumer *>(orch->getExecutor("ROUTE_TABLE"));
auto other_consumer = dynamic_cast<Consumer *>(orch->getExecutor("OTHER_TABLE"));

EXPECT_TRUE(gRingBuffer->serves("ROUTE_TABLE"));
EXPECT_FALSE(gRingBuffer->serves("OTHER_TABLE"));
EXPECT_TRUE(gRingBuffer->IsEmpty());

int x = 1;
int y = 3;
AnyTask t1 = [&](){x=2;};
AnyTask t2 = [](){};
AnyTask t3 = [&](){x=3;y=2;};

gRingBuffer->push(t1);
gRingBuffer->push(t2);
EXPECT_FALSE(gRingBuffer->IsEmpty());

gRingBuffer->pop(t3);
t3();
EXPECT_TRUE(x==2);
EXPECT_TRUE(y==3);
int x = 0;
route_consumer->pushRingBuffer([&](){x=3;});
// verify `pushRingBuffer` is equivalent to executing the task immediately
EXPECT_TRUE(gRingBuffer->IsEmpty() && gRingBuffer->IsIdle() && !gRingBuffer->thread_created && x==3);

EXPECT_TRUE(gRingBuffer->pop(t3));
EXPECT_FALSE(gRingBuffer->pop(t3));
gRingBuffer->thread_created = true; // set the flag to assume the ring thread is created (actually not)

consumer->pushRingBuffer([&](){x=3;});
EXPECT_TRUE(x==3);
// verify `pushRingBuffer` is equivalent to executing the task immediately when ring is empty and idle
other_consumer->pushRingBuffer([&](){x=4;});
EXPECT_TRUE(gRingBuffer->IsEmpty() && gRingBuffer->IsIdle() && x==4);

gRingBuffer->thread_created = true;
consumer->pushRingBuffer([&](){x=4;});
EXPECT_TRUE(x==3);
route_consumer->pushRingBuffer([&](){x=5;});
// verify `pushRingBuffer` would not execute the task if thread_created is true
// it only pushes the task to the ring buffer, without executing it
EXPECT_TRUE(!gRingBuffer->IsEmpty() && x==4);

gRingBuffer->pop(t3);
t3();
EXPECT_TRUE(x==4);
AnyTask task;
gRingBuffer->pop(task);
task();
// hence the task needs to be popped and explicitly executed
EXPECT_TRUE(gRingBuffer->IsEmpty() && x==5);

orchd->disableRingBuffer();
}

TEST_F(OrchDaemonTest, ThreadPauseAndNotify) {

bool thread_finished = false;
std::thread t([this, &thread_finished]() {
gRingBuffer->setIdle(true);
gRingBuffer->pauseThread();
thread_finished = true;
});
std::this_thread::sleep_for(std::chrono::milliseconds(100));

AnyTask task = []() { };
EXPECT_TRUE(gRingBuffer->push(task));
gRingBuffer->notify();

t.join();
EXPECT_TRUE(thread_finished);
}

TEST_F(OrchDaemonTest, MultiThread) {
std::vector<std::thread> producers;
std::vector<std::thread> consumers;

for (int i = 0; i < 3; i++) {
producers.emplace_back([this]() {
AnyTask task = []() { };
for (int j = 0; j < 10; j++) {
gRingBuffer->push(task);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
});
}

for (int i = 0; i < 3; i++) {
consumers.emplace_back([this]() {
for (int j = 0; j < 10; j++) {
AnyTask task;
while (!gRingBuffer->pop(task)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
});
}

for (auto& t : producers) {
t.join();
}
for (auto& t : consumers) {
t.join();
}
}

}

0 comments on commit 95cf55a

Please sign in to comment.