Skip to content

Commit

Permalink
improve orchd destructor
Browse files Browse the repository at this point in the history
  • Loading branch information
a114j0y committed Jan 14, 2025
1 parent 6322b0f commit 729350c
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 137 deletions.
50 changes: 37 additions & 13 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,48 @@ int gBatchSize = 0;
RingBuffer* Orch::gRingBuffer = nullptr;
RingBuffer* Executor::gRingBuffer = nullptr;

void RingBuffer::pause_thread()
void RingBuffer::pauseThread()
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&](){ return !IsEmpty(); });
cv.wait(lock, [&](){ return !IsEmpty() || thread_exited; });
}

void RingBuffer::notify()
{
if (!IsEmpty() && Idle)
// buffer not empty but rthread idle
bool task_pending = !IsEmpty() && IsIdle();

if (thread_exited || task_pending)
cv.notify_all();
}

RingBuffer* RingBuffer::instance = nullptr;

RingBuffer* RingBuffer::Get()
RingBuffer* RingBuffer::get()
{
if (instance == nullptr) {
static RingBuffer instance_;
SWSS_LOG_NOTICE("Orchagent RingBuffer created at %p!", (void *)&instance_);
instance = &instance_;
instance = new RingBuffer();
SWSS_LOG_NOTICE("Orchagent RingBuffer created at %p!", (void *)instance);
}
return instance;
}

bool RingBuffer::IsFull()
void RingBuffer::setIdle(bool idle)
{
idle_status = idle;
}

bool RingBuffer::IsIdle() const
{
return idle_status;
}

bool RingBuffer::IsFull() const
{
return (tail + 1) % RING_SIZE == head;
}

bool RingBuffer::IsEmpty()
bool RingBuffer::IsEmpty() const
{
return tail == head;
}
Expand Down Expand Up @@ -77,11 +89,23 @@ void RingBuffer::addExecutor(Executor* executor)
m_consumerSet.insert(executor->getName());
}

bool RingBuffer::Serves(const std::string& tableName)
bool RingBuffer::serves(const std::string& tableName)
{
return m_consumerSet.find(tableName) != m_consumerSet.end();
}

void RingBuffer::release()
{
if (instance)
delete instance;
instance = nullptr;
}
RingBuffer* RingBuffer::reset()
{
release();
return get();
}

