服务粉丝

我们一直在努力
当前位置:首页 > 财经 >

使用 STL 实现一个 C++ 线程池

日期: 来源:字节流动收集编辑:

C++ 线程池

参考链接:基于C++11实现线程池的工作原理;c++简单线程池实现https://www.cnblogs.com/ailumiyana/p/10016965.html  https://www.cnblogs.com/yangang92/p/5485868.html

基础概念

线程池: 当进行并行的任务作业操作时,线程的建立与销毁的开销是,阻碍性能进步的关键,因此线程池,由此产生。使用多个线程,无限制循环等待队列,进行计算和操作。帮助快速降低和减少性能损耗。

线程池的组成

  1. 线程池管理器:初始化和创建线程,启动和停止线程,调配任务;管理线程池

  2. 工作线程:线程池中等待并执行分配的任务

  3. 任务接口:添加任务的接口,以提供工作线程调度任务的执行。

  4. 任务队列:用于存放没有处理的任务,提供一种缓冲机制,同时具有调度功能,高优先级的任务放在队列前面

线程池工作的四种情况

  • 1. 没有任务要执行,缓冲队列为空

空队列情况
  • 2. 队列中任务数量,小于等于线程池中线程任务数量

任务数量小于线程数量
  • 3. 任务数量大于线程池数量,缓冲队列未满

任务数量大于线程池数量
  • 4. 任务数量大于线程池数量,缓冲队列已满

缓冲队列已满

线程池的C++实现

参考链接:Thread pool; ThreadPool;

线程池结构

线程池的主要组成有上面三个部分:

  • 任务队列(Task Quene)

  • 线程池(Thread Pool)

  • 完成队列(Completed Tasks)

队列

我们使用队列来存储工作,因为它是更合理的数据结构。我们希望以与发送它相同的顺序启动工作。但是,这个队列有点特殊。正如我在上一节中所说的,线程是连续的(好吧,不是真的,但我们假设它们是)查询队列要求工作。

当有可用的工作时,线程从队列中获取工作并执行它。如果两个线程试图同时执行相同的工作会发生什么?好吧,程序会崩溃。

为了避免这种问题,我在标准C ++ Queue上实现了一个包装器,它使用mutex来限制并发访问。让我们看一下SafeQueue类的一小部分示例:

void enqueue(T& t) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_queue.push(t);
}

要排队我们做的第一件事就是锁定互斥锁以确保没有其他人正在访问该资源。然后,我们将元素推送到队列中。当锁超出范围时,它会自动释放。好吗,对吧?

这样,我们使Queue线程安全,因此我们不必担心许多线程在相同的“时间”访问和/或修改它。

提交函数

线程池最重要的方法是负责向队列添加工作的方法。我打电话给这个方法提交。不难理解它是如何工作的,但它的实现起初可能看起来很吓人。让我们考虑应该做什么,之后我们会担心如何做到这一点。什么:

  • 接受任何参数的任何函数。

  • 立即返回“东西”以避免阻塞主线程。此返回的对象最终应包含操作的结果。完整的提交函数如下所示:

// Submit a function to be executed asynchronously by the pool template<typename F, typename...Args>

auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
    // Create a function with bounded parameters ready to execute

    std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
    // Encapsulate it into a shared ptr in order to be able to copy construct // assign 

    auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
    // Wrap packaged task into void function

    std::function<void()> wrapper_func = [task_ptr]() {
      (*task_ptr)(); 
    };

    // Enqueue generic wrapper function

    m_queue.enqueue(wrapperfunc);
    // Wake up one thread if its waiting

    m_conditional_lock.notify_one();
    // Return future from promise

    return task_ptr->get_future();
}

队列完整源代码

// SafeQueue.h

#pragma once

#include <mutex>

#include <queue>

// Thread safe implementation of a Queue using a std::queue

template <typename T>
class SafeQueue {
private:
  std::queue<T> m_queue; //利用模板函数构造队列

  std::mutex m_mutex;//访问互斥信号量

public:
  SafeQueue() { //空构造函数


  }

