Skip to content

Commit

Permalink
[orchagent] implement ring buffer feature with a flag
Browse files Browse the repository at this point in the history
  • Loading branch information
a114j0y committed Oct 2, 2024
1 parent 465391d commit 80e7a90
Show file tree
Hide file tree
Showing 5 changed files with 334 additions and 22 deletions.
15 changes: 13 additions & 2 deletions orchagent/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ extern size_t gMaxBulkSize;
#define DEFAULT_BATCH_SIZE 128
extern int gBatchSize;

bool gRingMode = false;

bool gSyncMode = false;
sai_redis_communication_mode_t gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC;
string gAsicInstance;
Expand All @@ -72,7 +74,7 @@ bool gTraditionalFlexCounter = false;

void usage()
{
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode]" << endl;
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode] [-R]" << endl;
cout << " -h: display this message" << endl;
cout << " -r record_type: record orchagent logs with type (default 3)" << endl;
cout << " Bit 0: sairedis.rec, Bit 1: swss.rec, Bit 2: responsepublisher.rec. For example:" << endl;
Expand All @@ -92,6 +94,7 @@ void usage()
cout << " -k max bulk size in bulk mode (default 1000)" << endl;
cout << " -q zmq_server_address: ZMQ server address (default disable ZMQ)" << endl;
cout << " -c counter mode (traditional|asic_db), default: asic_db" << endl;
cout << " -R: enable the ring buffer thread" << endl;
}

void sighup_handler(int signo)
Expand Down Expand Up @@ -346,7 +349,7 @@ int main(int argc, char **argv)
string responsepublisher_rec_filename = Recorder::RESPPUB_FNAME;
int record_type = 3; // Only swss and sairedis recordings enabled by default.

while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:")) != -1)
while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:R")) != -1)
{
switch (opt)
{
Expand Down Expand Up @@ -437,6 +440,9 @@ int main(int argc, char **argv)
enable_zmq = true;
}
break;
case 'R':
gRingMode = true;
break;
default: /* '?' */
exit(EXIT_FAILURE);
}
Expand Down Expand Up @@ -782,6 +788,11 @@ int main(int argc, char **argv)
orchDaemon = make_shared<FabricOrchDaemon>(&appl_db, &config_db, &state_db, chassis_app_db.get(), zmq_server.get());
}

if (gRingMode) {
/* Initialize the ring before OrchDaemon initializing Orchs */
orchDaemon->enableRingBuffer();
}

if (!orchDaemon->init())
{
SWSS_LOG_ERROR("Failed to initialize orchestration daemon");
Expand Down
102 changes: 91 additions & 11 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,18 @@
#include "zmqconsumerstatetable.h"
#include "sai_serialize.h"

#define PRINT_ALL 1
#define VERBOSE true

using namespace swss;

int gBatchSize = 0;

int ROUTE_SYNC_PPL_SIZE = 50000;

OrchRing* Orch::gRingBuffer = nullptr;
OrchRing* Executor::gRingBuffer = nullptr;

Orch::Orch(DBConnector *db, const string tableName, int pri)
{
addConsumer(db, tableName, pri);
Expand Down Expand Up @@ -155,6 +163,10 @@ size_t ConsumerBase::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries
return entries.size();
}

size_t ConsumerBase::addToSync(std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries) {
return addToSync(*entries);
}

// TODO: Table should be const
size_t ConsumerBase::refillToSync(Table* table)
{
Expand Down Expand Up @@ -239,25 +251,93 @@ void ConsumerBase::dumpPendingTasks(vector<string> &ts)

void Consumer::execute()
{
// ConsumerBase::execute_impl<swss::ConsumerTableBase>();
static swss::PerformanceTimer timer("POPS", PRINT_ALL, VERBOSE);

SWSS_LOG_ENTER();

size_t update_size = 0;
auto table = static_cast<swss::ConsumerTableBase *>(getSelectable());
do
size_t total_size = 0;

while (true)
{
std::deque<KeyOpFieldsValuesTuple> entries;
table->pops(entries);
update_size = addToSync(entries);
} while (update_size != 0);
size_t popped_size = 0; // number of entries popped from the redis table

if (gRingBuffer && gRingBuffer->Serves(getName())) {
timer.start();
}

drain();
auto entries = std::make_shared<std::deque<KeyOpFieldsValuesTuple>>();
getConsumerTable()->pops(*entries);

popped_size = entries->size();
total_size += popped_size;

pushRingBuffer([=](){
addToSync(entries);
});

if (gRingBuffer && gRingBuffer->Serves(getName())) {
timer.stop();
timer.inc((int)popped_size);
}

if (!gBatchSize || popped_size * 10 <= (size_t)gBatchSize || total_size >= ROUTE_SYNC_PPL_SIZE) {
// some program doesn't initialize gBatchSize and use TableConsumable::DEFAULT_POP_BATCH_SIZE instead
break;
}
}

pushRingBuffer([=](){
drain();
});
}

void Executor::pushRingBuffer(AnyTask&& task)
{
if (!gRingBuffer || !gRingBuffer->threadCreated)
{
// execute the task right now in this thread if gRingBuffer is not initialized
// or the ring thread is not created, or this executor is not served by gRingBuffer
task();
}
else if (!gRingBuffer->Serves(getName())) // not served by ring thread
{
while (!gRingBuffer->IsEmpty() || !gRingBuffer->Idle) {
gRingBuffer->notify();
std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS));
}
// if ring thread is enabled, make sure to execute task after the ring finishes its work
task();
}
else
{
// if this executor is served by gRingBuffer, push the task to gRingBuffer
// and notify the ring thread to flush gRingBuffer
while (!gRingBuffer->push(task)) {
gRingBuffer->notify();
SWSS_LOG_WARN("ring is full...push again");
}
gRingBuffer->notify();
}
}

void Consumer::drain()
{
if (!m_toSync.empty())
((Orch *)m_orch)->doTask((Consumer&)*this);
if (m_toSync.empty())
return;

if (getName() == APP_ROUTE_TABLE_NAME) {
static swss::PerformanceTimer timer("DRAIN", PRINT_ALL, VERBOSE);
size_t before = m_toSync.size();
timer.start();
m_orch->doTask(*this);
timer.stop();
size_t after = m_toSync.size();
timer.inc(before - after);
}
else
{
m_orch->doTask(*this);
}
}

size_t Orch::addExistingData(const string& tableName)
Expand Down
Loading

0 comments on commit 80e7a90

Please sign in to comment.