Skip to content

Commit

Permalink
modify destructor of rb and remove template
Browse files Browse the repository at this point in the history
  • Loading branch information
a114j0y committed Jan 14, 2025
1 parent e30a097 commit 6322b0f
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 329 deletions.
66 changes: 64 additions & 2 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,70 @@ using namespace swss;

int gBatchSize = 0;

OrchRing* Orch::gRingBuffer = nullptr;
OrchRing* Executor::gRingBuffer = nullptr;
RingBuffer* Orch::gRingBuffer = nullptr;
RingBuffer* Executor::gRingBuffer = nullptr;

void RingBuffer::pause_thread()
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&](){ return !IsEmpty(); });
}

void RingBuffer::notify()
{
if (!IsEmpty() && Idle)
cv.notify_all();
}

RingBuffer* RingBuffer::instance = nullptr;

RingBuffer* RingBuffer::Get()
{
if (instance == nullptr) {
static RingBuffer instance_;
SWSS_LOG_NOTICE("Orchagent RingBuffer created at %p!", (void *)&instance_);
instance = &instance_;
}
return instance;
}

bool RingBuffer::IsFull()
{
return (tail + 1) % RING_SIZE == head;
}

bool RingBuffer::IsEmpty()
{
return tail == head;
}

bool RingBuffer::push(AnyTask ringEntry)
{
if (IsFull())
return false;
buffer[tail] = std::move(ringEntry);
tail = (tail + 1) % RING_SIZE;
return true;
}

bool RingBuffer::pop(AnyTask& ringEntry)
{
if (IsEmpty())
return false;
ringEntry = std::move(buffer[head]);
head = (head + 1) % RING_SIZE;
return true;
}

void RingBuffer::addExecutor(Executor* executor)
{
m_consumerSet.insert(executor->getName());
}

bool RingBuffer::Serves(const std::string& tableName)
{
return m_consumerSet.find(tableName) != m_consumerSet.end();
}

Orch::Orch(DBConnector *db, const string tableName, int pri)
{
Expand Down
119 changes: 17 additions & 102 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const char state_db_key_delimiter = '|';
#define DEFAULT_KEY_SEPARATOR ":"
#define VLAN_SUB_INTERFACE_SEPARATOR "."

#define ORCH_RING_SIZE 30
#define RING_SIZE 30
#define SLEEP_MSECONDS 500

const int default_orch_pri = 0;
Expand Down Expand Up @@ -94,9 +94,8 @@ typedef std::pair<std::string, int> table_name_with_pri_t;
class Orch;

using AnyTask = std::function<void()>;
template<typename DataType, int RingSize>

class RingBuffer;
typedef RingBuffer<AnyTask, ORCH_RING_SIZE> OrchRing;

// Design assumption
// 1. one Orch can have one or more Executor
Expand Down Expand Up @@ -135,7 +134,7 @@ class Executor : public swss::Selectable
}

Orch *getOrch() const { return m_orch; }
static OrchRing* gRingBuffer;
static RingBuffer* gRingBuffer;
void pushRingBuffer(AnyTask&& func);

protected:
Expand Down Expand Up @@ -185,32 +184,31 @@ class ConsumerBase : public Executor {
size_t refillToSync(swss::Table* table);
};

template<typename DataType, int RingSize>
class RingBuffer
{
private:
static RingBuffer<DataType, RingSize>* instance;
std::vector<DataType> buffer;
static RingBuffer* instance;
std::vector<AnyTask> buffer;
int head = 0;
int tail = 0;
ConsumerMap m_consumerMap;
std::set<std::string> m_consumerSet;

std::condition_variable cv;
std::mutex mtx;

protected:
RingBuffer<DataType, RingSize>(): buffer(RingSize) {}
~RingBuffer<DataType, RingSize>() {
delete instance;
RingBuffer(): buffer(RING_SIZE) {}
~RingBuffer() {
instance = nullptr;
}

public:
RingBuffer<DataType, RingSize>(const RingBuffer<DataType, RingSize>&) = delete;
RingBuffer<DataType, RingSize>(RingBuffer<DataType, RingSize>&&) = delete;
RingBuffer<DataType, RingSize>& operator= (const RingBuffer<DataType, RingSize>&) = delete;
RingBuffer<DataType, RingSize>& operator= (RingBuffer<DataType, RingSize>&&) = delete;
RingBuffer(const RingBuffer&) = delete;
RingBuffer(RingBuffer&&) = delete;
RingBuffer& operator= (const RingBuffer&) = delete;
RingBuffer& operator= (RingBuffer&&) = delete;

static RingBuffer<DataType, RingSize>* Get();
static RingBuffer* Get();
bool threadCreated = false;
bool Idle = true;

Expand All @@ -222,96 +220,13 @@ class RingBuffer
bool IsFull();
bool IsEmpty();

bool push(DataType entry);
bool pop(DataType& entry);
bool push(AnyTask entry);
bool pop(AnyTask& entry);

void addExecutor(Executor* executor);
bool Serves(const std::string& tableName);
};

template<typename DataType, int RingSize>
void RingBuffer<DataType, RingSize>::pause_thread()
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&](){ return !IsEmpty(); });
}

