Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[PMEM-SHUFFLE-58] Add blocking queue to reduce the effort to persist …
Browse files Browse the repository at this point in the history
…metadata in ReplicaService
  • Loading branch information
Eugene-Mark committed Sep 9, 2021
1 parent ff9078e commit c4e2f29
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 3 deletions.
27 changes: 27 additions & 0 deletions rpmp/pmpool/proxy/replicaService/Queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once  // BUG FIX: header previously had no include guard; including it
              // twice in one translation unit redefined the class template.

#include <mutex>
#include <condition_variable>
#include <deque>

/**
 * Minimal thread-safe blocking FIFO queue.
 *
 * Elements are pushed at the front of the underlying deque and popped from
 * the back, so pop() returns elements in the order they were pushed.
 * pop() blocks the calling thread until an element is available.
 *
 * NOTE(review): the class is named `queue`, which shadows std::queue if a
 * `using namespace std;` is in effect at the include site — confirm callers
 * always use the unqualified project type intentionally.
 */
template <typename T>
class queue
{
private:
    std::mutex d_mutex;                  // guards d_queue
    std::condition_variable d_condition; // signaled on every push
    std::deque<T> d_queue;               // storage: front = newest, back = oldest
public:
    /// Enqueue a copy of @p value and wake one waiting consumer.
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        // Notify outside the lock so the woken consumer can acquire the
        // mutex immediately instead of blocking on it.
        this->d_condition.notify_one();
    }
    /// Block until an element is available, then remove and return the
    /// oldest element (FIFO order).
    T pop() {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        // [this] instead of [=]: we only need the object pointer, and
        // implicit this-capture via [=] is deprecated in C++20.
        this->d_condition.wait(lock, [this]{ return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};
34 changes: 31 additions & 3 deletions rpmp/pmpool/proxy/replicaService/ReplicaService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "ReplicaService.h"
#include "pmpool/proxy/metastore/JsonUtil.h"
#include <thread>

using namespace std;

Expand Down Expand Up @@ -54,7 +55,9 @@ int ReplicaWorker::entry() {
}

// Construct the replica service. Only stores the injected collaborators
// (config, logger, owning proxy server, metastore facade); no threads or
// network resources are started here — see startService() for that.
ReplicaService::ReplicaService(std::shared_ptr<Config> config, std::shared_ptr<RLog> log, std::shared_ptr<Proxy> proxyServer, std::shared_ptr<MetastoreFacade> metastore) :
config_(config), log_(log), proxyServer_(proxyServer), metastore_(metastore) {

}

ReplicaService::~ReplicaService() {
worker_->stop();
Expand All @@ -65,6 +68,20 @@ void ReplicaService::enqueue_recv_msg(std::shared_ptr<ReplicaRequest> request) {
worker_->addTask(request);
}

void ReplicaService::asyncUpdate(){
while(true){
ReplicaRecord record = blocking_queue_.pop();
updateRecord(record.key, record.node, record.size);
}
}

void ReplicaService::replicaAsyncUpdate(){
while(true){
ReplicaRecord record = replica_blocking_queue_.pop();
updateRecord(record.key, record.node, record.size);
}
}

/**
* Update data status once it's been put to the node successfully
**/
Expand Down Expand Up @@ -171,7 +188,9 @@ void ReplicaService::handle_recv_msg(std::shared_ptr<ReplicaRequest> request) {
removeReplica(rc.key);
}
addReplica(rc.key, rc.node);
updateRecord(rc.key, rc.node, rc.size);
ReplicaRecord record = {rc.key, rc.node, rc.size};
blocking_queue_.push(record);

unordered_set<PhysicalNode, PhysicalNodeHash> nodes = proxyServer_->getNodes(rc.key);
if (nodes.count(rc.node)) {
nodes.erase(rc.node);
Expand Down Expand Up @@ -199,7 +218,8 @@ void ReplicaService::handle_recv_msg(std::shared_ptr<ReplicaRequest> request) {
uint32_t replicaNum = dataReplica_ < proxyServer_->getNodeNum() ? dataReplica_ : proxyServer_->getNodeNum();
uint32_t minReplica = replicaNum < minReplica_ ? replicaNum : minReplica_;
addReplica(rc.key, rc.node);
updateRecord(rc.key, rc.node, rc.size);
ReplicaRecord record = {rc.key, rc.node, rc.size};
replica_blocking_queue_.push(record);
if (getReplica(rc.key).size() == minReplica) {
proxyServer_->notifyClient(rc.key);
}
Expand All @@ -208,6 +228,13 @@ void ReplicaService::handle_recv_msg(std::shared_ptr<ReplicaRequest> request) {
}
}

/**
 * Launch the two background consumer threads that drain blocking_queue_ and
 * replica_blocking_queue_ and persist replica metadata asynchronously.
 *
 * Each thread is bound to shared_from_this(), so the thread's copy of the
 * shared_ptr keeps this ReplicaService alive even after detach().
 * NOTE(review): detached threads loop forever and cannot be joined on
 * shutdown — consider a stop flag + join if clean teardown is ever needed.
 *
 * @return true once both updater threads have been launched.
 */
bool ReplicaService::startInternalService(){
    std::thread t_metastoreUpdater(&ReplicaService::asyncUpdate, shared_from_this());
    t_metastoreUpdater.detach();
    std::thread t_replicaMetastoreUpdater(&ReplicaService::replicaAsyncUpdate, shared_from_this());
    t_replicaMetastoreUpdater.detach();
    // BUG FIX: function is declared bool but previously had no return
    // statement — flowing off the end of a value-returning function is
    // undefined behavior.
    return true;
}

bool ReplicaService::startService() {
int worker_number = config_->get_network_worker_num();
int buffer_number = config_->get_network_buffer_num();
Expand Down Expand Up @@ -237,6 +264,7 @@ bool ReplicaService::startService() {

server_->start();
server_->listen(config_->get_current_proxy_addr().c_str(), config_->get_replica_service_port().c_str());
startInternalService();
log_->get_console_log()->info("ReplicaService started at {0}:{1}", config_->get_current_proxy_addr(), config_->get_replica_service_port());
return true;
}
Expand Down
13 changes: 13 additions & 0 deletions rpmp/pmpool/proxy/replicaService/ReplicaService.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "pmpool/proxy/metastore/MetastoreFacade.h"

#include "json/json.h"
#include <thread>
#include "pmpool/proxy/replicaService/Queue.h"

const string JOB_STATUS = "JOB_STATUS";
const string NODES = "NODES";
Expand All @@ -35,6 +37,12 @@ using moodycamel::BlockingConcurrentQueue;

class ReplicaService;

// Value object queued on the blocking queues for asynchronous persistence.
// Field order is load-bearing: call sites aggregate-initialize with
// {key, node, size} — do not reorder members.
struct ReplicaRecord{
    uint64_t key;       // identifier of the data block being replicated
    PhysicalNode node;  // node that now holds this replica
    uint64_t size;      // size of the replicated data — presumably bytes; confirm against updateRecord()
};

class ReplicaRecvCallback : public Callback {
public:
ReplicaRecvCallback() = delete;
Expand Down Expand Up @@ -103,12 +111,15 @@ class ReplicaService : public std::enable_shared_from_this<ReplicaService> {
void wait();
Connection* getConnection(string node);
ChunkMgr* getChunkMgr();
bool startInternalService();

private:
void addReplica(uint64_t key, PhysicalNode node);
std::unordered_set<PhysicalNode, PhysicalNodeHash> getReplica(uint64_t key);
void removeReplica(uint64_t key);
void updateRecord(uint64_t key, PhysicalNode node, uint64_t size);
void asyncUpdate();
void replicaAsyncUpdate();
std::shared_ptr<ReplicaWorker> worker_;
std::shared_ptr<ChunkMgr> chunkMgr_;
std::shared_ptr<Config> config_;
Expand All @@ -131,6 +142,8 @@ class ReplicaService : public std::enable_shared_from_this<ReplicaService> {
std::mutex prrcMtx;
std::shared_ptr<Proxy> proxyServer_;
map<string, Connection*> node2Connection;
queue<ReplicaRecord> blocking_queue_;
queue<ReplicaRecord> replica_blocking_queue_;
};

#endif // RPMP_REPLICASERVICE_H

0 comments on commit c4e2f29

Please sign in to comment.