1. 源码

cyber/base/thread_pool.h中实现了一个线程池的类,写得很高级,我们来分析一下,代码如下:

class ThreadPool {
public:
explicit ThreadPool(std::size_t thread_num, std::size_t max_task_num = 1000);

template <typename F, typename... Args>
auto Enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;

~ThreadPool();

private:
std::vector<std::thread> workers_;
BoundedQueue<std::function<void()>> task_queue_;
std::atomic_bool stop_;
};

/*构造函数入参为 线程数量和最大任务数量*/
inline ThreadPool::ThreadPool(std::size_t threads, std::size_t max_task_num)
: stop_(false) {
/*创建一个BoundedQueue,采用的等待策略是阻塞策略*/
if (!task_queue_.Init(max_task_num, new BlockWaitStrategy())) {
throw std::runtime_error("Task queue init failed.");
}

/* 初始化线程池 创建空的任务,每个任务的逻辑就是 */
workers_.reserve(threads);
for (size_t i = 0; i < threads; ++i) {
workers_.emplace_back([this] {
while (!stop_) {
/*返回值为空的可调用对象*/
std::function<void()> task;
if (task_queue_.WaitDequeue(&task)) {
task();
}
}
});
}
}

// before using the return value, you should check value.valid()
template <typename F, typename... Args>
auto ThreadPool::Enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));

std::future<return_type> res = task->get_future();

// don't allow enqueueing after stopping the pool
if (stop_) {
return std::future<return_type>();
}
task_queue_.Enqueue([task]() { (*task)(); });
return res;
};

// the destructor joins all threads
/* 唤醒线程池里所有线程,然后等待所有子线程执行完毕,释放资源*/
inline ThreadPool::~ThreadPool() {
if (stop_.exchange(true)) {
return;
}
task_queue_.BreakAllWait();
for (std::thread& worker : workers_) {
worker.join();
}
}

2. 线程池的实现

线程池是一种多线程处理形式,它处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。

首先先来看一下线程池的模型:

image-20231129165547298

线程池中有两个队列,一个工作队列,这就是线程池维护的线程队列,另一个是任务队列,工作队列中的线程去任务队列中领取任务从而执行,空闲的线程呢则等待直到任务队列有任务可获取。

基于此我们来看ThreadPool类中定义的成员变量:

std::vector<std::thread> workers_;
BoundedQueue<std::function<void()>> task_queue_;
std::atomic_bool stop_;
  • 工作队列的数据结构是vector类型,就是一个数组,数组中的每一个元素都是一个std::thread
  • 任务队列的数据结构是前面实现的 BoundedQueue BoundedQueue中的每一个元素都是c++中的一个可调用对象:std::function<void()>,是一个返回值为空,参数列表也为空的可调用对象
  • 还有一个原子变量用于判断线程池是否还在工作

然后来看线程池的构造函数:传入的参数为线程的数量和所能创建的最大的线程数量,将stop_标志位置为false

inline ThreadPool::ThreadPool(std::size_t threads, std::size_t max_task_num)
: stop_(false) {
/*创建一个BoundedQueue,采用的等待策略是阻塞策略*/
if (!task_queue_.Init(max_task_num, new BlockWaitStrategy())) {
throw std::runtime_error("Task queue init failed.");
}

/* 初始化线程池 创建空的任务,每个任务都是一个while循环 */
workers_.reserve(threads);
for (size_t i = 0; i < threads; ++i) {
workers_.emplace_back([this] {
while (!stop_) {
/*返回值为空的可调用对象*/
std::function<void()> task;
if (task_queue_.WaitDequeue(&task)) {
/*如果出队成功,说明领取到了任务,则就去执行此任务*/
task();
}
}
});
}
}
  • 首先是初始化任务队列task_queue_,并且采用BlockWaitStrategy的线程同步方式,在这种方式下,当队列中没有数据的时候,去队列取数据的线程会被阻塞,直到有线程向队列中添加元素,此时就会通过信号量机制通知阻塞的线程可以继续执行了

  • 然后是初始化工作队列,由于workers_数组中的每一个元素都是一个std::function<void()>,所以在初始化的时候是通过lamda表达式来创建的,使用 [this] 捕获列表是为了能够在 lambda 函数中访问当前类的成员变量 stop_ 和任务队列 task_queue_。如果没有使用 [this],那么在 lambda 函数中就不能直接访问当前类的成员,因为 lambda 默认是不捕获任何外部变量的。

    workers_.emplace_back([this] {
    while (!stop_) {
    /*返回值为空的可调用对象*/
    std::function<void()> task;
    if (task_queue_.WaitDequeue(&task)) {
    /*如果出队成功,说明领取到了任务,则就去执行此任务*/
    task();
    }
    }
    });
    }

    当执行emplace_back操作时此时就会创建线程,然后线程一旦创建就会开始执行,执行的操作就是上面这个while循环里代码,此时每个线程都会去执行这一步task_queue_.WaitDequeue(&task)操作,但是由于任务队列中还未放入任务,所以此时创建的所有线程都会被阻塞。

