跳转至

Thread Pool Cpp

项目来源:https://github.com/progschj/ThreadPool

项目简介:一个利用c++11标准,全平台通用的线程池简单实现

使用方法:作者将线程池封装为ThreadPool类,包含于头文件中;由于只是用c++11标准库内容,因此不需要链接任何第三方库

下面是对该项目的个人浅析:

线程池最基本的工作原理

本项目只包含最简单的多线程并发,不涉及异步计时器等内容

什么是多线程

我们知道,在我们运行C/C++程序(或称为一个 进程 )时,程序一行一行地执行我们写下的代码;这是一种顺序结构,或者说,这是一种线性结构

显然,这样的程序意味着:我们在同一时刻只能运行一条语句;下一条语句总是在上一条语句结束之后才能发生

有时我们又希望程序在同一时刻完成多个任务。比如说:对于一个高性能的网络服务器,他肯定不能一个一个地处理客户端的请求:这样第1M个客户端的请求该等到猴年马月才能被接收。。。

面对这种情况,一个有效的解决方式就是将服务器调整为 非线性结构 ;或者简单一点说,将服务器程序变成 多条线性的结构

对于多条线性的结构的程序——多线程的程序,我们可以想象每一个线程都有各自的起点和终点,他们共享同样的数据但是各自同时对数据进行独自的加工

对于C++来说,线程就像是一条生产线:当我们配置好生产线的设备,确定好生产线的任务,并提供给这条生产线他所需要的原材料之后,就可以让他开始工作了!

std::thread t(Fn&& fn, Args... args)这条语句就创建了一个线程的实例t: 他以参数fn为设备(任务),以(可变)参数args为原材料,生产出主线程想要的“产品”

为什么需要线程池

按照我们之前的思路,只要我们对于每一个需要并行处理的任务,都为他创建一个线程去处理不就好了吗?为什么非得要把几个线程放到一起处理所有的请求呢?

首先,创建和销毁线程的实例需要耗费大量的CPU时间

试想一下,倘使我们程序的大部分时间都放在了创建和销毁线程的事情上,那么还会有什么时间办正事呢?

同时,当线程过多时,CPU在不同线程之间来回切换的时间也就不可忽略了;这同样造成了资源的浪费

其次,线程池有利于更好地进行任务调度

什么是任务调度呢?我们浅显理解一下:对于每一个需要并行处理的任务,可能会有 优先级 的区别;我们当然需要优先处理优先级更高的任务。这种需求线程池就可以轻松办到

还有,线程池可以让你的程序具有良好的鲁棒性

通过封装线程池,我们可以写出RAII的代码,从而减少出现 创建了线程却没有销毁的内存泄露 问题

单挑以上任何一点,都足以成为我们学习线程池的动机

既然如此,让我们开始学习C++的线程池吧!

线程池的简单实现

开源项目 https://github.com/progschj/ThreadPool 利用C++11写出了一个精巧易懂的线程池实现,并具有良好的扩展性,成为我们学习的基础

源码如下(thread_pool.h):

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

这是引用的头文件,其中condition_variable(条件变量)、future(异步地获取运行结果)、mutex(为了访问共享内存而设置的锁🔒)、thread(包含线程类的文件)

class ThreadPool {
public:
  ThreadPool(size_t);
  template <class F, class... Args>
  auto enqueue(F &&f, Args &&...args)
      -> std::future<typename std::result_of<F(Args...)>::type>;
  ~ThreadPool();

private:
  // need to keep track of threads so we can join them
  std::vector<std::thread> workers;
  // the task queue
  std::queue<std::function<void()>> tasks;

  // synchronization
  std::mutex queue_mutex;
  std::condition_variable condition;
  bool stop;
};

这里定义了线程池的结构:它拥有 一组线程 ,一个 任务队列 ,一个 互斥锁 ,一个“唤醒线程”的 条件变量 和条件2

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
  for (size_t i = 0; i < threads; ++i)
    workers.emplace_back([this] {
      for (;;) {
        std::function<void()> task;

        {
          std::unique_lock<std::mutex> lock(this->queue_mutex);
          this->condition.wait(
              lock, [this] { return this->stop || !this->tasks.empty(); });
          if (this->stop && this->tasks.empty())
            return;
          task = std::move(this->tasks.front());
          this->tasks.pop();
        }

        task();
      }
    });
}

这是线程池的构造函数,调用它我们就可以创建一个线程池的实例

通过传入的参数threads,我们可以限制线程的总数;在互斥锁的保护下,某条线程从任务队列中取出任务,并执行之

// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F &&f, Args &&...args)
    -> std::future<typename std::result_of<F(Args...)>::type> {
  using return_type = typename std::result_of<F(Args...)>::type;

  auto task = std::make_shared<std::packaged_task<return_type()>>(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...));

  std::future<return_type> res = task->get_future();
  {
    std::unique_lock<std::mutex> lock(queue_mutex);

    // don't allow enqueueing after stopping the pool
    if (stop)
      throw std::runtime_error("enqueue on stopped ThreadPool");

    tasks.emplace([task]() { (*task)(); });
  }
  condition.notify_one();
  return res;
}

这段代码展示了如何向线程池的任务队列中添加一个新的任务:通过std::packaged_task、std::future、std::forward等晦涩的语法,将任务对象放到队列中等待执行;并通过条件变量通知其他线程“有任务来了!!!”

此时,其他线程终于结束了等待(condition.wait方法),开始工作

// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
  {
    std::unique_lock<std::mutex> lock(queue_mutex);
    stop = true;
  }
  condition.notify_all();
  for (std::thread &worker : workers)
    worker.join();
}

#endif

线程池的销毁过程:依据RAII思想,Resource Acquisition Is Initialization,当对象的生命周期结束后自动销毁该对象的内存空间

体现在这里就是将所有子线程join进主线程

至此,一个线程池的基本功能就全部完成了

如何使用

项目中附赠了一个简单的示例代码,帮助人们使用线程池

#include <chrono>
#include <iostream>
#include <vector>

#include "thread_pool.h"

int main() {

  ThreadPool pool(4);
  std::vector<std::future<int>> results;

  for (int i = 0; i < 8; ++i) {
    results.emplace_back(pool.enqueue([i] {
      std::cout << "hello " << i << std::endl;
      std::this_thread::sleep_for(std::chrono::seconds(1));
      std::cout << "world " << i << std::endl;
      return i * i;
    }));
  }

  for (auto &&result : results)
    std::cout << result.get() << ' ';
  std::cout << std::endl;

  return 0;
}

在这份示例代码中,作者创建了一个拥有四个线程的线程池,并且向任务队列中拢共放了8个任务

后记

  • 我们分配多少线程最合适?一般来说,为了充分利用CPU的并发能力,我们线程数应该和CPU核数相当;这样每个CPU核可以单独处理一件任务。(电脑配置说的几核几线程到底是什么意思?谢谢? - 知乎
  • 如何控制任务的优先级?通过阅读代码我们可以知道,我们使用队列来存储所有任务;那么,如果我们可以定义人物的优先级,那么就可以使用优先队列来进行优化。事实上,许多算法可以通过优先队列进行优化
  • 互斥量(mutex)是什么?首先我们知道,所有线程共享进程中的数据,因此在并行过程中,可能出现两个线程同时访问同一个内存地址;若两个内存都只进行读操作还好,可当某个线程进行写操作时,无疑影响了另一个线程的结果。通过加锁,我们可以保证每个时刻至多有一个线程对数据进行操作