  SafeQueue(SafeQueue& other) {//拷贝构造函数

    //TODO:
  }

  ~SafeQueue() { //析构函数

  }


  bool empty() {  //队列是否为空

    std::unique_lock<std::mutex> lock(m_mutex); //互斥信号变量加锁,防止m_queue被改变

    return m_queue.empty();
  }

  int size() {
    std::unique_lock<std::mutex> lock(m_mutex); //互斥信号变量加锁,防止m_queue被改变

    return m_queue.size();
  }
//队列添加元素

  void enqueue(T& t) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_queue.push(t);
  }
//队列取出元素

  bool dequeue(T& t) {
    std::unique_lock<std::mutex> lock(m_mutex); //队列加锁

    if (m_queue.empty()) {
      return false;
    }
    t = std::move(m_queue.front()); //取出队首元素,返回队首元素值,并进行右值引用

    m_queue.pop(); //弹出入队的第一个元素

    return true;
  }
};

线程池完整代码

参考链接: std::bind;std::forward;std::packaged_task

//ThreadPool.h

#pragma once

#include <functional>

#include <future>

#include <mutex>

#include <queue>

#include <thread>

#include <utility>

#include <vector>

#include "SafeQueue.h"

class ThreadPool {
private:
  class ThreadWorker {//内置线程工作类

  private:
    int m_id; //工作id

    ThreadPool * m_pool;//所属线程池

  public:
    //构造函数

    ThreadWorker(ThreadPool * pool, const int id) 
      : m_pool(pool), m_id(id) {
    }
    //重载`()`操作

    void operator()() {
      std::function<void()> func; //定义基础函数类func

      bool dequeued; //是否正在取出队列中元素

      //判断线程池是否关闭,没有关闭,循环提取

      while (!m_pool->m_shutdown) {
        {
          //为线程环境锁加锁,互访问工作线程的休眠和唤醒

          std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
          //如果任务队列为空,阻塞当前线程

          if (m_pool->m_queue.empty()) {
            m_pool->m_conditional_lock.wait(lock); //等待条件变量通知,开启线程

          }
          //取出任务队列中的元素

          dequeued = m_pool->m_queue.dequeue(func);
        }
        //如果成功取出,执行工作函数

        if (dequeued) {
          func();
        }
      }
    }
  };

  bool m_shutdown; //线程池是否关闭

  SafeQueue<std::function<void()>> m_queue;//执行函数安全队列,即任务队列

  std::vector<std::thread> m_threads; //工作线程队列

  std::mutex m_conditional_mutex;//线程休眠锁互斥变量

  std::condition_variable m_conditional_lock; //线程环境锁,让线程可以处于休眠或者唤醒状态

public:
    //线程池构造函数

  ThreadPool(const int n_threads)
    : m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false) {
  }

  ThreadPool(const ThreadPool &) = delete; //拷贝构造函数,并且取消默认父类构造函数

  ThreadPool(ThreadPool &&) = delete; // 拷贝构造函数,允许右值引用

  ThreadPool & operator=(const ThreadPool &) = delete; // 赋值操作

  ThreadPool & operator=(ThreadPool &&) = delete; //赋值操作

  // Inits thread pool

  void init() {
    for (int i = 0; i < m_threads.size(); ++i) {
      m_threads[i] = std::thread(ThreadWorker(this, i));//分配工作线程

    }
  }

  // Waits until threads finish their current task and shutdowns the pool

  void shutdown() {
    m_shutdown = true;
    m_conditional_lock.notify_all(); //通知 唤醒所有工作线程

    for (int i = 0; i < m_threads.size(); ++i) {
      if(m_threads[i].joinable()) { //判断线程是否正在等待

        m_threads[i].join();  //将线程加入等待队列

      }
    }
  }

  // Submit a function to be executed asynchronously by the pool
  //线程的主要工作函数,使用了后置返回类型,自动判断函数返回值

  template<typename F, typename...Args>
  auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
    // Create a function with bounded parameters ready to execute
    // 

    std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);//连接函数和参数定义,特殊函数类型,避免左、右值错误

    // Encapsulate it into a shared ptr in order to be able to copy construct // assign 
    //封装获取任务对象,方便另外一个线程查看结果

    auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

    // Wrap packaged task into void function
    //利用正则表达式,返回一个函数对象

    std::function<void()> wrapper_func = [task_ptr]() {
      (*task_ptr)(); 
    };

    // 队列通用安全封包函数,并压入安全队列

    m_queue.enqueue(wrapper_func);

    // 唤醒一个等待中的线程

    m_conditional_lock.notify_one();

    // 返回先前注册的任务指针

    return task_ptr->get_future();
  }
};

