diff --git a/fpmsyncd/fpmsyncd.cpp b/fpmsyncd/fpmsyncd.cpp index 5e16a6a6ca..a79324690b 100644 --- a/fpmsyncd/fpmsyncd.cpp +++ b/fpmsyncd/fpmsyncd.cpp @@ -7,6 +7,7 @@ #include "netlink.h" #include "notificationconsumer.h" #include "subscriberstatetable.h" +#include "performancetimer.h" #include "warmRestartHelper.h" #include "fpmsyncd/fpmlink.h" #include "fpmsyncd/routesync.h" @@ -16,6 +17,18 @@ using namespace std; using namespace swss; +// SELECT_TIMEOUT specifies the maximum wait time in milliseconds (-1 == infinite) +static int SELECT_TIMEOUT; +#define INFINITE -1 +#define FLUSH_TIMEOUT 500 // 500 milliseconds +static int gFlushTimeout = FLUSH_TIMEOUT; +// redispipeline has a maximum capacity of 50000 entries +#define ROUTE_SYNC_PPL_SIZE 50000 +// consider the traffic is small if pipeline contains < 500 entries +#define SMALL_TRAFFIC 500 +#define PRINT_ALL 1 +#define VERBOSE true + /* * Default warm-restart timer interval for routing-stack app. To be used only if * no explicit value has been defined in configuration. @@ -48,6 +61,41 @@ static bool eoiuFlagsSet(Table &bgpStateTable) return true; } +/** + * @brief fpmsyncd invokes redispipeline's flush with a timer + * + * redispipeline would automatically flush itself when full, + * but fpmsyncd can invoke pipeline's flush even if it's not full yet. + * + * By setting SELECT_TIMEOUT, fpmsyncd controls the flush interval. + * + * @param pipeline reference to the pipeline to be flushed + * @param scheduled if true, timer for fpmsyncd flush expired + */ +void flushPipeline(RedisPipeline& pipeline, bool scheduled) { + size_t remaining = pipeline.size(); + if (remaining == 0) { + SELECT_TIMEOUT = INFINITE; + return; + } + static swss::PerformanceTimer timer("FPMFLUSH", PRINT_ALL, VERBOSE); + int idle = pipeline.getIdleTime(); + // flush right away + if (remaining < SMALL_TRAFFIC || idle >= gFlushTimeout || idle <= 0|| scheduled) { + timer.start(); + pipeline.flush(); + timer.stop(); + + timer.inc(remaining); + SELECT_TIMEOUT = INFINITE; + SWSS_LOG_DEBUG("Pipeline flushed"); + + return; + } + // postpone the flush + SELECT_TIMEOUT = gFlushTimeout - idle; +} + int main(int argc, char **argv) { swss::Logger::linkToDbNative("fpmsyncd"); @@ -61,7 +109,7 @@ int main(int argc, char **argv) DBConnector applStateDb("APPL_STATE_DB", 0); std::unique_ptr routeResponseChannel; - RedisPipeline pipeline(&db); + RedisPipeline pipeline(&db, ROUTE_SYNC_PPL_SIZE); RouteSync sync(&pipeline); DBConnector stateDb("STATE_DB", 0); @@ -152,12 +200,14 @@ int main(int argc, char **argv) sync.m_warmStartHelper.setState(WarmStart::WSDISABLED); } + SELECT_TIMEOUT = INFINITE; + while (true) { Selectable *temps; /* Reading FPM messages forever (and calling "readMe" to read them) */ - s.select(&temps); + auto ret = s.select(&temps, SELECT_TIMEOUT); /* * Upon expiration of the warm-restart timer or eoiu Hold Timer, proceed to run the @@ -286,8 +336,7 @@ int main(int argc, char **argv) } else if (!warmStartEnabled || sync.m_warmStartHelper.isReconciled()) { - pipeline.flush(); - SWSS_LOG_DEBUG("Pipeline flushed"); + flushPipeline(pipeline, ret==Select::TIMEOUT); } } } diff --git a/fpmsyncd/routesync.cpp b/fpmsyncd/routesync.cpp index 0f6ee41188..963a01b1fb 100644 --- a/fpmsyncd/routesync.cpp +++ b/fpmsyncd/routesync.cpp @@ -76,7 +76,7 @@ static decltype(auto) makeNlAddr(const T& ip) RouteSync::RouteSync(RedisPipeline *pipeline) : - m_routeTable(pipeline, APP_ROUTE_TABLE_NAME, true), + m_routeTable(pipeline, APP_ROUTE_TABLE_NAME, true, true), m_label_routeTable(pipeline, APP_LABEL_ROUTE_TABLE_NAME, true), m_vnet_routeTable(pipeline, APP_VNET_RT_TABLE_NAME, true), m_vnet_tunnelTable(pipeline, APP_VNET_RT_TUNNEL_TABLE_NAME, true),