Skip to content

Commit

Permalink
redispipeline publish at flush
Browse files Browse the repository at this point in the history
  • Loading branch information
a114j0y committed Oct 31, 2024
1 parent 0b5d994 commit d50bde6
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 35 deletions.
84 changes: 49 additions & 35 deletions common/producerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,63 @@ using namespace std;
namespace swss {

ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName)
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false)
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false, false)
{
m_pipeowned = true;
}

ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
: ProducerStateTable(pipeline, tableName, buffered, false) {}

ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered, bool flushPub)
: TableBase(tableName, SonicDBConfig::getSeparator(pipeline->getDBConnector()))
, TableName_KeySet(tableName)
, m_flushPub(flushPub)
, m_buffered(buffered)
, m_pipeowned(false)
, m_tempViewActive(false)
, m_pipe(pipeline)
{
resetRedisScript();

string luaClear =
"redis.call('DEL', KEYS[1])\n"
"local keys = redis.call('KEYS', KEYS[2] .. '*')\n"
"for i,k in pairs(keys) do\n"
" redis.call('DEL', k)\n"
"end\n"
"redis.call('DEL', KEYS[3])\n";
m_shaClear = m_pipe->loadRedisScript(luaClear);

string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua");
m_shaApplyView = m_pipe->loadRedisScript(luaApplyView);
}

ProducerStateTable::~ProducerStateTable()
{
if (m_pipeowned)
{
delete m_pipe;
}
}

void ProducerStateTable::resetRedisScript()
{
if (m_flushPub && m_buffered)
m_pipe->addChannel(getChannelName(m_pipe->getDbId()));

// num in luaSet and luaDel means number of elements that were added to the key set,
// not including all the elements already present into the set.
string luaSet =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"for i = 0, #KEYS - 3 do\n"
" redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
"end\n"
" if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaSet = m_pipe->loadRedisScript(luaSet);

string luaDel =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"redis.call('SADD', KEYS[4], ARGV[2])\n"
"redis.call('DEL', KEYS[3])\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaDel = m_pipe->loadRedisScript(luaDel);
"redis.call('DEL', KEYS[3])\n";

string luaBatchedSet =
"local added = 0\n"
Expand All @@ -59,48 +83,38 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
" redis.call('HSET', KEYS[3] .. KEYS[4 + i], attr, val)\n"
" end\n"
" idx = idx + tonumber(ARGV[idx]) * 2 + 1\n"
"end\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);

string luaBatchedDel =
"local added = 0\n"
"for i = 0, #KEYS - 5 do\n"
" added = added + redis.call('SADD', KEYS[2], KEYS[5 + i])\n"
" redis.call('SADD', KEYS[3], KEYS[5 + i])\n"
" redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n"
"end\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);

string luaClear =
"redis.call('DEL', KEYS[1])\n"
"local keys = redis.call('KEYS', KEYS[2] .. '*')\n"
"for i,k in pairs(keys) do\n"
" redis.call('DEL', k)\n"
"end\n"
"redis.call('DEL', KEYS[3])\n";
m_shaClear = m_pipe->loadRedisScript(luaClear);

string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua");
m_shaApplyView = m_pipe->loadRedisScript(luaApplyView);
}

ProducerStateTable::~ProducerStateTable()
{
if (m_pipeowned)
if (!m_flushPub || !m_buffered)
{
delete m_pipe;
string luaPub =
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
luaSet += luaPub;
luaDel += luaPub;
luaBatchedSet += luaPub;
luaBatchedDel += luaPub;
}

m_shaSet = m_pipe->loadRedisScript(luaSet);
m_shaDel = m_pipe->loadRedisScript(luaDel);
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);
}

void ProducerStateTable::setBuffered(bool buffered)
{
m_buffered = buffered;
resetRedisScript();
}

void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &values,
Expand Down
4 changes: 4 additions & 0 deletions common/producerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class ProducerStateTable : public TableBase, public TableName_KeySet
public:
ProducerStateTable(DBConnector *db, const std::string &tableName);
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered, bool flushPub);
virtual ~ProducerStateTable();

void setBuffered(bool buffered);
Expand Down Expand Up @@ -51,6 +52,7 @@ class ProducerStateTable : public TableBase, public TableName_KeySet

void apply_temp_view();
private:
bool m_flushPub;
bool m_buffered;
bool m_pipeowned;
bool m_tempViewActive;
Expand All @@ -62,6 +64,8 @@ class ProducerStateTable : public TableBase, public TableName_KeySet
std::string m_shaClear;
std::string m_shaApplyView;
TableDump m_tempViewState;

void resetRedisScript();
};

}
44 changes: 44 additions & 0 deletions common/redispipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

#include <string>
#include <queue>
#include <unordered_set>
#include <functional>
#include <chrono>
#include <iostream>
#include "redisreply.h"
#include "rediscommand.h"
#include "dbconnector.h"
Expand All @@ -22,9 +25,11 @@ class RedisPipeline {
RedisPipeline(const DBConnector *db, size_t sz = 128)
: COMMAND_MAX(sz)
, m_remaining(0)
, m_shaPub("")
{
m_db = db->newConnector(NEWCONNECTOR_TIMEOUT);
initializeOwnerTid();
lastHeartBeat = std::chrono::steady_clock::now();
}

~RedisPipeline() {
Expand Down Expand Up @@ -113,11 +118,19 @@ class RedisPipeline {

void flush()
{
lastHeartBeat = std::chrono::steady_clock::now();

if (m_remaining == 0) {
return;
}

while(m_remaining)
{
// Construct an object to use its dtor, so that resource is released
RedisReply r(pop());
}

publish();
}

size_t size()
Expand Down Expand Up @@ -145,12 +158,43 @@ class RedisPipeline {
m_ownerTid = gettid();
}

void addChannel(std::string channel)
{
if (m_channels.find(channel) != m_channels.end())
return;

m_channels.insert(channel);
m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G');";
m_shaPub = loadRedisScript(m_luaPub);
}

int getIdleTime(std::chrono::time_point<std::chrono::steady_clock> tcurrent=std::chrono::steady_clock::now())
{
return static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(tcurrent - lastHeartBeat).count());
}

void publish() {
if (m_shaPub.empty()) {
return;
}
RedisCommand cmd;
cmd.format(
"EVALSHA %s 0",
m_shaPub.c_str());
RedisReply r(m_db, cmd);
}

private:
DBConnector *m_db;
std::queue<int> m_expectedTypes;
size_t m_remaining;
long int m_ownerTid;

std::string m_luaPub;
std::string m_shaPub;
std::chrono::time_point<std::chrono::steady_clock> lastHeartBeat; // marks the timestamp of latest pipeline flush being invoked
std::unordered_set<std::string> m_channels;

void mayflush()
{
if (m_remaining >= COMMAND_MAX)
Expand Down

0 comments on commit d50bde6

Please sign in to comment.