CyberRt源码分析-线程池
1. 源码
在cyber/base/thread_pool.h
中实现了一个线程池的类,写得很高级,我们来分析一下,代码如下:
class ThreadPool { |
2. 线程池的实现
线程池是一种多线程处理形式,它处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。
首先先来看一下线程池的模型:
线程池中有两个队列,一个工作队列,这就是线程池维护的线程队列,另一个是任务队列,工作队列中的线程去任务队列中领取任务从而执行,空闲的线程呢则等待直到任务队列有任务可获取。
基于此我们来看ThreadPool
类中定义的成员变量:
std::vector<std::thread> workers_; |
- 工作队列的数据结构是
vector
类型,就是一个数组,数组中的每一个元素都是一个std::thread
- 任务队列的数据结构是前面实现的
BoundedQueue
,BoundedQueue
中的每一个元素都是c++中的一个可调用对象:std::function<void()>
,是一个返回值为空,参数列表也为空的可调用对象 - 还有一个原子变量用于判断线程池是否还在工作
然后来看线程池的构造函数:传入的参数为线程的数量和所能创建的最大的线程数量,将stop_
标志位置为false
inline ThreadPool::ThreadPool(std::size_t threads, std::size_t max_task_num) |
首先是初始化任务队列
task_queue_
,并且采用BlockWaitStrategy
的线程同步方式,在这种方式下,当队列中没有数据的时候,去队列取数据的线程会被阻塞,直到有线程向队列中添加元素,此时就会通过信号量机制通知阻塞的线程可以继续执行了然后是初始化工作队列,由于
workers_
数组中的每一个元素都是一个std::function<void()>
,所以在初始化的时候是通过lamda
表达式来创建的,使用[this]
捕获列表是为了能够在lambda
函数中访问当前类的成员变量stop_
和任务队列task_queue_
。如果没有使用[this]
,那么在lambda
函数中就不能直接访问当前类的成员,因为lambda
默认是不捕获任何外部变量的。workers_.emplace_back([this] {
while (!stop_) {
/*返回值为空的可调用对象*/
std::function<void()> task;
if (task_queue_.WaitDequeue(&task)) {
/*如果出队成功,说明领取到了任务,则就去执行此任务*/
task();
}
}
});
}当执行
emplace_back
操作时此时就会创建线程,然后线程一旦创建就会开始执行,执行的操作就是上面这个while
循环里代码,此时每个线程都会去执行这一步task_queue_.WaitDequeue(&task)
操作,但是由于任务队列中还未放入任务,所以此时创建的所有线程都会被阻塞。
然后来看关键的向任务队列中添加任务的函数:
// before using the return value, you should check value.valid() |
这个函数有两个模板参数,第一个模板函数代表一个可调用对象,这个可调用对象就可以理解成我们一个任务,我们在这个函数里会将这个可调用对象封装然后进行入队,第二个模板参数是模板参数包,用于给这个可调用对象传参。这两个模板参数都是以&&
万能引用的方式传入的
函数的返回值类型是std::future<T>
类型的,而这个T
是什么类型呢:std::result_of<F(Args...)>::type
通过c++11提供的std::result_of
推导出了调用F(Args)
后的返回值类型。
拿到这个return_type
using return_type = typename std::result_of<F(Args...)>::type; |
然后是对这个传入的可调用对象的封装,通过封装使得任务队列中的每个元素都是统一类型,这样工作队列去面对的就是统一的任务类型了。
auto task = std::make_shared<std::packaged_task<return_type()>>( |
首先
task
是一个智能指针,通过std::make_shared
创建指针的模板类型是
std::packaged_task<return_type()>
,std::packaged_task
可以用来封装任何可以调用的目标,从而用于实现异步的调用。return_type()
是std::packaged_task
的模板参数,代表封装的是一个return_type()
类型的可调用对象,这不就是我们传入的这个F(Args)
吗,区别在于没有参数而已,这时候就轮到std::bind
出场了假设不用智能指针封装,那么代码该这么写:
std::packaged_task<return_type()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...))
std::bind
起到的作用就是将函数和参数打包生成一个可调用对象,这个可调用对象不用传参了,c++去调用的时候函数的参数已经被打包到bind
内部,这不就刚好满足我们上面的需求吗,经过上面一系列操作,我们的任务队列中存储的就是一个个指向具体任务的指针了。注意这里使用std::forward
是为了对传进来的模板参数进行完美转发
接着创建了一个future
对象:
std::future<return_type> res = task->get_future(); |
这个future
对象的模板参数是return_type
,用于和上面创建的任务队列里的std::packaged_task<return_type()>
可调用对象的返回值相绑定,后续当任务执行完毕我们就可以通过res.get()
来异步的获取任务执行后的返回值
后面就是入队操作了:再次利用std::function
,将task
指向的std::packaged_task
对象取出并包装为void函数。
task_queue_.Enqueue([task]() { (*task)(); }); |
传入的是一个lamda
表达式,捕获task
,表达式内部就是去执行这个可调用对象,通过(*task)()
在任务入队之后,还记得在之前实现BoundedQueue
时会去执行NotifyOne
操作,这样就会唤醒阻塞在队列上的一个工作者线程去拿到传入的这个std::function<void()>
去执行。
最后来看析构函数:
inline ThreadPool::~ThreadPool() { |
- 将
stop_
标志位设置为true
- 调用
task_queue_.BreakAllWait();
来唤醒所有的线程,等待所有线程执行完毕后销毁资源
这个线程池的设计涉及到c++11很多知识点:
左值与右值引用
引用折叠与完美转发
std::thread
std::future
std::bind
std::function
std::packaged_task