From c4e2f29d98fa8385841b4b28112b92827acdc5f2 Mon Sep 17 00:00:00 2001 From: Eugene-Mark Date: Thu, 9 Sep 2021 15:02:51 +0800 Subject: [PATCH] [PMEM-SHUFFLE-58] Add blocking queue to reduce the effort to persist metadata in ReplicaService --- rpmp/pmpool/proxy/replicaService/Queue.h | 27 +++++++++++++++ .../proxy/replicaService/ReplicaService.cc | 34 +++++++++++++++++-- .../proxy/replicaService/ReplicaService.h | 13 +++++++ 3 files changed, 71 insertions(+), 3 deletions(-) create mode 100644 rpmp/pmpool/proxy/replicaService/Queue.h diff --git a/rpmp/pmpool/proxy/replicaService/Queue.h b/rpmp/pmpool/proxy/replicaService/Queue.h new file mode 100644 index 00000000..9f0be82b --- /dev/null +++ b/rpmp/pmpool/proxy/replicaService/Queue.h @@ -0,0 +1,27 @@ +#include +#include +#include + +template +class queue +{ +private: + std::mutex d_mutex; + std::condition_variable d_condition; + std::deque d_queue; +public: + void push(T const& value) { + { + std::unique_lock lock(this->d_mutex); + d_queue.push_front(value); + } + this->d_condition.notify_one(); + } + T pop() { + std::unique_lock lock(this->d_mutex); + this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); }); + T rc(std::move(this->d_queue.back())); + this->d_queue.pop_back(); + return rc; + } +}; \ No newline at end of file diff --git a/rpmp/pmpool/proxy/replicaService/ReplicaService.cc b/rpmp/pmpool/proxy/replicaService/ReplicaService.cc index 1b220181..74632b58 100644 --- a/rpmp/pmpool/proxy/replicaService/ReplicaService.cc +++ b/rpmp/pmpool/proxy/replicaService/ReplicaService.cc @@ -2,6 +2,7 @@ #include "ReplicaService.h" #include "pmpool/proxy/metastore/JsonUtil.h" +#include using namespace std; @@ -54,7 +55,9 @@ int ReplicaWorker::entry() { } ReplicaService::ReplicaService(std::shared_ptr config, std::shared_ptr log, std::shared_ptr proxyServer, std::shared_ptr metastore) : - config_(config), log_(log), proxyServer_(proxyServer), metastore_(metastore) {} + config_(config), log_(log), proxyServer_(proxyServer), metastore_(metastore) { + + } ReplicaService::~ReplicaService() { worker_->stop(); @@ -65,6 +68,20 @@ void ReplicaService::enqueue_recv_msg(std::shared_ptr request) { worker_->addTask(request); } +void ReplicaService::asyncUpdate(){ + while(true){ + ReplicaRecord record = blocking_queue_.pop(); + updateRecord(record.key, record.node, record.size); + } +} + +void ReplicaService::replicaAsyncUpdate(){ + while(true){ + ReplicaRecord record = replica_blocking_queue_.pop(); + updateRecord(record.key, record.node, record.size); + } +} + /** * Update data status once it's been put to the node successfully **/ @@ -171,7 +188,9 @@ void ReplicaService::handle_recv_msg(std::shared_ptr request) { removeReplica(rc.key); } addReplica(rc.key, rc.node); - updateRecord(rc.key, rc.node, rc.size); + ReplicaRecord record = {rc.key, rc.node, rc.size}; + blocking_queue_.push(record); + unordered_set nodes = proxyServer_->getNodes(rc.key); if (nodes.count(rc.node)) { nodes.erase(rc.node); @@ -199,7 +218,8 @@ void ReplicaService::handle_recv_msg(std::shared_ptr request) { uint32_t replicaNum = dataReplica_ < proxyServer_->getNodeNum() ? dataReplica_ : proxyServer_->getNodeNum(); uint32_t minReplica = replicaNum < minReplica_ ? replicaNum : minReplica_; addReplica(rc.key, rc.node); - updateRecord(rc.key, rc.node, rc.size); + ReplicaRecord record = {rc.key, rc.node, rc.size}; + replica_blocking_queue_.push(record); if (getReplica(rc.key).size() == minReplica) { proxyServer_->notifyClient(rc.key); } @@ -208,6 +228,13 @@ void ReplicaService::handle_recv_msg(std::shared_ptr request) { } } +bool ReplicaService::startInternalService(){ + std::thread t_metastoreUpdater(&ReplicaService::asyncUpdate, shared_from_this()); + t_metastoreUpdater.detach(); + std::thread t_replicaMetastoreUpdater(&ReplicaService::replicaAsyncUpdate, shared_from_this()); + t_replicaMetastoreUpdater.detach(); +} + bool ReplicaService::startService() { int worker_number = config_->get_network_worker_num(); int buffer_number = config_->get_network_buffer_num(); @@ -237,6 +264,7 @@ bool ReplicaService::startService() { server_->start(); server_->listen(config_->get_current_proxy_addr().c_str(), config_->get_replica_service_port().c_str()); + startInternalService(); log_->get_console_log()->info("ReplicaService started at {0}:{1}", config_->get_current_proxy_addr(), config_->get_replica_service_port()); return true; } diff --git a/rpmp/pmpool/proxy/replicaService/ReplicaService.h b/rpmp/pmpool/proxy/replicaService/ReplicaService.h index 63645719..5f8f755c 100644 --- a/rpmp/pmpool/proxy/replicaService/ReplicaService.h +++ b/rpmp/pmpool/proxy/replicaService/ReplicaService.h @@ -21,6 +21,8 @@ #include "pmpool/proxy/metastore/MetastoreFacade.h" #include "json/json.h" +#include +#include "pmpool/proxy/replicaService/Queue.h" const string JOB_STATUS = "JOB_STATUS"; const string NODES = "NODES"; @@ -35,6 +37,12 @@ using moodycamel::BlockingConcurrentQueue; class ReplicaService; +struct ReplicaRecord{ + uint64_t key; + PhysicalNode node; + uint64_t size; +}; + class ReplicaRecvCallback : public Callback { public: ReplicaRecvCallback() = delete; @@ -103,12 +111,15 @@ class ReplicaService : public std::enable_shared_from_this { void wait(); Connection* getConnection(string node); ChunkMgr* getChunkMgr(); + bool startInternalService(); private: void addReplica(uint64_t key, PhysicalNode node); std::unordered_set getReplica(uint64_t key); void removeReplica(uint64_t key); void updateRecord(uint64_t key, PhysicalNode node, uint64_t size); + void asyncUpdate(); + void replicaAsyncUpdate(); std::shared_ptr worker_; std::shared_ptr chunkMgr_; std::shared_ptr config_; @@ -131,6 +142,8 @@ class ReplicaService : public std::enable_shared_from_this { std::mutex prrcMtx; std::shared_ptr proxyServer_; map node2Connection; + queue blocking_queue_; + queue replica_blocking_queue_; }; #endif // RPMP_REPLICASERVICE_H