Skip to content

Commit

Permalink
feat: 优化主线程执行策略,提升整体性能
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Nov 1, 2023
1 parent 4c9a7f1 commit d6306ef
Show file tree
Hide file tree
Showing 13 changed files with 271 additions and 46 deletions.
5 changes: 5 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# To shield the difference between Windows and Linux systems.

*.h text eol=native
*.cpp text eol=native
*.inl text eol=native
18 changes: 17 additions & 1 deletion src/CBasic/CStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,34 @@ class CSTATUS {
}

CSTATUS(const CSTATUS &status) {
if (status.isOK()) {
return;
}

this->error_code_ = status.error_code_;
this->error_info_ = status.error_info_;
this->error_locate_ = status.error_locate_;
}

CSTATUS(const CSTATUS &&status) noexcept {
if (status.isOK()) {
return;
}

this->error_code_ = status.error_code_;
this->error_info_ = status.error_info_;
this->error_locate_ = status.error_locate_;
}

CSTATUS& operator=(const CSTATUS& status) = default;
CSTATUS& operator=(const CSTATUS& status) {
if (!status.isOK()) {
// 如果status是正常的话,则所有数据保持不变
this->error_code_ = status.error_code_;
this->error_info_ = status.error_info_;
this->error_locate_ = status.error_locate_;
}
return (*this);
}

