之前公司项目中一直没有方便好用的异步编程框架,在开发过程中会遇到一些问题。比如多线程编码时开发需要自己管理线程,并在两侧同步数据。一些操作也会被直接放在主线程中,就算单个操作的耗时并不明显,长此以往也会积少成多,最终拖慢了 UI 的速度。

但好在公司的代码库已经支持 C++11,我们基于 C++11 开发了一个仅使用标准库的框架。这样既能简化业务开发时的代码工作量,也可以提高代码的可读性,而且使用 std 组件实现也方便支持所有平台。更进一步的话还可以给框架加入其他支持,如:

  • task 优先级
  • task 延迟调用、重复调用,并可自定义调用间隔等属性
  • 指定多个 task 在同一个线程中运行

基本结构

显然每个 task 开一个线程非常不环保,所以首先需要实现一个基本的线程池。不到 100 行就可以搞定一个纯头文件的简单实现 ThreadPool。使用方法也非常简单,参见项目的介绍即可。

通常客户端程序除了 UI 处理外,大多的业务逻辑也会放在主线程一起执行。所以比较常见的模式是在主线程提交一个耗时操作到工作线程,当任务完成后再回到主线程做下一步的处理。除非项目里其他组件都是线程安全的,否则在工作线程里直接做后续操作(比如同步数据状态等)都可能会引发竞态条件,所以这里默认都让主线程去完成这些逻辑操作。

而且也不可能让 UI 线程/主线程去做同步等待,所以会在 UI 线程做 OnIdle 处理时会加入一个对完成队列的检查,当任务完成后结果被 post 到完成队列里时,主线程再去取出来执行后续步骤。

PS:在 windows 平台上可以利用 UI 线程的消息循环,自定义一个消息并将结果用 PostMessage 传过去,这样就可以优化掉 OnIdle 时的队列检查,处理会更加高效,但这样的话其他平台需要做类似的适配,用自身平台的机制实现同样效果。

基于这个线程池,我们可以快速搞出一个提交任务处理的基本结构:

//
// 定义一个线程池,全局或者是在特定范围的类里均可,同时设置好最大线程数
//
ThreadPool th_pool_(5);  // 指定池里的最大线程数,这里随便写了个 5
ThreadSafeQueue<std::packaged_task<void()>> completed_queue_;  // 完成队列,用来存放需要主线程做后续处理的 task

//
// 在任何需要的地方将 task 提交到线程池中运行
//
void Panel::OnButtonClick() {
  // ... 其他代码
  std::string full_path = ...;
  th_pool_.enqueue([this, full_path]() {
    // 以下代码都在工作线程中执行

    // 做一些耗时操作,比如读写文件等
    auto result = LoadData(full_path);

    // 工作线程部分的操作结束后在主线程进行后续操作,比如通知其他组件操作结果,
    // 刷新 UI 等,需要新建一个 task 并 post 给主线程
    auto ptask = std::make_shared<std::packaged_task<void()>>([this, result]() {
      // 这个 lambda 内的代码会在主线程中执行
      log::info("save data result: " << result);
      if (this->sink_)
        this->sink_->OnDataLoaded(result);
      UpdateLabel();
    });
    // 将下一步的 task 放到完成队列中等待主线程执行
    completed_queue_.push(ptask);
  });
}

//
// 主线程的 OnIdle 函数
//
void OnIdle() {
  // ... 其他操作
  std::shared_ptr<std::packaged_task<void()>> tmp;
  while (finished_task_queue_.try_pop(tmp) && tmp) {
    (*tmp)();
  }
}

这里用了 packaged_task、future、thread 等来实现简单的异步和回调处理,将 callback 写成 lambda 这样后续的逻辑就一目了然,不用再到代码里东翻西找了。

封装函数对象

实际实现中我们不会直接将任务提交到线程池,而是增加一个间接层比如 TaskRunner,用来处理任务调度,跟由系统调度器默认安排线程来执行 task 相比有更高的灵活度。比如将一些 task 安排到同一个线程、低负载情况下尽可能地唤醒同一个线程来处理 task 等。

实现 task 的优先级、延时、重复执行也比较容易,只要定义一个 Task 类,将外部提交的 lambda 等可执行对象和优先级等属性一起打包保存,然后放到 TaskRunner 的优先队列里,再把队列里的任务分配给线程池执行就可以。重复的任务执行完毕后再次入队即可。

加入 TaskRunner 的封装也可以让主线程变成“Worker Thread”之一,对框架来讲所有线程都可以一视同仁了。

取消 Task

Task 的取消操作也是必不可少的。实际使用中我们大概率会把类对象的 this 指针传入,方便在 task 执行过程中即时更新状态等,或者 task 执行完后调用某个充当 callback 的成员函数。那这样肯定会遇到 task 还没执行,类对象就要被析构的情况;或者传入某个 lambda 表达式,但是在函数被执行前动态库就要被卸载了。在这之前就必须要取消 task,免得到时候不体面。

但是要取消已经开始执行的 task 比较麻烦,通常的处理还是只取消未开始执行的 task,对正在执行的就采取等待的方式。否则需要用到类似 pthread_cancel 的操作,并且在编写 task 逻辑时还需要考虑异常、栈展开时能正确释放资源等,比较麻烦。

也可以特殊处理下,比如外部调用 Task::Cancel 取消一个正在运行的 task 时,将 task 内部的状态设置成 Canceling。同时在 task 的执行函数里面按业务逻辑设置几个检查点,当发现状态变为 Canceling 时就立即终止后续代码的执行,和跑完全部流程相比可以尽量缩短取消时的等待时间。

除了这种持有 task 对象指针,并在适当时候调用 Cancel 接口取消任务之外,还可以给 Task 对象绑定一个 tag,这样可以一次性取消某个 tag 的所有 task,实际使用中会比较方便。比如可以给一个动态库分配一个 tag,也可以直接用一个类的 this 指针作为一个 tag。

还有一种方式可以适用于 this 指针失效的情况,就是以 weakptr 的形式传入 this,这样当类析构之后这个 weakptr 自然也就失效了。不过这对于外部代码的影响比较大,需要改动它们的实现,继承或者在类中增加 weakptr 相关的成员变量并把它传递到 task 里面。

处理能力

实际使用中还会遇到 task 中间调用了会阻塞的 api,这样导致线程池中的线程一直被占用。这个处理起来也简单,只要增加线程池的最大线程数,不要局限在 CPU 物理核心数量上就可以了,这样可以解决处理能力下降的问题。