Orch::Orch(DBConnector *db, const string tableName, int pri)
{
addConsumer(db, tableName, pri);
Expand Down Expand Up @@ -338,15 +362,15 @@ void Consumer::execute()

void Executor::pushRingBuffer(AnyTask&& task)
{
if (!gRingBuffer || !gRingBuffer->threadCreated)
if (!gRingBuffer || !gRingBuffer->thread_created)
{
// execute the task right now in this thread if gRingBuffer is not initialized
// or the ring thread is not created, or this executor is not served by gRingBuffer
task();
}
else if (!gRingBuffer->Serves(getName())) // not served by ring thread
else if (!gRingBuffer->serves(getName())) // not served by ring thread
{
while (!gRingBuffer->IsEmpty() || !gRingBuffer->Idle) {
while (!gRingBuffer->IsEmpty() || !gRingBuffer->IsIdle()) {
gRingBuffer->notify();
std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS));
}
Expand Down
20 changes: 13 additions & 7 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class RingBuffer

std::condition_variable cv;
std::mutex mtx;
bool idle_status = true;

protected:
RingBuffer(): buffer(RING_SIZE) {}
Expand All @@ -208,23 +209,28 @@ class RingBuffer
RingBuffer& operator= (const RingBuffer&) = delete;
RingBuffer& operator= (RingBuffer&&) = delete;

static RingBuffer* Get();
bool threadCreated = false;
bool Idle = true;
static void release();
static RingBuffer* reset();
static RingBuffer* get();

bool thread_created = false;
std::atomic<bool> thread_exited{false};

// pause the ring thread if the buffer is empty
void pause_thread();
void pauseThread();
// wake up the ring thread in case it's locked but not empty
void notify();

bool IsFull();
bool IsEmpty();
bool IsFull() const;
bool IsEmpty() const;
bool IsIdle() const;

bool push(AnyTask entry);
bool pop(AnyTask& entry);

void addExecutor(Executor* executor);
bool Serves(const std::string& tableName);
bool serves(const std::string& tableName);
void setIdle(bool idle);
};

class Consumer : public ConsumerBase {
Expand Down
34 changes: 22 additions & 12 deletions orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,13 @@ OrchDaemon::~OrchDaemon()
SWSS_LOG_ENTER();

// Stop the ring thread before delete orch pointers
if (gRingBuffer) {
ring_thread_exited = true;
if (ring_thread.joinable()) {
// notify the ring_thread to exit
gRingBuffer->thread_exited = true;
gRingBuffer->notify();
// wait for the ring_thread to exit
ring_thread.join();
disableRingBuffer();
}

/*
Expand All @@ -119,36 +123,42 @@ void OrchDaemon::popRingBuffer()
SWSS_LOG_ENTER();

// make sure there is only one thread created to run popRingBuffer()
if (!gRingBuffer || gRingBuffer->threadCreated)
if (!gRingBuffer || gRingBuffer->thread_created)
return;

gRingBuffer->threadCreated = true;
gRingBuffer->thread_created = true;
SWSS_LOG_NOTICE("OrchDaemon starts the popRingBuffer thread!");

while (!ring_thread_exited)
while (!gRingBuffer->thread_exited)
{
gRingBuffer->pause_thread();
gRingBuffer->pauseThread();

gRingBuffer->Idle = false;
gRingBuffer->setIdle(false);

AnyTask func;
while (gRingBuffer->pop(func)) {
func();
}

gRingBuffer->Idle = true;
gRingBuffer->setIdle(true);
}
}

/**
* This function initializes gRingBuffer, otherwise it's nullptr.
*/
void OrchDaemon::enableRingBuffer() {
gRingBuffer = RingBuffer::Get();
gRingBuffer = RingBuffer::get();
Executor::gRingBuffer = gRingBuffer;
Orch::gRingBuffer = gRingBuffer;
}

void OrchDaemon::disableRingBuffer() {
RingBuffer::release();
Executor::gRingBuffer = nullptr;
Orch::gRingBuffer = nullptr;
}

bool OrchDaemon::init()
{
SWSS_LOG_ENTER();
Expand Down Expand Up @@ -920,7 +930,7 @@ void OrchDaemon::start()
if (gRingBuffer)
{

if (!gRingBuffer->IsEmpty() || !gRingBuffer->Idle)
if (!gRingBuffer->IsEmpty() || !gRingBuffer->IsIdle())
{
gRingBuffer->notify();
}
Expand Down Expand Up @@ -948,7 +958,7 @@ void OrchDaemon::start()
/* After each iteration, periodically check all m_toSync map to
* execute all the remaining tasks that need to be retried. */

if (!gRingBuffer || (gRingBuffer->IsEmpty() && gRingBuffer->Idle))
if (!gRingBuffer || (gRingBuffer->IsEmpty() && gRingBuffer->IsIdle()))
for (Orch *o : m_orchList)
o->doTask();

Expand All @@ -967,7 +977,7 @@ void OrchDaemon::start()
// but should finish data that already in the ring
if (gRingBuffer)
{
while (!gRingBuffer->IsEmpty() || !gRingBuffer->Idle)
while (!gRingBuffer->IsEmpty() || !gRingBuffer->IsIdle())
{
gRingBuffer->notify();
std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS));
Expand Down
12 changes: 6 additions & 6 deletions orchagent/orchdaemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class OrchDaemon
{
public:
OrchDaemon(DBConnector *, DBConnector *, DBConnector *, DBConnector *, ZmqServer *);
~OrchDaemon();
virtual ~OrchDaemon();

virtual bool init();
void start();
Expand Down Expand Up @@ -92,13 +92,17 @@ class OrchDaemon
* they are connected to the same ring.
*/
void enableRingBuffer();
void disableRingBuffer();
/**
* This method describes how the ring consumer consumes this ring.
*/
void popRingBuffer();

private:
RingBuffer* gRingBuffer = nullptr;

std::thread ring_thread;

private:
DBConnector *m_applDb;
DBConnector *m_configDb;
DBConnector *m_stateDb;
Expand All @@ -120,10 +124,6 @@ class OrchDaemon

void freezeAndHeartBeat(unsigned int duration);

protected:
/* Orchdaemon instance points to the same ring buffer during its lifetime */
RingBuffer* gRingBuffer = nullptr;
std::atomic<bool> ring_thread_exited{false};
};

class FabricOrchDaemon : public OrchDaemon
Expand Down
Loading

0 comments on commit 729350c

Please sign in to comment.