CSTATUS& operator+=(const CSTATUS& cur) {
/**
Expand Down
4 changes: 2 additions & 2 deletions src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class UAtomicQueue : public UQueueObject {
*/
CBool tryPop(T& value) {
CBool result = false;
if (mutex_.try_lock()) {
if (!queue_.empty() && mutex_.try_lock()) {
if (!queue_.empty()) {
value = std::move(*queue_.front());
queue_.pop();
Expand All @@ -64,7 +64,7 @@ class UAtomicQueue : public UQueueObject {
*/
CBool tryPop(std::vector<T>& values, int maxPoolBatchSize) {
CBool result = false;
if (mutex_.try_lock()) {
if (!queue_.empty() && mutex_.try_lock()) {
while (!queue_.empty() && maxPoolBatchSize-- > 0) {
values.emplace_back(std::move(*queue_.front()));
queue_.pop();
Expand Down
77 changes: 77 additions & 0 deletions src/UtilsCtrl/ThreadPool/Queue/ULockFreeRingBufferQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/***************************
@Author: Chunel
@Contact: [email protected]
@File: ULockFreeRingBufferQueue.h
@Time: 2023/10/7 21:35
@Desc:
***************************/

#ifndef CGRAPH_ULOCKFREERINGBUFFERQUEUE_H
#define CGRAPH_ULOCKFREERINGBUFFERQUEUE_H

#include <atomic>
#include <memory>

#include "UQueueObject.h"

CGRAPH_NAMESPACE_BEGIN

template<typename T, CInt CAPACITY = 32>
class ULockFreeRingBufferQueue : public UQueueObject {
public:
explicit ULockFreeRingBufferQueue() {
head_ = 0;
tail_ = 0;
ring_buffer_.resize(CAPACITY);
}

~ULockFreeRingBufferQueue() override {
ring_buffer_.clear();
}

/**
* 写入一个任务
* @param value
*/
CVoid push(T&& value) {
int curTail = tail_.load(std::memory_order_relaxed);
int nextTail = (curTail + 1) % CAPACITY;

while (nextTail == head_.load(std::memory_order_acquire)) {
// 队列已满,等待其他线程出队
std::this_thread::yield();
}

ring_buffer_[curTail] = std::move(value);
tail_.store(nextTail, std::memory_order_release);
}

/**
* 尝试弹出一个任务
* @param value
* @return
*/
CBool tryPop(T& value) {
int curHead = head_.load(std::memory_order_relaxed);
if (curHead == tail_.load(std::memory_order_acquire)) {
// 队列已空,直接返回false
return false;
}

value = std::move(ring_buffer_[curHead]);

int nextHead = (curHead + 1) % CAPACITY;
head_.store(nextHead, std::memory_order_release);
return true;
}


private:
std::atomic<CInt> head_; // 开始元素(较早写入的)的位置
std::atomic<CInt> tail_; // 尾部的位置
std::vector<std::unique_ptr<T> > ring_buffer_; // 环形队列
};

CGRAPH_NAMESPACE_END

#endif //CGRAPH_ULOCKFREERINGBUFFERQUEUE_H
1 change: 1 addition & 0 deletions src/UtilsCtrl/ThreadPool/Queue/UQueueInclude.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
#include "UWorkStealingQueue.h"
#include "UAtomicPriorityQueue.h"
#include "UAtomicRingBufferQueue.h"
#include "ULockFreeRingBufferQueue.h"

#endif //CGRAPH_UQUEUEINCLUDE_H
74 changes: 55 additions & 19 deletions src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,20 @@
#include <deque>

#include "UQueueObject.h"
#include "../Task/UTaskInclude.h"
#include "../Lock/ULockInclude.h"

CGRAPH_NAMESPACE_BEGIN

template<typename T>
class UWorkStealingQueue : public UQueueObject {
public:
/**
* 向队列中写入信息
* @param task
*/
CVoid push(UTask&& task) {
CVoid push(T&& task) {
while (true) {
if (lock_.try_lock()) {
deque_.emplace_front(std::move(task));
deque_.emplace_front(std::forward<T>(task));
lock_.unlock();
break;
} else {
Expand All @@ -42,10 +41,47 @@ class UWorkStealingQueue : public UQueueObject {
* @param task
* @return
*/
CBool tryPush(UTask&& task) {
CBool tryPush(T&& task) {
CBool result = false;
if (lock_.try_lock()) {
deque_.emplace_back(std::move(task));
deque_.emplace_back(std::forward<T>(task));
lock_.unlock();
result = true;
}
return result;
}


/**
* 向队列中写入信息
* @param task
*/
CVoid push(std::vector<T>& tasks) {
while (true) {
if (lock_.try_lock()) {
for (const auto& task : tasks) {
deque_.emplace_front(std::forward<T>(task));
}
lock_.unlock();
break;
} else {
std::this_thread::yield();
}
}
}


/**
* 尝试批量写入内容
* @param tasks
* @return
*/
CBool tryPush(std::vector<T>& tasks) {
CBool result = false;
if (lock_.try_lock()) {
for (const auto& task : tasks) {
deque_.emplace_back(std::forward<T>(task));
}
lock_.unlock();
result = true;
}
Expand All @@ -58,12 +94,12 @@ class UWorkStealingQueue : public UQueueObject {
* @param task
* @return
*/
CBool tryPop(UTask& task) {
CBool tryPop(T& task) {
// 这里不使用raii锁,主要是考虑到多线程的情况下,可能会重复进入
bool result = false;
if (lock_.try_lock()) {
if (!deque_.empty() && lock_.try_lock()) {
if (!deque_.empty()) {
task = std::move(deque_.front()); // 从前方弹出
task = std::forward<T>(deque_.front()); // 从前方弹出
deque_.pop_front();
result = true;
}
Expand All @@ -80,11 +116,11 @@ class UWorkStealingQueue : public UQueueObject {
* @param maxLocalBatchSize
* @return
*/
CBool tryPop(UTaskArrRef taskArr, int maxLocalBatchSize) {
CBool tryPop(std::vector<T>& taskArr, int maxLocalBatchSize) {
bool result = false;
if (lock_.try_lock()) {
if (!deque_.empty() && lock_.try_lock()) {
while (!deque_.empty() && maxLocalBatchSize--) {
taskArr.emplace_back(std::move(deque_.front()));
taskArr.emplace_back(std::forward<T>(deque_.front()));
deque_.pop_front();
result = true;
}
Expand All @@ -100,11 +136,11 @@ class UWorkStealingQueue : public UQueueObject {
* @param task
* @return
*/
CBool trySteal(UTask& task) {
CBool trySteal(T& task) {
bool result = false;
if (lock_.try_lock()) {
if (!deque_.empty() && lock_.try_lock()) {
if (!deque_.empty()) {
task = std::move(deque_.back()); // 从后方窃取
task = std::forward<T>(deque_.back()); // 从后方窃取
deque_.pop_back();
result = true;
}
Expand All @@ -120,11 +156,11 @@ class UWorkStealingQueue : public UQueueObject {
* @param taskArr
* @return
*/
CBool trySteal(UTaskArrRef taskArr, int maxStealBatchSize) {
CBool trySteal(std::vector<T>& taskArr, int maxStealBatchSize) {
bool result = false;
if (lock_.try_lock()) {
if (!deque_.empty() && lock_.try_lock()) {
while (!deque_.empty() && maxStealBatchSize--) {
taskArr.emplace_back(std::move(deque_.back()));
taskArr.emplace_back(std::forward<T>(deque_.back()));
deque_.pop_back();
result = true;
}
Expand All @@ -139,7 +175,7 @@ class UWorkStealingQueue : public UQueueObject {
CGRAPH_NO_ALLOWED_COPY(UWorkStealingQueue)

private:
std::deque<UTask> deque_; // 存放任务的双向队列
std::deque<T> deque_; // 存放任务的双向队列
std::mutex lock_; // 用于处理deque_的锁
};

Expand Down
51 changes: 51 additions & 0 deletions src/UtilsCtrl/ThreadPool/Semaphore/USemaphore.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/***************************
@Author: Chunel
@Contact: [email protected]
@File: USemaphore.h
@Time: 2023/10/9 22:01
@Desc:
***************************/

#ifndef CGRAPH_USEMAPHORE_H
#define CGRAPH_USEMAPHORE_H

CGRAPH_NAMESPACE_BEGIN

#include <mutex>
#include <condition_variable>

#include "../UThreadObject.h"

class USemaphore : public UThreadObject {
public:
/**
* 触发一次信号
*/
CVoid signal() {
CGRAPH_UNIQUE_LOCK lk(mutex_);
cnt_++;
if (cnt_ <= 0) {
cv_.notify_one();
}
}

/**
* 等待信号触发
*/
CVoid wait() {
CGRAPH_UNIQUE_LOCK lk(mutex_);
cnt_--;
if (cnt_ < 0) {
cv_.wait(lk);
}
}

private:
CInt cnt_ = 0; // 记录当前的次数
std::mutex mutex_;
std::condition_variable cv_;
};

CGRAPH_NAMESPACE_END

#endif //CGRAPH_USEMAPHORE_H
Loading

0 comments on commit d6306ef

Please sign in to comment.