# [C++任务队列与多线程](https://www.cnblogs.com/zhiranok/archive/2013/01/14/task_queue.html) ### 摘要: 很多场合之所以使用C++,一方面是由于C++编译后的native code的高效性能,另一方面是由于C++优秀的并发能力。并行方式有多进程 和多线程之分,本章暂且只讨论多线程,多进程方面的知识会在其他章节具体讨论。多线程是开发C++服务器程序非常重要的基础,如何根据需求具体的设计、分配线程以及线程间的通信,也是服务器程序非常重要的部分,除了能够带来程序的性能提高外,若设计失误,则可能导致程序复杂而又混乱,变成bug滋生的温床。所以设计、开发优秀的线程组件以供重用,无论如何都是值得的。 ​ 线程相关的api并不复杂,然而无论是linux还是windows系统,都是c风格的接口,我们只需简单的封装成对象,方便易用即可。任务队列是设计成用来进行线程间通信,使用任务队列进行线程间通信设计到一些模式,原理并不难理解,我们需要做到是弄清楚,在什么场景下选用什么样的模式即可。 #### 任务队列的定义: ​ 任务队列对线程间通信进行了抽象,限定了线程间只能通过传递任务,而相关的数据及操作则被任务保存。任务队列这个名词可能在其他场景定义过其他意义,这里讨论的任务队列定义为:能够把封装了数据和操作的任务在多线程间传递的线程安全的先入先出的队列。其与线程关系示意图如下: | | | | ---- | ------------------------------------------------------------ | | | [![clip_image001](Task_And_multithreading.assets/14221931-2c40b86b81ba476a850e3b1a6283218c.gif)](https://images0.cnblogs.com/blog/282357/201301/14221931-eb26d8a176994c2c904f07d61a3a045e.gif) | 注:两个虚线框分别表示线程A和线程B恩能够访问的数据边界,由此可见 任务队列是线程间通信的媒介。 #### 任务队列的实现: ##### 任务的定义 ​ 生产者消费者模型在软件设计中是极其常见的模型,常常被用来实现对各个组件或系统解耦合。大到分布式的系统交互,小到网络层对象和应用层对象的通讯,都会应用到生产者消费者模型,在任务队列中,生产和消费的对象为“任务”。这里把任务定义为组合了数据和操作的对象,或者简单理解成包含了void (void*) 类型的函数指针和void* 数据指针的结构。我们把任务定义成类task_t,下面来分析一下task_t的实现。 插入代码: [![复制代码](Task_And_multithreading.assets/copycode.gif)](javascript:void(0);) ``` class task_impl_i { public: virtual ~task_impl_i(){} virtual void run() = 0; virtual task_impl_i* fork() = 0; }; class task_impl_t: public task_impl_i { public: task_impl_t(task_func_t func_, void* arg_): m_func(func_), m_arg(arg_) {} virtual void run() { m_func(m_arg); } virtual task_impl_i* fork() { return new task_impl_t(m_func, m_arg); } protected: task_func_t m_func; void* m_arg; }; struct task_t { static void dumy(void*){} task_t(task_func_t f_, void* d_): task_impl(new task_impl_t(f_, d_)) { } task_t(task_impl_i* task_imp_): task_impl(task_imp_) { } task_t(const task_t& src_): task_impl(src_.task_impl->fork()) { } task_t() { task_impl = new task_impl_t(&task_t::dumy, NULL); } ~task_t() { delete task_impl; } task_t& operator=(const task_t& src_) { delete task_impl; task_impl = src_.task_impl->fork(); return *this; } void run() { task_impl->run(); } task_impl_i* task_impl; }; ``` [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0);) ​ Task最重要的接口是run,简单的执行保存的操作,具体的操作保存在task_impl_i的基类中,由于对象本身就是数据加操作的集合,所以构造task_impl_i的子类对象时,为其赋予不同的数据和操作即可。这里使用了组合的方式实现了接口和实现的分离。这么做的优点是应用层只需知道task的概念即可,对应task_impl_i不需要了解。由于不同的操作和数据可能需要构造不同task_impl_i子类,我们需要提供一些泛型函数,能够将用户的所有操作和数据都能轻易的转换成task对象。task_binder_t 提供一系列的gen函数,能够转换用户的普通函数和数据为task_t对象。 [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0);) ``` struct task_binder_t { //! C function static task_t gen(void (*func_)(void*), void* p_) { return task_t(func_, p_); } template static task_t gen(RET (*func_)(void)) { struct lambda_t { static void task_func(void* p_) { (*(RET(*)(void))p_)(); }; }; return task_t(lambda_t::task_func, (void*)func_); } template static task_t gen(FUNCT func_, ARG1 arg1_) { struct lambda_t: public task_impl_i { FUNCT dest_func; ARG1 arg1; lambda_t(FUNCT func_, const ARG1& arg1_): dest_func(func_), arg1(arg1_) {} virtual void run() { (*dest_func)(arg1); } virtual task_impl_i* fork() { return new lambda_t(dest_func, arg1); } }; return task_t(new lambda_t(func_, arg1_)); ``` [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0);) ##### 生产任务 函数封装了用户的操作逻辑,需要在某线程执行特定操作时,需要将操作对应的函数转换成task_t,投递到目的线程对应的任务队列。任务队列使用起来虽然像是在互相投递消息,但是根本上仍然是共享数据式的数据交换方式。主要步骤如下: l 用户函数转换成task_t对象 l 锁定目的线程的任务队列,将task_t 放到任务队列尾,当队列为空时,目的线程会wait在条件变量上,此时需要signal唤醒目的线程 实现的关键代码如下: [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0);) ``` void produce(const task_t& task_) { lock_guard_t lock(m_mutex); bool need_sig = m_tasklist.empty(); m_tasklist.push_back(task_); if (need_sig) { m_cond.signal(); } } ``` [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0);) ##### 消费任务 消费任务的线程会变成完全的任务驱动,该线程只有一个职责,执行任务队列的所有任务,若当前任务队列为空时,线程会阻塞在条件变量上,重新有新任务到来时,线程会被再次唤醒。实现代码如下: [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0);) ``` int consume(task_t& task_) { lock_guard_t lock(m_mutex); while (m_tasklist.empty()) { if (false == m_flag) { return -1; } m_cond.wait(); } task_ = m_tasklist.front(); m_tasklist.pop_front(); return 0; } int run() { task_t t; while (0 == consume(t)) { t.run(); } return 0; } ``` [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0);) #### 任务队列的模式 ##### 单线程单任务队列方式 任务队列已经提供了run接口,绑定任务队列的线程只需执行此函数即可,此函数除非用户显示的调用任务队列的close接口,否则run函数永不返回。任务队列的close接口是专门用来停止任务队列的工作的,代码如下: ``` void close() { lock_guard_t lock(m_mutex); m_flag = false; m_cond.broadcast(); } ``` 首先设置了关闭标记,然后在条件变量上执行broadcast, 任务队列的run函数也会由此退出。在回头看一下run接口的代码你会发现,检查任务队列是否关闭(m_flag 变量)的代码是在任务队列为空的时候才检测的,这样能够保证任务队列被全部执行后,run函数才返回。 下面是一个使用任务队列的helloworld的示例: [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0);) ``` class foo_t { public: void print(int data) { cout << "helloworld, data:" <