Skip to content

Commit

Permalink
[fpmsyncd] support flush with a timer
Browse files Browse the repository at this point in the history
  • Loading branch information
a114j0y committed Jul 22, 2024
1 parent 2367bca commit f3d9c55
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 5 deletions.
57 changes: 53 additions & 4 deletions fpmsyncd/fpmsyncd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "netlink.h"
#include "notificationconsumer.h"
#include "subscriberstatetable.h"
#include "performancetimer.h"
#include "warmRestartHelper.h"
#include "fpmsyncd/fpmlink.h"
#include "fpmsyncd/routesync.h"
Expand All @@ -16,6 +17,18 @@
using namespace std;
using namespace swss;

// SELECT_TIMEOUT specifies the maximum wait time in milliseconds (-1 == infinite)
static int SELECT_TIMEOUT;
#define INFINITE -1
#define FLUSH_TIMEOUT 500 // 500 milliseconds
static int gFlushTimeout = FLUSH_TIMEOUT;
// redispipeline has a maximum capacity of 50000 entries
#define ROUTE_SYNC_PPL_SIZE 50000
// consider the traffic is small if pipeline contains < 500 entries
#define SMALL_TRAFFIC 500
#define PRINT_ALL 1
#define VERBOSE true

/*
* Default warm-restart timer interval for routing-stack app. To be used only if
* no explicit value has been defined in configuration.
Expand Down Expand Up @@ -48,6 +61,41 @@ static bool eoiuFlagsSet(Table &bgpStateTable)
return true;
}

/**
* @brief fpmsyncd invokes redispipeline's flush with a timer
*
* redispipeline would automatically flush itself when full,
* but fpmsyncd can invoke pipeline's flush even if it's not full yet.
*
* By setting SELECT_TIMEOUT, fpmsyncd controls the flush interval.
*
* @param pipeline reference to the pipeline to be flushed
* @param scheduled if true, timer for fpmsyncd flush expired
*/
void flushPipeline(RedisPipeline& pipeline, bool scheduled) {
size_t remaining = pipeline.size();
if (remaining == 0) {
SELECT_TIMEOUT = INFINITE;
return;
}
static swss::PerformanceTimer timer("FPMFLUSH", PRINT_ALL, VERBOSE);
int idle = pipeline.getIdleTime();
// flush right away
if (remaining < SMALL_TRAFFIC || idle >= gFlushTimeout || idle <= 0) {
timer.start();
pipeline.flush();
timer.stop();

timer.inc(remaining);
SELECT_TIMEOUT = INFINITE;
SWSS_LOG_DEBUG("Pipeline flushed");

return;
}
// postpone the flush
SELECT_TIMEOUT = gFlushTimeout - idle;
}

int main(int argc, char **argv)
{
swss::Logger::linkToDbNative("fpmsyncd");
Expand All @@ -61,7 +109,7 @@ int main(int argc, char **argv)
DBConnector applStateDb("APPL_STATE_DB", 0);
std::unique_ptr<NotificationConsumer> routeResponseChannel;

RedisPipeline pipeline(&db);
RedisPipeline pipeline(&db, ROUTE_SYNC_PPL_SIZE);
RouteSync sync(&pipeline);

DBConnector stateDb("STATE_DB", 0);
Expand Down Expand Up @@ -152,12 +200,14 @@ int main(int argc, char **argv)
sync.m_warmStartHelper.setState(WarmStart::WSDISABLED);
}

SELECT_TIMEOUT = INFINITE;

while (true)
{
Selectable *temps;

/* Reading FPM messages forever (and calling "readMe" to read them) */
s.select(&temps);
auto ret = s.select(&temps, SELECT_TIMEOUT);

/*
* Upon expiration of the warm-restart timer or eoiu Hold Timer, proceed to run the
Expand Down Expand Up @@ -286,8 +336,7 @@ int main(int argc, char **argv)
}
else if (!warmStartEnabled || sync.m_warmStartHelper.isReconciled())
{
pipeline.flush();
SWSS_LOG_DEBUG("Pipeline flushed");
flushPipeline(pipeline, ret==Select::TIMEOUT);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion fpmsyncd/routesync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ static decltype(auto) makeNlAddr(const T& ip)


RouteSync::RouteSync(RedisPipeline *pipeline) :
m_routeTable(pipeline, APP_ROUTE_TABLE_NAME, true),
m_routeTable(pipeline, APP_ROUTE_TABLE_NAME, true, true),
m_label_routeTable(pipeline, APP_LABEL_ROUTE_TABLE_NAME, true),
m_vnet_routeTable(pipeline, APP_VNET_RT_TABLE_NAME, true),
m_vnet_tunnelTable(pipeline, APP_VNET_RT_TUNNEL_TABLE_NAME, true),
Expand Down

0 comments on commit f3d9c55

Please sign in to comment.