c++11简易线程池

motivation:用空间换时间,牺牲服务器的硬件资源从而换取运行效率。在服务器启动时就初始化整个池,运行时若需要线程则直接从池中获取,无需再动态分配;线程完成任务后也会直接回到池中,无需执行系统调用释放资源

线程池主要分为三部分:

  • 任务队列:一个class,用于存储需要处理的任务,工作线程从中取用任务,处理完的任务会被删除
  • 工作线程(任务队列的消费者):有N个,不停地从任务队列中取出任务并处理,任务队列为空时工作线程会被阻塞
  • 管理者线程(不处理任务队列中的任务):只有1个,作用是周期性地检测任务队列中的任务数量和工作线程的个数,可根据任务多少适当创建或释放工作线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#include<iostream>
#include<thread>
#include<mutex>
#include<queue>
#include<atomic>
#include<chrono>
#include<signal.h>
#include<condition_variable>


/************************* 任务队列 ****************************/

using callback = void(*)(void*);

struct task {
task() {
fun = nullptr;
args = nullptr;
}

task(callback f, void* args) {
this->fun = f;
this->args = args;
}

callback fun;
void* args;
};



class taskQueue {
private:
std::queue<task> m_Qtask;
std::mutex m_mutexQueue;

public:
taskQueue() {};
~taskQueue() {};

void addTask(task newTask);
task takeTask();
int getTaskNum();

};


void taskQueue::addTask(task newTask) {
// lock_guard模板类使用raii,析构时自动释放锁
std::lock_guard<std::mutex> lock(m_mutexQueue);
m_Qtask.push(newTask);
}


task taskQueue::takeTask() {
std::lock_guard<std::mutex> lock(m_mutexQueue);
task tmp;
if (!m_Qtask.empty()) {
tmp = m_Qtask.front();
m_Qtask.pop();
}
return tmp;
}


int taskQueue::getTaskNum() {
return m_Qtask.size();
}

/************************* 线程池 ****************************/
class threadPool {
private:
taskQueue* m_taskQueue;
std::thread* m_managerThread;
std::vector<std::thread> m_workThreads;

int m_minNum;
int m_maxNum;
std::atomic<int> m_busyNum; // 工作中的线程数
std::atomic<int> m_aliveNum; // 存活的的线程数
std::atomic<int> m_exitNum; // 要销毁的线程数
static const int m_addThreadNum = 2; // 一次加入的线程数

std::mutex m_mutexPool;
std::condition_variable m_notEmpty;

// 为true时销毁线程池
bool m_isShutdown;


static void* worker(void* arg);
static void* manager(void* arg);


public:
threadPool(int min, int max);
~threadPool();

void addTask(task newTask);
void threadExit();
};


threadPool::threadPool(int min, int max) {
m_taskQueue = new taskQueue;

m_minNum = min;
m_maxNum = max;
m_busyNum = 0;
m_aliveNum = min;
m_exitNum = 0;

m_isShutdown = false;

for (int i = 0; i < min; i++) {
m_workThreads.push_back(std::thread(worker, this));
}
m_managerThread = new std::thread(manager, this);
}


threadPool::~threadPool() {
m_isShutdown = true;

if (m_aliveNum) {
m_exitNum.store(m_aliveNum);
m_notEmpty.notify_all();
}

if (m_taskQueue)
delete m_taskQueue;

if (m_managerThread != nullptr) {
if (m_managerThread->joinable())
m_managerThread->join();
delete m_managerThread;
}

if (!m_workThreads.empty())
threadExit();
}

void* threadPool::worker(void* arg) {
std::cout << std::this_thread::get_id() << " worker thread strating" << std::endl;

// pool指向主线程(main函数)中实例化的那个threadPool
threadPool* pool = static_cast<threadPool*> (arg);

while (true) {
// 构造时自动上锁
std::unique_lock<std::mutex> poolLock(pool->m_mutexPool);

// 还没有任务时
while (pool->m_taskQueue->getTaskNum() == 0 && pool->m_isShutdown == false) {
std::cout << std::this_thread::get_id() << " waiting task" << std::endl;

// 释放锁并阻塞等待任务,被唤醒后自动上锁
pool->m_notEmpty.wait(poolLock);

// 判断是否销毁线程
if (pool->m_exitNum > 0 && pool->m_aliveNum > pool->m_minNum) {
pool->m_exitNum--;
pool->m_aliveNum--;
std::cout << std::this_thread::get_id() << " thread end working" << std::endl;
poolLock.unlock();
return nullptr;
}
}

if (pool->m_isShutdown) {
poolLock.unlock();
std::cout << std::this_thread::get_id() << " thread end working" << std::endl;
return nullptr;
}

task oneTask = pool->m_taskQueue->takeTask();

// 取出任务后就上锁,不需要把任务执行放在锁范围内
poolLock.unlock();

pool->m_busyNum++;
oneTask.fun(oneTask.args);
delete oneTask.args; // 这个delete太容易忘记了
pool->m_busyNum--;
}
}


