motivation: trade space for time, spending server hardware resources in exchange for runtime efficiency. The whole pool is initialized when the server starts; at runtime a thread is taken directly from the pool when needed, with no dynamic allocation, and when a thread finishes its task it simply returns to the pool, with no system call needed to release it.
A thread pool has three main parts:
- Task queue: a class that stores the tasks waiting to be processed; worker threads take tasks from it, and a task is removed once it has been handled
- Worker threads (the consumers of the task queue): there are N of them, constantly taking tasks from the queue and processing them; when the queue is empty, the workers block
- Manager thread (does not process tasks itself): there is exactly one; it periodically checks the number of queued tasks and the number of worker threads, and creates or releases workers according to the load
```cpp
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <atomic>
#include <chrono>
#include <cerrno>              // ESRCH
#include <signal.h>            // pthread_kill
#include <condition_variable>

using callback = void(*)(void*);

// A task is just a callback plus an opaque argument pointer.
struct task {
    task() : fun(nullptr), args(nullptr) {}
    task(callback f, void* args) : fun(f), args(args) {}
    callback fun;
    void* args;
};

// Thread-safe task queue: producers push tasks, workers pop them.
class taskQueue {
private:
    std::queue<task> m_Qtask;
    std::mutex m_mutexQueue;
public:
    void addTask(task newTask);
    task takeTask();
    int getTaskNum();
};

void taskQueue::addTask(task newTask) {
    std::lock_guard<std::mutex> lock(m_mutexQueue);
    m_Qtask.push(newTask);
}

task taskQueue::takeTask() {
    std::lock_guard<std::mutex> lock(m_mutexQueue);
    task tmp;
    if (!m_Qtask.empty()) { tmp = m_Qtask.front(); m_Qtask.pop(); }
    return tmp;
}

int taskQueue::getTaskNum() {
    std::lock_guard<std::mutex> lock(m_mutexQueue);   // size() needs the lock too
    return static_cast<int>(m_Qtask.size());
}

class threadPool {
private:
    taskQueue* m_taskQueue;
    std::thread* m_managerThread;
    std::vector<std::thread> m_workThreads;
    int m_minNum;                         // minimum number of workers
    int m_maxNum;                         // maximum number of workers
    std::atomic<int> m_busyNum;           // workers currently running a task
    std::atomic<int> m_aliveNum;          // workers currently alive
    std::atomic<int> m_exitNum;           // workers the manager wants to retire
    static const int m_addThreadNum = 2;  // workers added/retired per adjustment
    std::mutex m_mutexPool;
    std::condition_variable m_notEmpty;
    std::atomic<bool> m_isShutdown;
    static void* worker(void* arg);
    static void* manager(void* arg);
public:
    threadPool(int min, int max);
    ~threadPool();
    void addTask(task newTask);
    void threadExit();
};

threadPool::threadPool(int min, int max) {
    m_taskQueue = new taskQueue;
    m_minNum = min; m_maxNum = max;
    m_busyNum = 0; m_aliveNum = min; m_exitNum = 0;
    m_isShutdown = false;
    for (int i = 0; i < min; i++)
        m_workThreads.push_back(std::thread(worker, this));
    m_managerThread = new std::thread(manager, this);
}

threadPool::~threadPool() {
    {   // set the flag under the pool mutex so no worker can miss the wake-up
        std::lock_guard<std::mutex> lock(m_mutexPool);
        m_isShutdown = true;
    }
    m_notEmpty.notify_all();
    if (m_managerThread != nullptr) {
        if (m_managerThread->joinable()) m_managerThread->join();
        delete m_managerThread;
    }
    for (auto& t : m_workThreads)          // join workers before freeing the queue they use
        if (t.joinable()) t.join();
    delete m_taskQueue;
}

void* threadPool::worker(void* arg) {
    std::cout << std::this_thread::get_id() << " worker thread starting" << std::endl;
    threadPool* pool = static_cast<threadPool*>(arg);
    while (true) {
        std::unique_lock<std::mutex> poolLock(pool->m_mutexPool);
        // Sleep while there is nothing to do and the pool is still running.
        while (pool->m_taskQueue->getTaskNum() == 0 && pool->m_isShutdown == false) {
            std::cout << std::this_thread::get_id() << " waiting task" << std::endl;
            pool->m_notEmpty.wait(poolLock);
            // The manager asked some workers to retire: leave if we are above the minimum.
            if (pool->m_exitNum > 0 && pool->m_aliveNum > pool->m_minNum) {
                pool->m_exitNum--; pool->m_aliveNum--;
                std::cout << std::this_thread::get_id() << " thread end working" << std::endl;
                poolLock.unlock();
                return nullptr;
            }
        }
        if (pool->m_isShutdown) {
            poolLock.unlock();
            std::cout << std::this_thread::get_id() << " thread end working" << std::endl;
            return nullptr;
        }
        task oneTask = pool->m_taskQueue->takeTask();
        poolLock.unlock();                  // run the task outside the pool lock
        pool->m_busyNum++;
        oneTask.fun(oneTask.args);          // the task function owns (and frees) its argument
        pool->m_busyNum--;
    }
}

void* threadPool::manager(void* arg) {
    std::cout << std::this_thread::get_id() << " manager thread starting" << std::endl;
    threadPool* pool = static_cast<threadPool*>(arg);
    while (pool->m_isShutdown == false) {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::unique_lock<std::mutex> poolLock(pool->m_mutexPool);
        int queueSize = pool->m_taskQueue->getTaskNum();
        int sleepThreadNum = pool->m_aliveNum - pool->m_busyNum;
        // More queued tasks than busy workers: wake the sleeping ones.
        if (pool->m_busyNum < queueSize) {
            while (sleepThreadNum > 0) { pool->m_notEmpty.notify_one(); sleepThreadNum--; }
        }
        // Still not enough workers: create up to m_addThreadNum new ones.
        if (queueSize > pool->m_aliveNum && pool->m_aliveNum < pool->m_maxNum) {
            for (int count = 0; count < m_addThreadNum && pool->m_aliveNum < pool->m_maxNum; count++) {
                pool->m_workThreads.push_back(std::thread(worker, pool));
                pool->m_aliveNum++;
            }
        }
        poolLock.unlock();
        // Far more idle workers than busy ones: ask m_addThreadNum of them to retire.
        sleepThreadNum = pool->m_aliveNum - pool->m_busyNum;
        if (sleepThreadNum > pool->m_busyNum && pool->m_aliveNum > pool->m_minNum) {
            pool->m_exitNum = m_addThreadNum;
            for (int i = 0; i < m_addThreadNum; i++) pool->m_notEmpty.notify_one();
        }
        // Reclaim the std::thread objects of workers that have already returned.
        if (static_cast<int>(pool->m_workThreads.size()) > pool->m_aliveNum) pool->threadExit();
    }
    return nullptr;
}

void threadPool::addTask(task newTask) { m_taskQueue->addTask(newTask); }

void threadPool::threadExit() {
    auto iter = m_workThreads.begin();
    while (iter != m_workThreads.end()) {
        auto currentThreadID = (*iter).native_handle();
        int kill = pthread_kill(currentThreadID, 0);   // signal 0: probe whether the thread still exists
        if (kill == ESRCH) {
            if ((*iter).joinable()) (*iter).join();
            iter = m_workThreads.erase(iter);
            std::cout << "thread " << currentThreadID << " has exited" << std::endl;
        } else {
            iter++;
        }
    }
}

std::mutex mtxTask;

void testFun(void* arg) {
    int num = *(int*)arg;
    delete (int*)arg;                                  // the task frees its own argument
    std::lock_guard<std::mutex> lock(mtxTask);
    std::cout << "current thread id is " << std::this_thread::get_id() << ", task " << num << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

int main() {
    threadPool pool(2, 10);
    for (int i = 0; i < 100; i++) {
        int* num = new int(i);
        pool.addTask(task(testFun, (void*)num));
    }
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}
```
Some questions:
Q: In worker(), should oneTask.fun(oneTask.args); be moved before poolLock.unlock();?
A: No. poolLock only needs to guarantee that workers do not conflict when taking a task from the taskQueue, i.e., that no task is handed out twice. Thread safety inside the task function is not the thread pool's job; it is the task's own responsibility, which should use its own lock or atomics if it needs them. (In a real system the task functions all differ anyway, unlike this demo where every task runs the same function, so the pool could not lock for them in any uniform way; the tasks may not even share any resources and then need no locking at all.) If the pool held the lock across the task call, it would block the other workers: a thread that only wants to grab a task would first have to wait for another thread to finish its whole task. That is wasteful, and a textbook example of lock granularity that is far too coarse.
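A minimal sketch of "the task protects its own shared state" (the counter, mutex, and accumulateTask name below are invented for illustration and are not part of the pool above):

```cpp
#include <mutex>

// Hypothetical shared state owned by the tasks themselves, not by the pool.
long g_total = 0;
std::mutex g_totalMutex;

// The task locks only what it actually shares with other tasks;
// the pool's poolLock stays limited to handing out tasks.
void accumulateTask(void* arg) {
    int value = *static_cast<int*>(arg);
    delete static_cast<int*>(arg);                   // the task frees its own argument
    std::lock_guard<std::mutex> lock(g_totalMutex);  // task-level lock, held only briefly
    g_total += value;
}
```

It would be queued like any other task, e.g. `pool.addTask(task(accumulateTask, new int(5)));`, and the pool never needs to know that g_totalMutex exists.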
Q: Why must worker() and manager() be static?
A: std::thread's constructor is a template that accepts any callable plus the arguments to invoke it with. A non-static member function carries a hidden this parameter and cannot be named like a free function, so std::thread(worker, this) fails to compile if worker is an ordinary member function. Declaring worker() and manager() static removes the hidden this parameter: they behave like free functions, and the pool pointer is passed explicitly as a normal argument. (A non-static member could still be used by passing a pointer-to-member plus the object, or a lambda, as sketched below.)
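A reduced, self-contained sketch of the alternatives (the Pool class and its methods below are invented for illustration; only the static variant matches the code above):

```cpp
#include <iostream>
#include <thread>

class Pool {                                        // invented stand-in for threadPool
    static void* worker(void* arg) {                // static: no hidden this, behaves like a free function
        static_cast<Pool*>(arg)->hello("static worker");
        return nullptr;
    }
    void workerLoop() { hello("member worker"); }   // non-static alternative
    void hello(const char* who) { std::cout << who << "\n"; }
public:
    void start() {
        std::thread t1(worker, this);               // this is passed explicitly as an ordinary argument
        std::thread t2(&Pool::workerLoop, this);    // pointer-to-member plus the object pointer
        std::thread t3([this] { workerLoop(); });   // lambda capturing this
        t1.join(); t2.join(); t3.join();
    }
};

int main() { Pool p; p.start(); return 0; }
```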
Q: When manager() wakes waiting threads, should it use notify_one() or notify_all()?
A: notify_one(). notify_one wakes a single thread blocked in condition_variable::wait() (which one is decided by the OS scheduler), while notify_all wakes every waiting thread. Here only one thread can take a given task anyway, so waking several workers just makes them compete for poolLock, and all but one go straight back to waiting, which is unnecessary overhead. notify_all() belongs in scenarios where several waiting threads really can make progress at the same time.
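A toy example, separate from the pool, of why notify_one() fits the one-item-one-consumer case (all names here are made up for illustration):

```cpp
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <chrono>
#include <condition_variable>

std::mutex m;
std::condition_variable cv;
int items = 0;

void consumer(int id) {
    std::unique_lock<std::mutex> lock(m);
    while (items == 0) cv.wait(lock);   // every woken thread re-checks the predicate
    items--;                            // only the one that wins the lock actually takes the item
    std::cout << "consumer " << id << " took an item\n";
}

int main() {
    std::vector<std::thread> consumers;
    for (int i = 0; i < 3; i++) consumers.emplace_back(consumer, i);
    for (int i = 0; i < 3; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        { std::lock_guard<std::mutex> lock(m); items++; }
        cv.notify_one();  // one new item, so wake one waiter; notify_all would wake every
                          // sleeping consumer, and all but one would go straight back to waiting
    }
    for (auto& t : consumers) t.join();
    return 0;
}
```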
Q: What does if (kill == ESRCH) in threadExit() mean?
A: int kill = pthread_kill(currentThreadID, 0); sends signal 0 to the thread as a probe. If the return value is 0, the thread was found, i.e., it is still within its lifetime and still working, so it is not reclaimed. If the return value is ESRCH, no such thread was found, meaning its lifetime has ended and its resources can be reclaimed. (Even though the thread itself has finished, some bookkeeping still has to be done by hand, such as joining it and removing its entry with m_workThreads.erase().)
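A minimal POSIX-only sketch of the probe, independent of the pool (note that whether an exited-but-not-yet-joined thread reports ESRCH can differ between libc versions, so treat the second print as what this demo expects rather than a guarantee):

```cpp
#include <iostream>
#include <thread>
#include <chrono>
#include <cerrno>
#include <signal.h>

int main() {
    std::thread t([] { std::this_thread::sleep_for(std::chrono::milliseconds(200)); });
    pthread_t handle = t.native_handle();

    // While the thread is running, signal 0 finds it and pthread_kill returns 0.
    std::cout << "running:  pthread_kill = " << pthread_kill(handle, 0) << std::endl;

    // Give the thread time to finish, but do not join it yet.
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    int probe = pthread_kill(handle, 0);
    std::cout << "finished: pthread_kill = " << probe
              << (probe == ESRCH ? " (ESRCH: no such thread)" : "") << std::endl;

    t.join();   // after join the handle must not be probed again: the thread id may be reused
    return 0;
}
```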
Q: What is the idea behind destroying surplus threads in manager()?
A: Set m_exitNum to a positive number, then wake some of the idle workers (the ones blocked in condition_variable::wait()). A woken worker checks if (pool->m_exitNum > 0) (and that the pool is still above its minimum size), returns, ends its lifetime, and waits for its resources to be reclaimed.
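A distilled, runnable sketch of this wake-and-retire handshake (simplified from the pool above: the aliveNum > minNum check is dropped, and the shutdown at the end only exists so the demo terminates):

```cpp
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <atomic>
#include <chrono>
#include <condition_variable>

std::mutex poolMutex;
std::condition_variable notEmpty;
std::atomic<int> exitNum{0};
std::atomic<bool> shutdown{false};

void worker(int id) {
    std::unique_lock<std::mutex> lock(poolMutex);
    while (!shutdown) {
        notEmpty.wait(lock);
        if (exitNum > 0) {                 // the manager asked someone to retire
            exitNum--;
            std::cout << "worker " << id << " retiring\n";
            return;                        // end of this thread's lifetime
        }
    }
}

int main() {
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; i++) workers.emplace_back(worker, i);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    exitNum = 2;                           // manager: retire two of the idle workers
    notEmpty.notify_one();
    notEmpty.notify_one();

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    { std::lock_guard<std::mutex> lock(poolMutex); shutdown = true; }
    notEmpty.notify_all();                 // shut the rest down so the demo can finish
    for (auto& t : workers) t.join();
    return 0;
}
```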