-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy paththreadpool.h
160 lines (140 loc) · 6.19 KB
/
threadpool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#pragma once
/*
==========================================================================
* 类ThreadPool是本代码的核心类,类中自动维护线程池的创建和任务队列的派送
* 其中的TaskFun是任务函数
* 其中的TaskCallbackFun是回调函数
*用法:定义一个ThreadPool变量,TaskFun函数和TaskCallbackFun回调函数,然后调用ThreadPool的QueueTaskItem()函数即可
Author: TTGuoying
Date: 2018/02/19 23:15
==========================================================================
*/
#pragma once
#include <Windows.h>
#include <list>
#include <queue>
#include <memory>
using std::list;
using std::queue;
using std::shared_ptr;
#define THRESHOLE_OF_WAIT_TASK 20
typedef int(*TaskFun)(PVOID param); // 任务函数
typedef void(*TaskCallbackFun)(int result); // 回调函数
class ThreadPool
{
private:
// 线程类(内部类)
class Thread
{
public:
Thread(ThreadPool* threadPool);
~Thread();
BOOL isBusy(); // 是否有任务在执行
void ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback); // 执行任务
private:
ThreadPool* threadPool; // 所属线程池
BOOL busy; // 是否有任务在执行
BOOL exit; // 是否退出
HANDLE thread; // 线程句柄
TaskFun task; // 要执行的任务
PVOID param; // 任务参数
TaskCallbackFun taskCb; // 回调的任务
static unsigned int __stdcall ThreadProc(PVOID pM); // 线程函数
};
// IOCP的通知种类
enum WAIT_OPERATION_TYPE
{
GET_TASK,
EXIT
};
// 待执行的任务类
class WaitTask
{
public:
WaitTask(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL bLong)
{
this->task = task;
this->param = param;
this->taskCb = taskCb;
this->bLong = bLong;
}
~WaitTask() { task = NULL; taskCb = NULL; bLong = FALSE; param = NULL; }
TaskFun task; // 要执行的任务
PVOID param; // 任务参数
TaskCallbackFun taskCb; // 回调的任务
BOOL bLong; // 是否时长任务
};
// 从任务列表取任务的线程函数
static unsigned int __stdcall GetTaskThreadProc(PVOID pM)
{
ThreadPool* threadPool = (ThreadPool*)pM;
BOOL bRet = FALSE;
DWORD dwBytes = 0;
WAIT_OPERATION_TYPE opType;
OVERLAPPED* ol;
while (WAIT_OBJECT_0 != WaitForSingleObject(threadPool->stopEvent, 0))
{
BOOL bRet = GetQueuedCompletionStatus(threadPool->completionPort, &dwBytes, (PULONG_PTR)&opType, &ol, INFINITE);
// 收到退出标志
if (EXIT == (DWORD)opType)
{
break;
}
else if (GET_TASK == (DWORD)opType)
{
threadPool->GetTaskExcute();
}
}
return 0;
}
//线程临界区锁
class CriticalSectionLock
{
private:
CRITICAL_SECTION cs;//临界区
public:
CriticalSectionLock() { InitializeCriticalSection(&cs); }
~CriticalSectionLock() { DeleteCriticalSection(&cs); }
void Lock() { EnterCriticalSection(&cs); }
void UnLock() { LeaveCriticalSection(&cs); }
};
public:
ThreadPool(size_t minNumOfThread = 2, size_t maxNumOfThread = 10);
~ThreadPool();
BOOL QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb = NULL, BOOL longFun = FALSE); // 任务入队
private:
size_t getCurNumOfThread() { return getIdleThreadNum() + getBusyThreadNum(); } // 获取线程池中的当前线程数
size_t GetMaxNumOfThread() { return maxNumOfThread - numOfLongFun; } // 获取线程池中的最大线程数
void SetMaxNumOfThread(size_t size) // 设置线程池中的最大线程数
{
if (size < numOfLongFun)
{
maxNumOfThread = size + numOfLongFun;
}
else
maxNumOfThread = size;
}
size_t GetMinNumOfThread() { return minNumOfThread; } // 获取线程池中的最小线程数
void SetMinNumOfThread(size_t size) { minNumOfThread = size; } // 设置线程池中的最小线程数
size_t getIdleThreadNum() { return idleThreadList.size(); } // 获取线程池中的线程数
size_t getBusyThreadNum() { return busyThreadList.size(); } // 获取线程池中的线程数
void CreateIdleThread(size_t size); // 创建空闲线程
void DeleteIdleThread(size_t size); // 删除空闲线程
Thread* GetIdleThread(); // 获取空闲线程
void MoveBusyThreadToIdleList(Thread* busyThread); // 忙碌线程加入空闲列表
void MoveThreadToBusyList(Thread* thread); // 线程加入忙碌列表
void GetTaskExcute(); // 从任务队列中取任务执行
WaitTask* GetTask(); // 从任务队列中取任务
CriticalSectionLock idleThreadLock; // 空闲线程列表锁
list<Thread*> idleThreadList; // 空闲线程列表
CriticalSectionLock busyThreadLock; // 忙碌线程列表锁
list<Thread*> busyThreadList; // 忙碌线程列表
CriticalSectionLock waitTaskLock;
list<WaitTask*> waitTaskList; // 任务列表
HANDLE dispatchThrad; // 分发任务线程
HANDLE stopEvent; // 通知线程退出的时间
HANDLE completionPort; // 完成端口
size_t maxNumOfThread; // 线程池中最大的线程数
size_t minNumOfThread; // 线程池中最小的线程数
size_t numOfLongFun; // 线程池中最小的线程数
};