—线程池—
任务短,线程需求多的情况下,频繁创建线程和销毁线程需要时间,这样就会大大降低系统的效率,故使用线程池来统一管理多个线程,能提高程序效率。**—- 实现代码贴在最后**
1 思路
1.任务队列,存储需要处理的任务,由工作的线程来处理这些任务
通过线程池提供的API函数,用于对任务的添加与删除
工人线程与管理者线程均需访问该任务队列来进行需求实现,故需加互斥锁
2.工作的线程(任务队列任务的消费者) ,N个
循环读任务队列, 从里边取出任务并处理,如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)
如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作
3.管理者线程 1个
它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
当任务过多的时候, 可以适当的创建一些新的工作线程
当任务过少的时候, 可以适当的销毁一些工作的线程
2. 示例图
3. 内存泄漏检测
最后使用valgrind工具对编写的线程池代码进行内存泄漏检测。
从检测结果可以发现 16 allocs, 16 frees 。 All heap blocks were freed,故未发现内存泄漏。
4. 编写代码过程中遇到过的问题
问题1:对于管理者线程,发现存在于线程池中的线程远大于任务数量时,会进行销毁线程的操作,也就是说需要动态实时的进行退出线程(pthread_exit())同时回收线程(pthread_join())的操作。注:退出线程和回收线程不能在同一线程当中
起初想的是 通过一个全局的数组来记录每个退出线程的ID,然后在析构函数中统一退出。然后发现这样做虽然不会导致最后程序运行结束后的内存泄漏,但是在运行过程中会使内存变大,没有及时的动态回收线程。
解决方案:使用一个全局变量(exthread)用来记录退出线程的ID,同时申请一把新锁(m_exmut)专门用来对该变量进行临界操作。同时需要实现退出线程->回收线程这个步骤的原子性,将exthread 变量 0 ,1,线程ID 作为 FLAG进行原子操作。成功实现动态实时回收退出线程。
问题2:工作线程数N的初始值需要根据什么条件或者影响因素来进行合理的设定?
默认8个 根据系统相关参数限制,以及是CPU还是IO密集型任务综合进行判断。这里是IO密集型任务
如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 Ncpu+1能够实现最优的CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因 导致暂停时,额外的这个线程就能顶上去,保证CPU 时钟周期不被浪费
如果是IO密集型任务,参考值可以设置为 2 * Ncpu。因为线程间竞争的不是CPU的计算资源而是IO,IO的处理一般较慢,多于cores数的线程将为CPU争取更多的任务,不至在线程处理IO的过程造成CPU空闲导致资源浪费
5 详细实现代码
task.h
#ifndef TASK_QUEUE_
#define TASK_QUEUE_
#include <pthread.h>
#include <queue>
#include <stack>
using namespace std;
using callback = void(*)(void*);
//定义任务结构体
class Task
{
public:
Task()
{
function = nullptr;
arg = nullptr;
}
Task(callback f, int* arg) ////////////
{
this->function = f;
this->arg = (int *)arg;
}
callback function; //任务函数
int* arg; //任务参数
};
class taskQueue
{
public:
taskQueue();
~taskQueue();
//添加任务
void addTask(Task &arg);
//取出任务
Task takeTask();
//获取当前任务个数
size_t taskCount();
private:
pthread_mutex_t mut; //对任务队列加锁
queue<Task> m_taskQ;
};
#endif // !1
————————————————————————————————————————————-
taskQueue.cpp
#include "task.h"
//构造函数
taskQueue::taskQueue()
{
pthread_mutex_init(&mut, NULL); //初始化锁
}
//析构
taskQueue::~taskQueue()
{
pthread_mutex_destroy(&mut); //释放锁
}
//添加任务
void taskQueue::addTask(Task& arg)
{
pthread_mutex_lock(&mut);
m_taskQ.push(arg);
pthread_mutex_unlock(&mut);
}
//取出任务
Task taskQueue::takeTask()
{
Task t;
if (m_taskQ.size() != 0) //双重判断,减少锁带来的开销
{
pthread_mutex_lock(&mut);
if (m_taskQ.size() > 0)
{
t = m_taskQ.front();
m_taskQ.pop();
}
pthread_mutex_unlock(&mut);
}
return t;
}
size_t taskQueue::taskCount()
{
return m_taskQ.size();
}
————————————————————————————————————————————-
threadpoolDemo.h
#ifndef THREADPOOLDEMO_H_
#define THREADPOOLDEMO_H_
#include"task.h"
//线程池
class threadpoolDemo
{
public:
//线程池初始化
threadpoolDemo(size_t minNum, size_t maxNum);
//添加任务
void addThreadTask(Task t );
// 获取线程池中工作的线程的个数
int threadPoolBusyNum();
// 获取线程池中活着的线程的个数
int threadPoolAliveNum();
//销毁线程池
~threadpoolDemo();
//退出单个线程
void exitThread();
private:
static void* worker(void* ptr);
static void* manager(void* ptr);
static void* reclaim(void* ptr);
private:
size_t m_minNum; //最小线程数
size_t m_maxNum; //最大线程数
size_t m_aliveNum; //存活线程数
size_t m_busyNum; //忙碌线程数
size_t m_exitNum; //退出线程数
static bool shutDown; //是否关闭线程池 true--关闭 false--未关闭
static bool shutReclaim; //是否关闭回收线程
pthread_t managerTid; //管理者tid
pthread_t* workerTid; //工作线程数
pthread_t reclaimTid; //用来回收线程的线程
pthread_mutex_t m_exmut; // 退出锁
pthread_mutex_t m_mut; //锁
pthread_cond_t m_condBytask; //条件变量
taskQueue* m_taskQueue; //任务队列
};
#endif // !1
————————————————————————————————————————————-
threadpoolDemo.cpp
#include "threadpoolDemo.h"
#include <iostream>
#include <string.h>
#include <unistd.h>
bool threadpoolDemo::shutDown = false;
bool threadpoolDemo::shutReclaim = false;
static pthread_t exthread=0;
//回收线程
void* threadpoolDemo::reclaim(void* ptr)
{
threadpoolDemo* obj = static_cast<threadpoolDemo*>(ptr);
while (!shutReclaim)
{
if (exthread != 0)
{
pthread_mutex_lock(&obj->m_exmut);
if( exthread != 0 )
{
cout << "reclaim id :" << exthread <<" is successful !" << endl;
pthread_join(exthread, NULL);
exthread = 0;
}
pthread_mutex_unlock(&obj->m_exmut);
}
}
pthread_exit(NULL);
}
//工人线程
void* threadpoolDemo::worker(void* ptr)
{
cout << "worker ID: " << pthread_self() <<endl;
threadpoolDemo* workerObj = static_cast<threadpoolDemo *>(ptr);
while (true)
{
pthread_mutex_lock(&workerObj->m_mut);
//如果队列是空的 就阻塞
while ( workerObj->m_taskQueue->taskCount() == 0 && !workerObj->shutDown)
{
cout << " taskQueue is empty , waiting..." << endl;
pthread_cond_wait(&workerObj->m_condBytask,&workerObj->m_mut);
//倘若有信号唤醒,看是否存在退出线程
if (workerObj->m_exitNum > 0)
{
workerObj->m_exitNum--;
if (workerObj->m_aliveNum > workerObj->m_minNum)
{
workerObj->m_aliveNum--;
while (true)
{
if (exthread == 0)
{
pthread_mutex_lock(&workerObj->m_exmut);
exthread = pthread_self();
pthread_mutex_unlock(&workerObj->m_exmut);
pthread_mutex_unlock(&workerObj->m_mut);
workerObj->exitThread();
}
}
}
}
}
//线程池关闭
if (workerObj->shutDown)
{
while(true)
{
if(exthread == 0 ) // 0,其他
{
pthread_mutex_lock(&workerObj->m_exmut);
if (exthread == 0)
{
exthread = pthread_self();
pthread_mutex_unlock(&workerObj->m_exmut);
pthread_mutex_unlock(&workerObj->m_mut);
workerObj->exitThread();
}
pthread_mutex_unlock(&workerObj->m_exmut);
}
}
}
//否则取出任务执行
Task task =workerObj->m_taskQueue->takeTask();
workerObj->m_busyNum++;
pthread_mutex_unlock(&workerObj->m_mut);
// 执行任务
//cout << "thread " << to_string(pthread_self()) << " start working..." << endl;
task.function(task.arg);
delete task.arg;
task.arg = nullptr;
//任务处理结束
//cout << " thread " << to_string(pthread_self()) << " end !" << endl;
pthread_mutex_lock(&workerObj->m_mut);
workerObj->m_busyNum--;
pthread_mutex_unlock(&workerObj->m_mut);
}
pthread_exit(NULL);
}
//管理者线程
void* threadpoolDemo::manager(void* ptr)
{
threadpoolDemo* managerObj = static_cast<threadpoolDemo*>(ptr);
while (!managerObj->shutDown) // -----------!managerObj->shutDown
{
sleep(5);
pthread_mutex_lock(&managerObj->m_mut);
size_t aliveNum = managerObj->m_aliveNum;
size_t busyNum = managerObj->m_busyNum;
size_t queueSize = managerObj->m_taskQueue->taskCount();
pthread_mutex_unlock(&managerObj->m_mut);
//算法
//添加线程 如果 queueSize > 2 * aliveNum && aliveNum < m_maxNum
const size_t NUMBER = 2;
if ( (queueSize > 2 * aliveNum) && (aliveNum < managerObj->m_maxNum) )
{
pthread_mutex_lock(&managerObj->m_mut);
//谨慎的循环添加线程
size_t num = 0;
for (int i = 0; i < managerObj->m_maxNum && num < NUMBER && managerObj->m_aliveNum < managerObj->m_maxNum; ++i)
{
if (managerObj->workerTid[i] == 0)
{
int err =pthread_create(&managerObj->workerTid[i], NULL, worker, managerObj);
if (err != 0)
{
cout << "create new thread failed ! " << endl;
pthread_mutex_unlock(&managerObj->m_mut);
return nullptr;
}
num++;
managerObj->m_aliveNum++;
}
}
pthread_mutex_unlock(&managerObj->m_mut);
}
//销毁线程 如果 queueSize * 2 < aliveNum && aliveNum > m_minNum
if ((busyNum * 2 < aliveNum) && (aliveNum > managerObj->m_minNum))
{
pthread_mutex_lock(&managerObj->m_mut);
managerObj->m_exitNum = NUMBER;
pthread_mutex_unlock(&managerObj->m_mut);
//依次唤醒线程并退出
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&managerObj->m_condBytask);
}
}
}
pthread_exit(NULL);
}
//线程池初始化
threadpoolDemo::threadpoolDemo(size_t minNum,size_t maxNum)
{
m_taskQueue = new taskQueue; //任务队列
if (m_taskQueue == NULL)
{
cout << "m_taskQueue new failed !" << endl;
return;
}
do
{
this->m_minNum = minNum;
this->m_maxNum = maxNum;
this->m_aliveNum = minNum;
this->m_busyNum = 0;
this->m_exitNum = 0;
//this->shutDown = false;
pthread_cond_init(&m_condBytask, NULL);
pthread_mutex_init(&m_mut, NULL);
pthread_mutex_init(&m_exmut, NULL);// 退出锁
//创建工人线程
workerTid = new pthread_t[maxNum];
if (workerTid == NULL)
{
cout << " new workerTid failed ! " << endl;
return;
}
//初始化数组
memset(workerTid, 0, sizeof(pthread_t) * maxNum);
for (int i = 0; i < minNum; ++i)
{
int err = pthread_create(&workerTid[i], NULL, worker, this);
if (err != 0)
{
cout << "workerTid pthread_create errno !" << endl;
return;
}
}
//创建管理线程
int err = pthread_create(&managerTid, NULL, manager, this);
if (err != 0)
{
cout << "managerTid pthread_create errno !" << endl;
return;
}
//创建一个专门用来回收的线程
int err1 = pthread_create(&reclaimTid, NULL, reclaim, this);
if (err1 != 0)
{
cout << "managerTid pthread_create errno !" << endl;
return;
}
} while (0);
}
int threadpoolDemo::threadPoolBusyNum()
{
int busyNum = 0;
pthread_mutex_lock(&m_mut);
busyNum = this->m_busyNum;
pthread_mutex_unlock(&m_mut);
return busyNum;
}
int threadpoolDemo::threadPoolAliveNum()
{
int aliveNum = 0;
pthread_mutex_lock(&m_mut);
aliveNum = this->m_aliveNum;
pthread_mutex_unlock(&m_mut);
return aliveNum;
}
//添加任务
void threadpoolDemo::addThreadTask(Task t)
{
if (shutDown)
return;
//添加任务 因为任务队列已加锁,无需重复加锁
m_taskQueue->addTask(t);
//有任务就唤醒
pthread_cond_signal(&m_condBytask);
}
//退出单个线程
void threadpoolDemo::exitThread()
{
pthread_t tid = pthread_self();
for (int i = 0; i < m_maxNum; ++i)
{
if (tid== workerTid[i])
{
workerTid[i] = 0;
break;
}
}
pthread_exit(NULL);
}
//销毁线程池
threadpoolDemo::~threadpoolDemo()
{
shutDown = true;
cout << "shutDwon =" << shutDown << endl;
//销毁管理者线程
pthread_join(managerTid, NULL);
//销毁工人线程
for (int i = 0; i < m_aliveNum; ++i)
{
pthread_cond_signal(&m_condBytask);
}
sleep(3); //及时销毁所有工作线程
shutReclaim = true;
pthread_join(reclaimTid,NULL);
if (workerTid) delete[] workerTid;
if (m_taskQueue) delete m_taskQueue;
pthread_mutex_destroy(&m_mut);
pthread_cond_destroy(&m_condBytask);
pthread_mutex_destroy(&m_exmut); // 退出锁
}
————————————————————————————————————————————-
测试案例(LEFT-RIGHT范围内找出质数)
main.cpp
#include <cstdio>
#include <iostream>
#include <pthread.h>
#include "threadpoolDemo.h"
#include <unistd.h>
using namespace std;
#define LEFT 30000000
#define RIGHT 30000200
void taskFunc(void* arg)
{
int i, j, mark;
for (i = LEFT; i <= RIGHT; i++)
{
mark = 1;
for (j = 2; j < i / 2; j++)
{
if (i % j == 0)
{
mark = 0;
break;
}
}
if (mark)
cout << i << " is a primer " << endl;
}
}
int main()
{
//// 创建线程池
threadpoolDemo* test = new threadpoolDemo(6, 10);
Task t(taskFunc,NULL);
test->addThreadTask(t);
sleep(3);
cout << "alive num : " << test->threadPoolAliveNum() << " and busyNum: " << test->threadPoolBusyNum() << endl;
delete test;
return 0;
}
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 351134995@qq.com