diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h index 9c652e14..6b14b52f 100644 --- a/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h @@ -9,6 +9,7 @@ #ifndef CGRAPH_UTHREADBASE_H #define CGRAPH_UTHREADBASE_H +#include #include #include "../UThreadObject.h" @@ -80,10 +81,8 @@ class UThreadBase : public UThreadObject { * @param task */ CVoid runTask(const UTask& task) { - is_running_ = true; task(); total_task_num_++; - is_running_ = false; } @@ -92,12 +91,10 @@ class UThreadBase : public UThreadObject { * @param tasks */ CVoid runTasks(const UTaskArr& tasks) { - is_running_ = true; for (auto& task : tasks) { task(); } total_task_num_ += tasks.size(); - is_running_ = false; } @@ -127,7 +124,7 @@ class UThreadBase : public UThreadObject { */ CBool wakeup() { CBool result = false; - if (!is_running_) { + if (!is_running_.load(std::memory_order_relaxed)) { cv_.notify_one(); result = true; } @@ -154,6 +151,7 @@ class UThreadBase : public UThreadObject { */ CVoid loopProcess() { CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(config_) + is_running_ = true; if (config_->batch_task_enable_) { while (done_) { processTasks(); // 批量任务获取执行接口 @@ -243,7 +241,7 @@ class UThreadBase : public UThreadObject { protected: CBool done_; // 线程状态标记 CBool is_init_; // 标记初始化状态 - CBool is_running_; // 是否正在执行 + std::atomic is_running_; // 是否正在执行 CInt type_ = 0; // 用于区分线程类型(主线程、辅助线程) CULong total_task_num_ = 0; // 处理的任务的数字 diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h index 2094badd..d7bc5f35 100644 --- a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h @@ -118,8 +118,15 @@ class UThreadPrimary : public UThreadBase { CGRAPH_YIELD(); if (cur_empty_epoch_ >= config_->primary_thread_busy_epoch_) { CGRAPH_UNIQUE_LOCK lk(mutex_); - cv_.wait_for(lk, std::chrono::milliseconds(config_->primary_thread_empty_interval_), - [this] { return 0 == cur_empty_epoch_ || !wsq_.isEmpty() || !pool_task_queue_->empty() || !done_; }); + const auto ready = [this] { + return 0 == cur_empty_epoch_ || !wsq_.isEmpty() || !pool_task_queue_->empty() || !done_; + }; + + if (!ready()) { + is_running_ = false; + cv_.wait_for(lk, std::chrono::milliseconds(config_->primary_thread_empty_interval_), ready); + is_running_ = true; + } cur_empty_epoch_ = 0; } } diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h index c7c691f8..46482342 100644 --- a/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h @@ -93,7 +93,9 @@ class UThreadSecondary : public UThreadBase { * @notice 目的是降低cpu的占用率 */ CVoid waitRunTask(const CMSec ms) { + is_running_ = false; const auto& task = pool_task_queue_->popWithTimeout(ms); + is_running_ = true; if (task) { runTask(*task); } @@ -105,7 +107,7 @@ class UThreadSecondary : public UThreadBase { * @return */ bool freeze() { - if (likely(is_running_)) { + if (likely(is_running_.load(std::memory_order_relaxed))) { cur_ttl_++; cur_ttl_ = (std::min)(cur_ttl_, config_->secondary_thread_ttl_); } else { diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.cpp b/src/UtilsCtrl/ThreadPool/UThreadPool.cpp index 2eb3d016..0b546b17 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPool.cpp +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.cpp @@ -211,7 +211,7 @@ CIndex UThreadPool::dispatch(const CIndex origIndex) { if (CGRAPH_DEFAULT_TASK_STRATEGY == origIndex) { realIndex = cur_index_.fetch_add(1, std::memory_order_relaxed) % config_.max_thread_size_; if (realIndex >= 0 && realIndex < config_.default_thread_size_ - && primary_threads_[realIndex]->is_running_) { + && primary_threads_[realIndex]->is_running_.load(std::memory_order_relaxed)) { // 如果是默认调度,并且被放置到 正在running 的pt中,则切换为 trigger_one 的策略,防止阻塞 realIndex = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY; } @@ -255,7 +255,7 @@ CVoid UThreadPool::monitor() { // 如果 primary线程都在执行,则表示忙碌 const bool busy = !primary_threads_.empty() && std::all_of(primary_threads_.begin(), primary_threads_.end(), - [](UThreadPrimaryPtr ptr) { return ptr && ptr->is_running_; }); + [](UThreadPrimaryPtr ptr) { return ptr && ptr->is_running_.load(std::memory_order_relaxed); }); // 如果忙碌或者priority_task_queue_中有任务,则需要添加 secondary线程 if (busy || !priority_task_queue_.empty()) { @@ -271,21 +271,24 @@ CVoid UThreadPool::monitor() { } -CSize UThreadPool::wakeupAllThread() const { +CSize UThreadPool::wakeupAllThread() { CSize size = 0; - for (auto& pt : primary_threads_) { - if (pt->wakeup()) { - ++size; + if (wakeup_mutex_.try_lock()) { + for (const auto& pt : primary_threads_) { + if (pt->wakeup()) { + ++size; + } } - } - for (auto& st : secondary_threads_) { - if (st->wakeup()) { - ++size; + for (const auto& st : secondary_threads_) { + if (st->wakeup()) { + ++size; + } } + wakeup_mutex_.unlock(); } return size; } -CGRAPH_NAMESPACE_END \ No newline at end of file +CGRAPH_NAMESPACE_END diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.h b/src/UtilsCtrl/ThreadPool/UThreadPool.h index d51bb61b..801f49a3 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPool.h +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.h @@ -14,6 +14,8 @@ #include #include #include +#include +#include #include #include @@ -179,7 +181,7 @@ class UThreadPool : public UThreadObject { * 通知所有thread 开启 * @return */ - CSize wakeupAllThread() const; + CSize wakeupAllThread(); protected: /** @@ -208,6 +210,7 @@ class UThreadPool : public UThreadObject { std::thread monitor_thread_; // 监控线程 std::map thread_record_map_; // 线程记录的信息 std::mutex st_mutex_; // 辅助线程发生变动的时候,加的mutex信息 + std::mutex wakeup_mutex_; // 防止wakeupAllThread并发重入 }; using UThreadPoolPtr = UThreadPool *;