线程池学习与实现

前言

最近在实现一个高性能服务器的过程中,基于I/O多路复用和多线程,在实现上可以简单的为每一个请求创建一个新的线程去处理,但是在高并发的情况下,线程生命周期的开销非常高。每个线程都有自己的生命周期,创建和销毁线程所花费的时间和资源可能比处理客户端的任务花费的时间和资源更多,并且还会有某些空闲线程也会占用资源。同时,程序的稳定性和健壮性会下降,每个请求开一个线程。如果受到了恶意攻击或者请求过多(内存不足),程序很容易就奔溃掉了。因此考虑使用线程池。

线程池可以看做是线程的集合。在没有任务时线程处于空闲状态,当请求到来:线程池给这个请求分配一个空闲的线程,任务完成后回到线程池中等待下次任务(而不是销毁)。这样就实现了线程的重用。线程池旨在降低创建和销毁线程的频率,使其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。

实现

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务,线程池任务都是后台线程。

本次实现的线程池只包含几个基础部分

  • 线程池管理器。用于创建和管理线程
  • 工作线程。线程池中的线程,用于完成任务
  • 任务队列。提供一种缓冲的机制,保存当前未被处理的任务
  • 添加任务接口。添加任务到线程池中

线程管理器

任务管理器部分主要做到一个初始化线程池,以及在结束的时候回收线程。

线程池初始化,此处使用到emplace_back来将创建的线程保存起来追踪,使用emplace_back可以避免创建不必要的临时线程。

emplace_back 和 push_back 的区别

在引入右值引用,转移构造函数,转移复制运算符之前,通常使用push_back()向容器中加入一个右值元素(临时对象)的时候,首先会调用构造函数构造这个临时对象,然后需要调用拷贝构造函数将这个临时对象放入容器中。原来的临时变量释放。这样造成的问题是临时变量申请的资源就浪费。
引入了右值引用,转移构造函数(请看这里)后,push_back()右值时就会调用构造函数和转移构造函数。在这上面有进一步优化的空间就是使用emplace_back()

1
2
3
4
5
6
7
8
9
10
11
12
template <typename T>
ThreadPool<T>::ThreadPool(int size) : stop(false)
{
if (size <= 0)
{
throw std::exception();
}
for (int i = 0; i < size; i++)
{
workers.emplace_back(work, this);
}
}

在析构的时候,主要是使用条件变量唤醒各个线程,然后调用join函数,回收线程

1
2
3
4
5
6
7
8
9
10
11
12
13
template <typename T>
ThreadPool<T>::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (auto &worker : workers)
{
worker.join();
}
}

工作线程

在线程初始化的时候,让每个线程执行work函数,该函数的核心主要为从任务队列中不断的取出任务并完成。主要涉及到条件变量互斥锁,在任务队列为空的时候线程进入睡眠,等待唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
template <typename T>
void *ThreadPool<T>::work(void *arg)
{
ThreadPool *pool = (ThreadPool *)arg;
pool->run();
return pool;
}

template <typename T>
void ThreadPool<T>::run()
{
while(true) {
T* task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return !this->tasks.empty() || this->stop; });
if (this->stop && this->tasks.empty())
{
return;
}
task = std::move(this->tasks.front());
this->tasks.pop();
}
// task();

// just for test
std::cout << "Thread " << std::this_thread::get_id() << " sleep" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Thread " << std::this_thread::get_id() << " ";
task->process();
}
}

添加任务接口

把任务添加到任务队列中,主要需要注意的是避免同时对队列进行操作,需要使用到互斥锁

1
2
3
4
5
6
7
8
9
template <typename T>
bool ThreadPool<T>::addTask(T *task)
{
queueMutex.lock();
tasks.push(task);
queueMutex.unlock();
condition.notify_one();
return true;
}

简单的测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include "threadpool.hpp"
#include <iostream>
using namespace std;
#define TASKSNUM 1000


class Task {

public:
void process() {
cout << "do something" << endl;
}

};


int main() {
ThreadPool<Task> pool(4);

vector<Task*> tasks;
for(int i = 0; i < TASKSNUM; i++) {
Task *task = new Task();
pool.addTask(task);
delete task;
}
return 0;
}

1

根据输出结果,线程池基本正确实现。

但是目前有一定的局限性,对于实现的任务目前不支持参数传递,接下来将实现改进的版本。

改进

如何传入函数并调用,初始简单的想使用函数的指针,但是搜索了下发现 C++ 11 似乎有一个更加合适的特性,那就是std::function

std::function包含于头文件 #include <functional>中,可将各种可调用实体进行封装统一

Class template std::function is a general-purpose polymorphic function wrapper. Instances of std::function can store, copy, and invoke any Callable target — functions, lambda expressions, bind expressions, or other function objects, as well as pointers to member functions and pointers to data members.
The stored callable object is called the target of std::function. If a std::function contains no target, it is called empty. Invoking the target of an empty std::function results in std::bad_function_call exception being thrown.
std::function satisfies the requirements of CopyConstructible and CopyAssignable.

本次改进主要基于std::functionstd::bind实现,具体实现比较简单,不再阐述,具体可以参考项目代码

run函数实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void ThreadPool::run()
{
while (true)
{
Task task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return !this->tasks.empty() || this->stop; });
if (this->stop && this->tasks.empty())
{
return;
}
task = std::move(this->tasks.front());
this->tasks.pop();
}

// task();

// just for test
std::cout << "Thread " << std::this_thread::get_id() << " sleep" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Thread " << std::this_thread::get_id() << " ";
task();
}
}

相关链接