使用样例代码 参考连接: std::random_device;std::mt19937;std::uniform_int_distribution;;

#include <iostream>

#include <random>

#include "../include/ThreadPool.h"

std::random_device rd; //真实随机数产生器

std::mt19937 mt(rd()); //生成计算随机数mt;

std::uniform_int_distribution<int> dist(-1000, 1000);//生成-1000到1000之间的离散均匀分部数

auto rnd = std::bind(dist, mt);

//设置线程睡眠时间

void simulate_hard_computation() {
  std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}

// 添加两个数字的简单函数并打印结果

void multiply(const int a, const int b) {
  simulate_hard_computation();
  const int res = a * b;
  std::cout << a << " * " << b << " = " << res << std::endl;
}

//添加并输出结果

void multiply_output(int & out, const int a, const int b) {
  simulate_hard_computation();
  out = a * b;
  std::cout << a << " * " << b << " = " << out << std::endl;
}

// 结果返回

int multiply_return(const int a, const int b) {
  simulate_hard_computation();
  const int res = a * b;
  std::cout << a << " * " << b << " = " << res << std::endl;
  return res;
}


void example() {
  // 创建3个线程的线程池

  ThreadPool pool(3);

  // 初始化线程池

  pool.init();

  // 提交乘法操作,总共30个

  for (int i = 1; i < 3; ++i) {
    for (int j = 1; j < 10; ++j) {
      pool.submit(multiply, i, j);
    }
  }

  // 使用ref传递的输出参数提交函数

  int output_ref;
  auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);

  // 等待乘法输出完成

  future1.get();
  std::cout << "Last operation result is equals to " << output_ref << std::endl;

  // 使用return参数提交函数

  auto future2 = pool.submit(multiply_return, 5, 3);

  // 等待乘法输出完成

  int res = future2.get();
  std::cout << "Last operation result is equals to " << res << std::endl;

  //关闭线程池
  pool.shutdown();
}

原文链接: https://wangpengcheng.github.io/2019/05/17/cplusplus_theadpool/

推荐:

面试常问的 C/C++ 问题,你能答上来几个?

C++ 面试必问:深入理解虚函数表

很多人搞不清 C++ 中的 delete 和 delete[ ] 的区别

看懂别人的代码,总得懂点 C++ lambda 表达式吧

Java、C++ 内存模型都不知道,还敢说自己是高级工程师?

C++ std::thread 必须要熟悉的几个知识点

现代 C++ 并发编程基础

现代 C++ 智能指针使用入门

c++ thread join 和 detach 到底有什么区别?

C++ 面试八股文:list、vector、deque 比较

C++经典面试题(最全,面中率最高)

C++ STL deque 容器底层实现原理(深度剖析)

STL vector push_back 和 emplace_back 区别

了解 C++ 多态与虚函数表

C++ 面试被问到的“左值引用和右值引用”

-- END --


进技术交流群,扫码添加我的微信:Byte-Flow



获取相关资料和源码