void* threadPool::manager(void* arg) {
std::cout << std::this_thread::get_id() << " manager thread strating" << std::endl;
threadPool* pool = static_cast<threadPool*> (arg);

while (pool->m_isShutdown == false) {
std::this_thread::sleep_for(std::chrono::seconds(2));
std::unique_lock<std::mutex> poolLock(pool->m_mutexPool);
int queueSize = pool->m_taskQueue->getTaskNum();

// 唤醒等待任务的线程
int sleepThreadNum = pool->m_aliveNum - pool->m_busyNum;
if (pool->m_busyNum < queueSize) {
while (sleepThreadNum > 0) {
// 随机唤醒一个waiting中的worker线程
pool->m_notEmpty.notify_one();
sleepThreadNum--;
}
}

// 任务数>线程数时,添加线程
if (queueSize > pool->m_aliveNum && pool->m_aliveNum < pool->m_maxNum) {
for (int count = 0; count < m_addThreadNum && pool->m_aliveNum < pool->m_maxNum; count++) {
pool->m_workThreads.push_back(std::thread(worker, pool));
pool->m_aliveNum++;
}
}
poolLock.unlock();

// 休息线程数>工作线程数时,销毁多余线程
sleepThreadNum = pool->m_aliveNum - pool->m_busyNum;
if (sleepThreadNum > pool->m_busyNum && pool->m_aliveNum > pool->m_minNum) {
pool->m_exitNum = m_addThreadNum;
for (int i = 0; i < m_addThreadNum; i++)
pool->m_notEmpty.notify_one();
}

if (pool->m_workThreads.size() > pool->m_aliveNum)
pool->threadExit();
}
}


void threadPool::addTask(task newTask) {
m_taskQueue->addTask(newTask);
}


void threadPool::threadExit() {
auto iter = m_workThreads.begin();
while (iter != m_workThreads.end()) {
auto currentThreadID = (*iter).native_handle();

// 发送信号0,探测信号是否存活
int kill = pthread_kill(currentThreadID, 0);
if (kill == ESRCH) {
if ((*iter).joinable())
(*iter).join();
iter = m_workThreads.erase(iter);
std::cout << "thread" << currentThreadID << " has exited" << std::endl;
}
else {
iter++;
}
}
}

/************************* test ****************************/
std::mutex mtxTask;

void testFun(void* arg) {
int num = *(int*)arg;

std::lock_guard<std::mutex> lock(mtxTask);
std::cout << "current thread id is" << std::this_thread::get_id() << num << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}


int main() {
threadPool pool(2, 10);
for (int i = 0; i < 100; i++) {
int* num = new int(i);
pool.addTask(task(testFun, (void*)num));
}

std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}

一些问题

Q:worker()函数中,应该把oneTask.fun(oneTask.args);放到poolLock.unlock();之前吗?

A:不应该,poolLock只保证各worker从taskQueue中取任务时不冲突、不会同一个任务取两次就行了,至于任务函数中的线程安全不应该由threadPool来保证,那是任务函数自己的事,要线程安全就自己在函数内部上锁或atomic(本来真实场景中的任务函数就肯定不一样,不会像这个demo中全用一样的函数,也没办法由threadPool来统一上锁,指不定各任务之间根本没有共享资源,本身就不需要上锁),由threadPool统一上锁的话很容易造成线程堵塞,其它线程只想领个任务做自己的事,结果还得等其它线程先完成任务后才能领取,这就很浪费,典型的锁粒度太大的反面教材

Q:为什么worker()和manager()都要设为static?

A:std::thread(worker, this)的函数原型为:thread(void *(& f)(void *arg), threadPool *&& args)

即传入的回调函数callback不应该带有参数,而类成员函数自带隐藏this指针参数,直接thread构造会报错,而static的类成员函数会消除隐藏this指针参数

Q:manager()中唤醒等待线程时应该用notify_one()还是notify_all()?

A:notify_one(),因为one是唤醒一个condition_variable.wait()中的线程,具体哪个由os的调度器来决定;all则是唤醒所有等待线程,但此处本来也只能有一个线程去领任务,唤醒了多个worker它们也还是会去竞争poolLock,造成不必要的开销。notify_all()应该在多个线程可以同时处理任务的场景下才使用

Q:threadExit()中的if (kill == ESRCH)是什么意思?

A:int kill = pthread_kill(currentThreadID, 0);用于给线程发送试探信号,返回值kill如果为0,表示找到了这个线程,即线程还在生命周期内、还在工作,就不回收;如果kill为ESRCH,表示没找到该线程,说明其生命周期已经结束,可以回收资源了(尽管线程自己会析构,但还有些相关资源可能需要人为处理,如m_workThreads.erase())

Q:manager()中销毁多余线程的思路是什么?

A:把m_exitNum设为一个正数,然后唤醒那些休息中的线程(即被condition_variable.wait()阻塞的线程),worker线程被唤醒后判断if (pool->m_exitNum > 0)、return、结束生命周期、等待被回收资源


c++11简易线程池
https://qlhhahaha.github.io/2024/12/25/c++11简易线程池/
作者
qlhhahaha
发布于
2024年12月25日
许可协议