然后来看关键的向任务队列中添加任务的函数:

// before using the return value, you should check value.valid()
template <typename F, typename... Args>
auto ThreadPool::Enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));

std::future<return_type> res = task->get_future();

// don't allow enqueueing after stopping the pool
if (stop_) {
return std::future<return_type>();
}
task_queue_.Enqueue([task]() { (*task)(); });
return res;
};

这个函数有两个模板参数,第一个模板函数代表一个可调用对象,这个可调用对象就可以理解成我们一个任务,我们在这个函数里会将这个可调用对象封装然后进行入队,第二个模板参数是模板参数包,用于给这个可调用对象传参。这两个模板参数都是以&&万能引用的方式传入的

函数的返回值类型是std::future<T>类型的,而这个T是什么类型呢:std::result_of<F(Args...)>::type

通过c++11提供的std::result_of推导出了调用F(Args)后的返回值类型。

拿到这个return_type

using return_type = typename std::result_of<F(Args...)>::type;

然后是对这个传入的可调用对象的封装,通过封装使得任务队列中的每个元素都是统一类型,这样工作队列去面对的就是统一的任务类型了。

auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  • 首先task是一个智能指针,通过std::make_shared创建

  • 指针的模板类型是std::packaged_task<return_type()>std::packaged_task可以用来封装任何可以调用的目标,从而用于实现异步的调用。return_type()std::packaged_task的模板参数,代表封装的是一个return_type()类型的可调用对象,这不就是我们传入的这个F(Args)吗,区别在于没有参数而已,这时候就轮到std::bind出场了

  • 假设不用智能指针封装,那么代码该这么写:

    std::packaged_task<return_type()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...))
  • std::bind起到的作用就是将函数和参数打包生成一个可调用对象,这个可调用对象不用传参了,c++去调用的时候函数的参数已经被打包到bind内部,这不就刚好满足我们上面的需求吗,经过上面一系列操作,我们的任务队列中存储的就是一个个指向具体任务的指针了。注意这里使用std::forward是为了对传进来的模板参数进行完美转发

接着创建了一个future对象:

std::future<return_type> res = task->get_future();

这个future对象的模板参数是return_type,用于和上面创建的任务队列里的std::packaged_task<return_type()>可调用对象的返回值相绑定,后续当任务执行完毕我们就可以通过res.get()来异步的获取任务执行后的返回值

后面就是入队操作了:再次利用std::function,将task指向的std::packaged_task对象取出并包装为void函数。

task_queue_.Enqueue([task]() { (*task)(); });

传入的是一个lamda表达式,捕获task,表达式内部就是去执行这个可调用对象,通过(*task)()

在任务入队之后,还记得在之前实现BoundedQueue时会去执行NotifyOne操作,这样就会唤醒阻塞在队列上的一个工作者线程去拿到传入的这个std::function<void()>去执行。

image-20231129210355863

最后来看析构函数:

inline ThreadPool::~ThreadPool() {
if (stop_.exchange(true)) {
return;
}
task_queue_.BreakAllWait();
for (std::thread& worker : workers_) {
worker.join();
}
}
  • stop_标志位设置为true
  • 调用 task_queue_.BreakAllWait();来唤醒所有的线程,等待所有线程执行完毕后销毁资源

这个线程池的设计涉及到c++11很多知识点:

  • 左值与右值引用

  • 引用折叠与完美转发

  • std::thread

  • std::future

  • std::bind

  • std::function

  • std::packaged_task

参考链接