Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#ifndef CGRAPH_UTHREADBASE_H
#define CGRAPH_UTHREADBASE_H

#include <atomic>
#include <thread>

#include "../UThreadObject.h"
Expand Down Expand Up @@ -80,10 +81,8 @@ class UThreadBase : public UThreadObject {
* @param task
*/
CVoid runTask(const UTask& task) {
is_running_ = true;
task();
total_task_num_++;
is_running_ = false;
}


Expand All @@ -92,12 +91,10 @@ class UThreadBase : public UThreadObject {
* @param tasks
*/
CVoid runTasks(const UTaskArr& tasks) {
is_running_ = true;
for (auto& task : tasks) {
task();
}
total_task_num_ += tasks.size();
is_running_ = false;
}


Expand Down Expand Up @@ -127,7 +124,7 @@ class UThreadBase : public UThreadObject {
*/
CBool wakeup() {
CBool result = false;
if (!is_running_) {
if (!is_running_.load(std::memory_order_relaxed)) {
cv_.notify_one();
result = true;
}
Expand All @@ -154,6 +151,7 @@ class UThreadBase : public UThreadObject {
*/
CVoid loopProcess() {
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(config_)
is_running_ = true;
if (config_->batch_task_enable_) {
while (done_) {
processTasks(); // 批量任务获取执行接口
Expand Down Expand Up @@ -243,7 +241,7 @@ class UThreadBase : public UThreadObject {
protected:
CBool done_; // 线程状态标记
CBool is_init_; // 标记初始化状态
CBool is_running_; // 是否正在执行
std::atomic<bool> is_running_; // 是否正在执行
CInt type_ = 0; // 用于区分线程类型(主线程、辅助线程)
CULong total_task_num_ = 0; // 处理的任务的数字

Expand Down
11 changes: 9 additions & 2 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,15 @@ class UThreadPrimary : public UThreadBase {
CGRAPH_YIELD();
if (cur_empty_epoch_ >= config_->primary_thread_busy_epoch_) {
CGRAPH_UNIQUE_LOCK lk(mutex_);
cv_.wait_for(lk, std::chrono::milliseconds(config_->primary_thread_empty_interval_),
[this] { return 0 == cur_empty_epoch_ || !wsq_.isEmpty() || !pool_task_queue_->empty() || !done_; });
const auto ready = [this] {
return 0 == cur_empty_epoch_ || !wsq_.isEmpty() || !pool_task_queue_->empty() || !done_;
};

if (!ready()) {
is_running_ = false;
cv_.wait_for(lk, std::chrono::milliseconds(config_->primary_thread_empty_interval_), ready);
is_running_ = true;
}
cur_empty_epoch_ = 0;
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ class UThreadSecondary : public UThreadBase {
* @notice 目的是降低cpu的占用率
*/
CVoid waitRunTask(const CMSec ms) {
is_running_ = false;
const auto& task = pool_task_queue_->popWithTimeout(ms);
is_running_ = true;
if (task) {
runTask(*task);
}
Expand All @@ -105,7 +107,7 @@ class UThreadSecondary : public UThreadBase {
* @return
*/
bool freeze() {
if (likely(is_running_)) {
if (likely(is_running_.load(std::memory_order_relaxed))) {
cur_ttl_++;
cur_ttl_ = (std::min)(cur_ttl_, config_->secondary_thread_ttl_);
} else {
Expand Down
25 changes: 14 additions & 11 deletions src/UtilsCtrl/ThreadPool/UThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ CIndex UThreadPool::dispatch(const CIndex origIndex) {
if (CGRAPH_DEFAULT_TASK_STRATEGY == origIndex) {
realIndex = cur_index_.fetch_add(1, std::memory_order_relaxed) % config_.max_thread_size_;
if (realIndex >= 0 && realIndex < config_.default_thread_size_
&& primary_threads_[realIndex]->is_running_) {
&& primary_threads_[realIndex]->is_running_.load(std::memory_order_relaxed)) {
// 如果是默认调度,并且被放置到 正在running 的pt中,则切换为 trigger_one 的策略,防止阻塞
realIndex = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY;
}
Expand Down Expand Up @@ -255,7 +255,7 @@ CVoid UThreadPool::monitor() {

// 如果 primary线程都在执行,则表示忙碌
const bool busy = !primary_threads_.empty() && std::all_of(primary_threads_.begin(), primary_threads_.end(),
[](UThreadPrimaryPtr ptr) { return ptr && ptr->is_running_; });
[](UThreadPrimaryPtr ptr) { return ptr && ptr->is_running_.load(std::memory_order_relaxed); });

// 如果忙碌或者priority_task_queue_中有任务,则需要添加 secondary线程
if (busy || !priority_task_queue_.empty()) {
Expand All @@ -271,21 +271,24 @@ CVoid UThreadPool::monitor() {
}


CSize UThreadPool::wakeupAllThread() const {
CSize UThreadPool::wakeupAllThread() {
CSize size = 0;
for (auto& pt : primary_threads_) {
if (pt->wakeup()) {
++size;
if (wakeup_mutex_.try_lock()) {
for (const auto& pt : primary_threads_) {
if (pt->wakeup()) {
++size;
}
}
}

for (auto& st : secondary_threads_) {
if (st->wakeup()) {
++size;
for (const auto& st : secondary_threads_) {
if (st->wakeup()) {
++size;
}
}
wakeup_mutex_.unlock();
}

return size;
}

CGRAPH_NAMESPACE_END
CGRAPH_NAMESPACE_END
5 changes: 4 additions & 1 deletion src/UtilsCtrl/ThreadPool/UThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <map>
#include <future>
#include <thread>
#include <atomic>
#include <mutex>
#include <memory>
#include <functional>

Expand Down Expand Up @@ -179,7 +181,7 @@ class UThreadPool : public UThreadObject {
* 通知所有thread 开启
* @return
*/
CSize wakeupAllThread() const;
CSize wakeupAllThread();

protected:
/**
Expand Down Expand Up @@ -208,6 +210,7 @@ class UThreadPool : public UThreadObject {
std::thread monitor_thread_; // 监控线程
std::map<CSize, int> thread_record_map_; // 线程记录的信息
std::mutex st_mutex_; // 辅助线程发生变动的时候,加的mutex信息
std::mutex wakeup_mutex_; // 防止wakeupAllThread并发重入
};

using UThreadPoolPtr = UThreadPool *;
Expand Down
Loading