template<typename DataType, int RingSize>
void RingBuffer<DataType, RingSize>::notify()
{
if (!IsEmpty() && Idle)
cv.notify_all();
}

template<typename DataType, int RingSize>
RingBuffer<DataType, RingSize>* RingBuffer<DataType, RingSize>::instance = nullptr;

template<typename DataType, int RingSize>
RingBuffer<DataType, RingSize>* RingBuffer<DataType, RingSize>::Get()
{
if (instance == nullptr) {
instance = new RingBuffer<DataType, RingSize>();
SWSS_LOG_NOTICE("Orchagent RingBuffer created at %p!", (void *)instance);
}
return instance;
}

template<typename DataType, int RingSize>
bool RingBuffer<DataType, RingSize>::IsFull()
{
return (tail + 1) % RingSize == head;
}

template<typename DataType, int RingSize>
bool RingBuffer<DataType, RingSize>::IsEmpty()
{
return tail == head;
}

template<typename DataType, int RingSize>
bool RingBuffer<DataType, RingSize>::push(DataType ringEntry)
{
if (IsFull())
return false;
buffer[tail] = std::move(ringEntry);
tail = (tail + 1) % RingSize;
return true;
}

template<typename DataType, int RingSize>
bool RingBuffer<DataType, RingSize>::pop(DataType& ringEntry)
{
if (IsEmpty())
return false;
ringEntry = std::move(buffer[head]);
head = (head + 1) % RingSize;
return true;
}

template<typename DataType, int RingSize>
void RingBuffer<DataType, RingSize>::addExecutor(Executor* executor)
{
auto inserted = m_consumerMap.emplace(std::piecewise_construct,
std::forward_as_tuple(executor->getName()),
std::forward_as_tuple(executor));

// If there is duplication of executorName in m_consumerMap, logic error
if (!inserted.second)
{
SWSS_LOG_THROW("Duplicated executorName in m_consumerMap: %s", executor->getName().c_str());
}
}

template<typename DataType, int RingSize>
bool RingBuffer<DataType, RingSize>::Serves(const std::string& tableName)
{
for (auto &it : m_consumerMap) {
if (it.first == tableName)
return true;
}
return false;
}

class Consumer : public ConsumerBase {
public:
Consumer(swss::ConsumerTableBase *select, Orch *orch, const std::string &name)
Expand Down Expand Up @@ -369,7 +284,7 @@ class Orch
Orch(const std::vector<TableConnector>& tables);
virtual ~Orch() = default;

static OrchRing* gRingBuffer;
static RingBuffer* gRingBuffer;

std::vector<swss::Selectable*> getSelectables();

Expand Down
2 changes: 1 addition & 1 deletion orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void OrchDaemon::popRingBuffer()
* This function initializes gRingBuffer, otherwise it's nullptr.
*/
void OrchDaemon::enableRingBuffer() {
gRingBuffer = OrchRing::Get();
gRingBuffer = RingBuffer::Get();
Executor::gRingBuffer = gRingBuffer;
Orch::gRingBuffer = gRingBuffer;
}
Expand Down
2 changes: 1 addition & 1 deletion orchagent/orchdaemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class OrchDaemon

protected:
/* Orchdaemon instance points to the same ring buffer during its lifetime */
OrchRing* gRingBuffer = nullptr;
RingBuffer* gRingBuffer = nullptr;
std::atomic<bool> ring_thread_exited{false};
};

Expand Down
1 change: 0 additions & 1 deletion tests/mock_tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ tests_SOURCES = aclorch_ut.cpp \
stporch_ut.cpp \
flexcounter_ut.cpp \
mock_orch_test.cpp \
ring_ut.cpp \
$(top_srcdir)/warmrestart/warmRestartHelper.cpp \
$(top_srcdir)/lib/gearboxutils.cpp \
$(top_srcdir)/lib/subintf.cpp \
Expand Down
Loading

0 comments on commit 6322b0f

Please sign in to comment.