diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..e58f005 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,5 @@ +# To shield the difference between Windows and Linux systems. + +*.h text eol=native +*.cpp text eol=native +*.inl text eol=native \ No newline at end of file diff --git a/src/CBasic/CStatus.h b/src/CBasic/CStatus.h index 227289b..3d68f97 100644 --- a/src/CBasic/CStatus.h +++ b/src/CBasic/CStatus.h @@ -46,18 +46,34 @@ class CSTATUS { } CSTATUS(const CSTATUS &status) { + if (status.isOK()) { + return; + } + this->error_code_ = status.error_code_; this->error_info_ = status.error_info_; this->error_locate_ = status.error_locate_; } CSTATUS(const CSTATUS &&status) noexcept { + if (status.isOK()) { + return; + } + this->error_code_ = status.error_code_; this->error_info_ = status.error_info_; this->error_locate_ = status.error_locate_; } - CSTATUS& operator=(const CSTATUS& status) = default; + CSTATUS& operator=(const CSTATUS& status) { + if (!status.isOK()) { + // 如果status是正常的话,则所有数据保持不变 + this->error_code_ = status.error_code_; + this->error_info_ = status.error_info_; + this->error_locate_ = status.error_locate_; + } + return (*this); + } CSTATUS& operator+=(const CSTATUS& cur) { /** diff --git a/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h b/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h index a018a0f..a8a0386 100644 --- a/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h +++ b/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h @@ -43,7 +43,7 @@ class UAtomicQueue : public UQueueObject { */ CBool tryPop(T& value) { CBool result = false; - if (mutex_.try_lock()) { + if (!queue_.empty() && mutex_.try_lock()) { if (!queue_.empty()) { value = std::move(*queue_.front()); queue_.pop(); @@ -64,7 +64,7 @@ class UAtomicQueue : public UQueueObject { */ CBool tryPop(std::vector& values, int maxPoolBatchSize) { CBool result = false; - if (mutex_.try_lock()) { + if (!queue_.empty() && mutex_.try_lock()) { while (!queue_.empty() && maxPoolBatchSize-- > 0) { values.emplace_back(std::move(*queue_.front())); queue_.pop(); diff --git a/src/UtilsCtrl/ThreadPool/Queue/ULockFreeRingBufferQueue.h b/src/UtilsCtrl/ThreadPool/Queue/ULockFreeRingBufferQueue.h new file mode 100644 index 0000000..50dfcc3 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Queue/ULockFreeRingBufferQueue.h @@ -0,0 +1,77 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: ULockFreeRingBufferQueue.h +@Time: 2023/10/7 21:35 +@Desc: +***************************/ + +#ifndef CGRAPH_ULOCKFREERINGBUFFERQUEUE_H +#define CGRAPH_ULOCKFREERINGBUFFERQUEUE_H + +#include +#include + +#include "UQueueObject.h" + +CGRAPH_NAMESPACE_BEGIN + +template +class ULockFreeRingBufferQueue : public UQueueObject { +public: + explicit ULockFreeRingBufferQueue() { + head_ = 0; + tail_ = 0; + ring_buffer_.resize(CAPACITY); + } + + ~ULockFreeRingBufferQueue() override { + ring_buffer_.clear(); + } + + /** + * 写入一个任务 + * @param value + */ + CVoid push(T&& value) { + int curTail = tail_.load(std::memory_order_relaxed); + int nextTail = (curTail + 1) % CAPACITY; + + while (nextTail == head_.load(std::memory_order_acquire)) { + // 队列已满,等待其他线程出队 + std::this_thread::yield(); + } + + ring_buffer_[curTail] = std::move(value); + tail_.store(nextTail, std::memory_order_release); + } + + /** + * 尝试弹出一个任务 + * @param value + * @return + */ + CBool tryPop(T& value) { + int curHead = head_.load(std::memory_order_relaxed); + if (curHead == tail_.load(std::memory_order_acquire)) { + // 队列已空,直接返回false + return false; + } + + value = std::move(ring_buffer_[curHead]); + + int nextHead = (curHead + 1) % CAPACITY; + head_.store(nextHead, std::memory_order_release); + return true; + } + + +private: + std::atomic head_; // 开始元素(较早写入的)的位置 + std::atomic tail_; // 尾部的位置 + std::vector > ring_buffer_; // 环形队列 +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_ULOCKFREERINGBUFFERQUEUE_H diff --git a/src/UtilsCtrl/ThreadPool/Queue/UQueueInclude.h b/src/UtilsCtrl/ThreadPool/Queue/UQueueInclude.h index ade3b09..359308b 100644 --- a/src/UtilsCtrl/ThreadPool/Queue/UQueueInclude.h +++ b/src/UtilsCtrl/ThreadPool/Queue/UQueueInclude.h @@ -13,5 +13,6 @@ #include "UWorkStealingQueue.h" #include "UAtomicPriorityQueue.h" #include "UAtomicRingBufferQueue.h" +#include "ULockFreeRingBufferQueue.h" #endif //CGRAPH_UQUEUEINCLUDE_H diff --git a/src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h b/src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h index 48d4fac..6b05bae 100644 --- a/src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h +++ b/src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h @@ -13,21 +13,20 @@ #include #include "UQueueObject.h" -#include "../Task/UTaskInclude.h" -#include "../Lock/ULockInclude.h" CGRAPH_NAMESPACE_BEGIN +template class UWorkStealingQueue : public UQueueObject { public: /** * 向队列中写入信息 * @param task */ - CVoid push(UTask&& task) { + CVoid push(T&& task) { while (true) { if (lock_.try_lock()) { - deque_.emplace_front(std::move(task)); + deque_.emplace_front(std::forward(task)); lock_.unlock(); break; } else { @@ -42,10 +41,47 @@ class UWorkStealingQueue : public UQueueObject { * @param task * @return */ - CBool tryPush(UTask&& task) { + CBool tryPush(T&& task) { CBool result = false; if (lock_.try_lock()) { - deque_.emplace_back(std::move(task)); + deque_.emplace_back(std::forward(task)); + lock_.unlock(); + result = true; + } + return result; + } + + + /** + * 向队列中写入信息 + * @param task + */ + CVoid push(std::vector& tasks) { + while (true) { + if (lock_.try_lock()) { + for (const auto& task : tasks) { + deque_.emplace_front(std::forward(task)); + } + lock_.unlock(); + break; + } else { + std::this_thread::yield(); + } + } + } + + + /** + * 尝试批量写入内容 + * @param tasks + * @return + */ + CBool tryPush(std::vector& tasks) { + CBool result = false; + if (lock_.try_lock()) { + for (const auto& task : tasks) { + deque_.emplace_back(std::forward(task)); + } lock_.unlock(); result = true; } @@ -58,12 +94,12 @@ class UWorkStealingQueue : public UQueueObject { * @param task * @return */ - CBool tryPop(UTask& task) { + CBool tryPop(T& task) { // 这里不使用raii锁,主要是考虑到多线程的情况下,可能会重复进入 bool result = false; - if (lock_.try_lock()) { + if (!deque_.empty() && lock_.try_lock()) { if (!deque_.empty()) { - task = std::move(deque_.front()); // 从前方弹出 + task = std::forward(deque_.front()); // 从前方弹出 deque_.pop_front(); result = true; } @@ -80,11 +116,11 @@ class UWorkStealingQueue : public UQueueObject { * @param maxLocalBatchSize * @return */ - CBool tryPop(UTaskArrRef taskArr, int maxLocalBatchSize) { + CBool tryPop(std::vector& taskArr, int maxLocalBatchSize) { bool result = false; - if (lock_.try_lock()) { + if (!deque_.empty() && lock_.try_lock()) { while (!deque_.empty() && maxLocalBatchSize--) { - taskArr.emplace_back(std::move(deque_.front())); + taskArr.emplace_back(std::forward(deque_.front())); deque_.pop_front(); result = true; } @@ -100,11 +136,11 @@ class UWorkStealingQueue : public UQueueObject { * @param task * @return */ - CBool trySteal(UTask& task) { + CBool trySteal(T& task) { bool result = false; - if (lock_.try_lock()) { + if (!deque_.empty() && lock_.try_lock()) { if (!deque_.empty()) { - task = std::move(deque_.back()); // 从后方窃取 + task = std::forward(deque_.back()); // 从后方窃取 deque_.pop_back(); result = true; } @@ -120,11 +156,11 @@ class UWorkStealingQueue : public UQueueObject { * @param taskArr * @return */ - CBool trySteal(UTaskArrRef taskArr, int maxStealBatchSize) { + CBool trySteal(std::vector& taskArr, int maxStealBatchSize) { bool result = false; - if (lock_.try_lock()) { + if (!deque_.empty() && lock_.try_lock()) { while (!deque_.empty() && maxStealBatchSize--) { - taskArr.emplace_back(std::move(deque_.back())); + taskArr.emplace_back(std::forward(deque_.back())); deque_.pop_back(); result = true; } @@ -139,7 +175,7 @@ class UWorkStealingQueue : public UQueueObject { CGRAPH_NO_ALLOWED_COPY(UWorkStealingQueue) private: - std::deque deque_; // 存放任务的双向队列 + std::deque deque_; // 存放任务的双向队列 std::mutex lock_; // 用于处理deque_的锁 }; diff --git a/src/UtilsCtrl/ThreadPool/Semaphore/USemaphore.h b/src/UtilsCtrl/ThreadPool/Semaphore/USemaphore.h new file mode 100644 index 0000000..3c53ba0 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Semaphore/USemaphore.h @@ -0,0 +1,51 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: USemaphore.h +@Time: 2023/10/9 22:01 +@Desc: +***************************/ + +#ifndef CGRAPH_USEMAPHORE_H +#define CGRAPH_USEMAPHORE_H + +CGRAPH_NAMESPACE_BEGIN + +#include +#include + +#include "../UThreadObject.h" + +class USemaphore : public UThreadObject { +public: + /** + * 触发一次信号 + */ + CVoid signal() { + CGRAPH_UNIQUE_LOCK lk(mutex_); + cnt_++; + if (cnt_ <= 0) { + cv_.notify_one(); + } + } + + /** + * 等待信号触发 + */ + CVoid wait() { + CGRAPH_UNIQUE_LOCK lk(mutex_); + cnt_--; + if (cnt_ < 0) { + cv_.wait(lk); + } + } + +private: + CInt cnt_ = 0; // 记录当前的次数 + std::mutex mutex_; + std::condition_variable cv_; +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_USEMAPHORE_H diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h index 5bbe498..ddfc398 100644 --- a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h @@ -9,6 +9,9 @@ #ifndef CGRAPH_UTHREADPRIMARY_H #define CGRAPH_UTHREADPRIMARY_H +#include +#include + #include "UThreadBase.h" CGRAPH_NAMESPACE_BEGIN @@ -17,7 +20,6 @@ class UThreadPrimary : public UThreadBase { protected: explicit UThreadPrimary() { index_ = CGRAPH_SECONDARY_THREAD_COMMON_ID; - steal_range_ = 0; pool_threads_ = nullptr; type_ = CGRAPH_THREAD_TYPE_PRIMARY; } @@ -29,7 +31,7 @@ class UThreadPrimary : public UThreadBase { CGRAPH_ASSERT_NOT_NULL(config_) is_init_ = true; - steal_range_ = config_->calcStealRange(); + buildStealTargets(); thread_ = std::move(std::thread(&UThreadPrimary::run, this)); setSchedParam(); setAffinity(index_); @@ -64,7 +66,7 @@ class UThreadPrimary : public UThreadBase { * 线程执行函数 * @return */ - CStatus run() override { + CStatus run() final { CGRAPH_FUNCTION_BEGIN CGRAPH_ASSERT_INIT(true) CGRAPH_ASSERT_NOT_NULL(pool_threads_) @@ -91,7 +93,7 @@ class UThreadPrimary : public UThreadBase { if (popTask(task) || popPoolTask(task) || stealTask(task)) { runTask(task); } else { - std::this_thread::yield(); + fatWait(); } } @@ -102,7 +104,22 @@ class UThreadPrimary : public UThreadBase { // 尝试从主线程中获取/盗取批量task,如果成功,则依次执行 runTasks(tasks); } else { - std::this_thread::yield(); + fatWait(); + } + } + + + /** + * 如果总是进入无task的状态,则开始休眠 + * 休眠一定时间后,然后恢复执行状态,避免出现 + */ + CVoid fatWait() { + cur_empty_epoch_++; + std::this_thread::yield(); + if (cur_empty_epoch_ >= config_->primary_thread_busy_epoch_) { + CGRAPH_UNIQUE_LOCK lk(mutex_); + cv_.wait_for(lk, std::chrono::milliseconds(config_->primary_thread_empty_interval_)); + cur_empty_epoch_ = 0; } } @@ -117,6 +134,7 @@ class UThreadPrimary : public UThreadBase { || secondary_queue_.tryPush(std::move(task)))) { std::this_thread::yield(); } + cv_.notify_one(); } @@ -164,16 +182,15 @@ class UThreadPrimary : public UThreadBase { * 窃取的时候,仅从相邻的primary线程中窃取 * 待窃取相邻的数量,不能超过默认primary线程数 */ - for (int i = 0; i < steal_range_; i++) { + for (auto& target : steal_targets_) { /** * 从线程中周围的thread中,窃取任务。 * 如果成功,则返回true,并且执行任务。 * steal 的时候,先从第二个队列里偷,从而降低触碰锁的概率 */ - int curIndex = (index_ + i + 1) % config_->default_thread_size_; - if (likely((*pool_threads_)[curIndex]) - && (((*pool_threads_)[curIndex])->secondary_queue_.trySteal(task)) - || ((*pool_threads_)[curIndex])->primary_queue_.trySteal(task)) { + if (likely((*pool_threads_)[target]) + && (((*pool_threads_)[target])->secondary_queue_.trySteal(task)) + || ((*pool_threads_)[target])->primary_queue_.trySteal(task)) { return true; } } @@ -192,13 +209,12 @@ class UThreadPrimary : public UThreadBase { return false; } - for (int i = 0; i < steal_range_; i++) { - int curIndex = (index_ + i + 1) % config_->default_thread_size_; - if (likely((*pool_threads_)[curIndex])) { - bool result = ((*pool_threads_)[curIndex])->secondary_queue_.trySteal(tasks, config_->max_steal_batch_size_); + for (auto& target : steal_targets_) { + if (likely((*pool_threads_)[target])) { + bool result = ((*pool_threads_)[target])->secondary_queue_.trySteal(tasks, config_->max_steal_batch_size_); auto leftSize = config_->max_steal_batch_size_ - tasks.size(); if (leftSize > 0) { - result |= ((*pool_threads_)[curIndex])->primary_queue_.trySteal(tasks, leftSize); + result |= ((*pool_threads_)[target])->primary_queue_.trySteal(tasks, leftSize); } if (result) { @@ -216,12 +232,30 @@ class UThreadPrimary : public UThreadBase { return false; } + + /** + * 构造 steal 范围的 target,避免每次盗取的时候,重复计算 + * @return + */ + CVoid buildStealTargets() { + steal_targets_.clear(); + for (int i = 0; i < config_->calcStealRange(); i++) { + auto target = (index_ + i + 1) % config_->default_thread_size_; + steal_targets_.push_back(target); + } + steal_targets_.shrink_to_fit(); + } + private: int index_; // 线程index - int steal_range_; // 偷窃的范围信息 - UWorkStealingQueue primary_queue_; // 内部队列信息 - UWorkStealingQueue secondary_queue_; // 第二个队列,用于减少触锁概率,提升性能 + int cur_empty_epoch_ = 0; // 当前空转的轮数信息 + UWorkStealingQueue primary_queue_; // 内部队列信息 + UWorkStealingQueue secondary_queue_; // 第二个队列,用于减少触锁概率,提升性能 std::vector* pool_threads_; // 用于存放线程池中的线程信息 + std::vector steal_targets_; // 被偷的目标信息 + + std::mutex mutex_; + std::condition_variable cv_; friend class UThreadPool; friend class UAllocator; diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h index 4ab5dbd..be4bd87 100644 --- a/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h @@ -56,7 +56,7 @@ class UThreadSecondary : public UThreadBase { } - CStatus run() override { + CStatus run() final { CGRAPH_FUNCTION_BEGIN CGRAPH_ASSERT_INIT(true) diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.cpp b/src/UtilsCtrl/ThreadPool/UThreadPool.cpp index 2f28ead..4293b1a 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPool.cpp +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.cpp @@ -229,7 +229,7 @@ CVoid UThreadPool::monitor() { CGRAPH_SLEEP_SECOND(1) } - int span = config_.monitor_span_; + auto span = config_.monitor_span_; while (config_.monitor_enable_ && is_init_ && span--) { CGRAPH_SLEEP_SECOND(1) // 保证可以快速退出 } diff --git a/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h b/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h index 71996da..89a080e 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h +++ b/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h @@ -23,8 +23,10 @@ struct UThreadPoolConfig : public CStruct { int max_local_batch_size_ = CGRAPH_MAX_LOCAL_BATCH_SIZE; int max_pool_batch_size_ = CGRAPH_MAX_POOL_BATCH_SIZE; int max_steal_batch_size_ = CGRAPH_MAX_STEAL_BATCH_SIZE; + int primary_thread_busy_epoch_ = CGRAPH_PRIMARY_THREAD_BUSY_EPOCH; + int primary_thread_empty_interval_ = CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL; int secondary_thread_ttl_ = CGRAPH_SECONDARY_THREAD_TTL; - int monitor_span_ = CGRAPH_MONITOR_SPAN; + CSec monitor_span_ = CGRAPH_MONITOR_SPAN; CMSec queue_emtpy_interval_ = CGRAPH_QUEUE_EMPTY_INTERVAL; int primary_thread_policy_ = CGRAPH_PRIMARY_THREAD_POLICY; int secondary_thread_policy_ = CGRAPH_SECONDARY_THREAD_POLICY; diff --git a/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h b/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h index 7867e0c..358f312 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h +++ b/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h @@ -49,17 +49,19 @@ static const int CGRAPH_LONG_TIME_TASK_STRATEGY = -101; /** * 以下为线程池配置信息 */ -static const int CGRAPH_DEFAULT_THREAD_SIZE = 0; // 默认开启主线程个数 -static const int CGRAPH_SECONDARY_THREAD_SIZE = 8; // 默认开启辅助线程个数 +static const int CGRAPH_DEFAULT_THREAD_SIZE = 8; // 默认开启主线程个数 +static const int CGRAPH_SECONDARY_THREAD_SIZE = 0; // 默认开启辅助线程个数 static const int CGRAPH_MAX_THREAD_SIZE = 16; // 最大线程个数 static const int CGRAPH_MAX_TASK_STEAL_RANGE = 2; // 盗取机制相邻范围 static const bool CGRAPH_BATCH_TASK_ENABLE = false; // 是否开启批量任务功能 static const int CGRAPH_MAX_LOCAL_BATCH_SIZE = 2; // 批量执行本地任务最大值 static const int CGRAPH_MAX_POOL_BATCH_SIZE = 2; // 批量执行通用任务最大值 static const int CGRAPH_MAX_STEAL_BATCH_SIZE = 2; // 批量盗取任务最大值 +static const int CGRAPH_PRIMARY_THREAD_BUSY_EPOCH = 10; // 主线程进入wait状态的轮数,数值越大,理论性能越高,但空转可能性也越大 +static const CMSec CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL = 3; // 主线程进入休眠状态的默认时间 static const int CGRAPH_SECONDARY_THREAD_TTL = 10; // 辅助线程ttl,单位为s static const bool CGRAPH_MONITOR_ENABLE = false; // 是否开启监控程序 -static const int CGRAPH_MONITOR_SPAN = 5; // 监控线程执行间隔,单位为s +static const CSec CGRAPH_MONITOR_SPAN = 5; // 监控线程执行间隔,单位为s static const CMSec CGRAPH_QUEUE_EMPTY_INTERVAL = 3; // 队列为空时,等待的时间。仅针对辅助线程,单位为ms static const bool CGRAPH_BIND_CPU_ENABLE = false; // 是否开启绑定cpu模式(仅针对主线程) static const int CGRAPH_PRIMARY_THREAD_POLICY = CGRAPH_THREAD_SCHED_OTHER; // 主线程调度策略 diff --git a/src/UtilsCtrl/ThreadPool/UThreadPoolInclude.h b/src/UtilsCtrl/ThreadPool/UThreadPoolInclude.h index e279dad..7c87584 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPoolInclude.h +++ b/src/UtilsCtrl/ThreadPool/UThreadPoolInclude.h @@ -17,5 +17,6 @@ #include "Task/UTaskInclude.h" #include "Thread/UThreadInclude.h" #include "Lock/ULockInclude.h" +#include "Semaphore/USemaphore.h" #endif //CGRAPH_UTHREADPOOLINCLUDE_H