相关阅读

  • 浅析Pe2shellcode

  • 编者注:本文仅供学习研究,严禁从事非法活动,任何后果由使用者本人负责。前言众所周知,对shellcode免杀是很流行的技术,但是直接对exe的免杀方法相对稀缺,如果我们能将exe转为shell
  • 弈 - Codeql 自动运行和项目监控工具

  • 前言代码审计总是离不开一些神器,笔者常用 Codeql[1] 这款工具辅助挖洞。当我每写一个规则都需要对其它项目手动运行检查一遍,效率很低,再加上 lgtm[2] 的关闭,此项目诞生了 ---
  • ECMAScript Async Context 提案介绍

  • 背景由阿里巴巴 TC39 代表主导的Async Context 提案[1] 刚在 2023年 2 月初的 TC39 会议中成为了 TC39 Stage 1 提案。提案的目标是定义在 JavaScript 的异步任务中传递数据
  • constexpr

  • 前面介绍了模板这种编译期动作,关于编译期动作,有必要介绍下constexpr。在这之前有必要简单提一下constexpr与const的关系,两者字面上都表达常量的意思。主要的区别是:const修饰
  • 阿里云消息队列 Kafka 生态集成的实践与探索

  • 消息队列 Kafka 简介AliwareApache Kafka是一个分布式流平台,作为互联网领域不可或缺的消息组件,在全球获得了广泛的应用。在使用过程中,Kafka一般被作为消息流转的核心枢纽,上
  • 万字长文教你如何做出 ChatGPT

  • 作者:monychen,腾讯 IEG 应用研究员简单来说,ChatGPT 是自然语言处理(NLP)和强化学习(RL)的一次成功结合,考虑到读者可能只熟悉其中一个方向或者两个方向都不太熟悉,本文会将 ChatGPT
  • IcedID僵尸网络滥用谷歌 PPC服务分发恶意软件

  • 关键词IcedID僵尸网络、谷歌 PPC、恶意软件1. 概述在密切跟踪 IcedID 僵尸网络的活动后,趋势科技的研究人员发现其分发方法发生了一些重大变化。自 2022 年 12 月以来, 趋势科

热门文章

  • “复活”半年后 京东拍拍二手杀入公益事业

  • 京东拍拍二手“复活”半年后,杀入公益事业,试图让企业捐的赠品、家庭闲置品变成实实在在的“爱心”。 把“闲置品”变爱心 6月12日,“益心一益·守护梦想每一步”2018年四

最新文章

  • 使用 STL 实现一个 C++ 线程池

  • C++ 线程池参考链接:基于C++11实现线程池的工作原理;c++简单线程池实现https://www.cnblogs.com/ailumiyana/p/10016965.html https://www.cnblogs.com/yangang92/p/5485868
  • 《人物志》七似:七种似是而非的“懂王”操作

  • 文/CxEric《人物志》是三国时期一本“观人”奇书,作者刘劭。有坊间传闻,曾国藩爱研读此书,但我无法判断真假。《人物志》所说的观人,不是摸骨看相一类,也不是占卜算命那套,而是试
  • 擅长不易,喜欢也挺难

  • 文/CxEric1很多文章都会告诉我们,年轻人要尽快找到自己喜欢+擅长的事情,这样职业生涯、事业才会顺畅,人生才更有意义。但就我的观察来看,这两件事情都挺难的:找到自己擅长的事情
  • 一岁一|CxEric的碎碎念聚会

  • 各位新年好。我将在明天做一次视频交流活动,主题名为《一岁一》,活动平台为腾讯会议。我想借年底/新年的机会,与各位朋友分享交流这一年的感悟、思考,可以跟投资有关,也可以无关
  • 跟AI闲聊半天:人类炒股的正确方式

  • 文/CxEric、ChatGPT最近ChatGPT很火,有人说这是颠覆性技术,有人说它会让很多基础工种失业,而我第一反应是这个东西好像很有趣啊。抱着好玩的心态,在闯了几个关后,我跟ChatGPT聊了
  • 看青涩少年沦陷东莞浴场,我理解了下海

  • 一位ID为「奶油日记」的男孩在短视频平台火了。连续4个月,奶油用视频记录了打工之路的日夜,却在一段肉身沦陷的短片中戛然而止。小镇青年进城打工迷失的故事我们听的太多 ,然而