Velox_线程池模块
概述
在现代开发中,多线程可以提高程序的运行效率和响应速度,它已经成为提高应用程序性能、处理并发任务的重要手段。使用多线程需要注意线程同步、资源消耗等问题,当使用的线程数多了时,手动进行管理是十分困难的。为了解决这些问题,线程池作为一种有效的线程管理机制应运而生。
线程池会预先创建一定数量的工作线程,我们只需将待执行任务提交到线程池,线程池会负责任务的分配与执行,从而简化线程管理、减少系统频繁创建与销毁线程的开销、提高资源利用率。其核心思想是避免频繁的创建和销毁线程,减少系统开销。
主要特性
- 完整的生命周期管理:线程池具有明确的运行状态,构成一个状态机,确保在任何时刻都处于可预期的状态。
- 支持暂停和恢复:在暂停状态下,线程池会继续接收新任务,但所有工作线程将暂停执行,直到线程池被恢复。
- 支持优雅关闭:析构函数会自动调用
shutdown
函数,安全的销毁线程池,符合 RAII (Resource Acquisition Is Initialization) 原则。 - 支持调整工作线程数量:可以在运行时通过
increaseThreadCount
和decreaseThreadCount
方法动态地增加或减少工作线程的数量,以适应不同的负载需求。 - 支持自动调整工作线程数量:引入核心线程数和最大线程数两个概念,使得线程池能在工作负载变化时自动调整线程数量。核心线程始终保留在池中,而最大线程数则限定线程池可动态扩展的上限。
- 支持配置文件:可以通过
threadpool.yml
配置文件修改线程池的相关参数(如核心线程数、最大线程数等),无需重复编译。
基础知识
在正式介绍线程池模块的相关代码前,我们需要先了解一些必要的现代C++编程基础知识。
std::atomic
std::atomic
是 C++11 引入的模板类,用于实现多线程环境下的原子操作,从而避免数据竞争。原子操作是指一个不可被中断的操作,要么完全执行,要么完全不执行,在执行过程中不会被其他线程的调度打断。std::atomic简介
作用
- 替代互斥锁:对于简单的计数器、标志位等共享状态,使用
std::atomic
比使用互斥锁(std::mutex
)的开销更小,性能更高。锁通常会引起线程阻塞和上下文切换,而 「原子操作通常由特殊的 CPU 指令实现,属于无锁(Lock-Free)编程的范畴」。
用法
|
|
常用操作
load()
: 原子地读取值。store(value)
: 原子地写入值。exchange(value)
: 原子地替换为新值,并返回旧值。fetch_add(arg)
/fetch_sub(arg)
: 原子地加上/减去一个值,并返回旧值。(+=
和-=
的重载是等效的,但返回新值)。compare_exchange_weak(expected, desired)
/compare_exchange_strong(expected, desired)
: 比较并交换(CAS)操作。这是原子操作的核心,它将当前值与expected
比较,如果相等,则替换为desired
并返回true
;否则,将expected
更新为当前值并返回false
注意事项
- 内存序(Memory Order):
std::atomic
的操作可以接受一个std::memory_order
参数,用来控制当前操作如何与其他线程的内存操作同步。这是std::atomic
中最复杂也最关键的部分,其控制着原子操作在多线程之间的可见性和执行顺序。memory_order_relaxed
: 最宽松的顺序,只保证操作本身的原子性,不提供任何跨线程的顺序保证。memory_order_acquire
: 获取语义。在本线程中,所有后续的读写操作都不能重排到此操作之前。memory_order_release
: 释放语义。在本线程中,所有之前的读写操作都不能重排到此操作之后。memory_order_acq_rel
: 同时具备获取和释放语义。memory_order_seq_cst
: 顺序一致性,最强的内存序,保证所有线程看到的所有原子操作都有一个全局一致的顺序。这是所有原子操作的默认值。
- 不是万能的:
std::atomic
适用于单个变量的原子操作。如果需要保护多个变量,或者需要进行一系列复杂的操作,那么还是应该使用std::mutex
。 - 可平凡复制:
std::atomic<T>
要求类型T
是可平凡复制的(Trivially Copyable),这意味着它可以用memcpy
在内存中安全地复制。
看了上面的注意事项,你可能还在想 “内存序” 是什么鬼?有啥用?那我们就通过下面内容来了解一下: 1. 为什么需要内存序?
-
编译器和 CPU 的优化 现代编译器和 CPU 为了提高性能,会对代码执行顺序进行调整(重排序),只要不改变单线程程序的语义。例如:
1 2
x = 1; // 语句 A y = 2; // 语句 B
在单线程中,编译器 / CPU 可能会交换语句 A 和 B 的执行顺序,因为它们之间没有依赖关系。但在多线程环境中,这种重排序可能会导致其他线程看到不一致的内存状态。
-
多处理器系统的缓存一致性问题 在多核系统中,每个 CPU 核心有自己的缓存。当一个线程修改变量时,这个修改可能先在缓存中生效,而其他核心的缓存尚未更新。如果没有适当的同步机制,其他线程可能读取到旧值。
-
性能与正确性的权衡 如果所有操作都强制按顺序执行并保证全局可见性,虽然简单但会导致性能下降。内存序允许开发者根据实际需求放宽约束,提高性能。
2. 内存序如何解决上述问题?
C++ 提供了 6 种内存序(实际常用 4 种),通过控制 内存可见性 和 指令重排序 来平衡性能和正确性:
(1) std::memory_order_relaxed
该内存序只保证操作本身的原子性,不提供任何跨线程的顺序保证,例如:
|
|
因为relaxed
不提供任何跨线程的顺序保证,所以线程2的读取到counter
值可能为0(在加操作前读取),也可能为1(在加操作后读取)。该内存序只适用于计数器自增等无需同步的操作。
(2)std::memory_order_release
和 std::memory_order_acquire
在同一线程中release
强制要求此操作之前的读写操作都不能重排到此操作后;在同一线程中acquire
强制要求此操作之后的读写操作都不能重排到此操作之前。两者配合使用可以实现同步原语,是一种轻量级的同步机制。直接看示例:
|
|
memory_order_release
保证data=42
不会重排到ready=true
之后memory_order_acquire
保证assert
不会重排到while
之前
(3)std::memory_order_seq_cst
(默认)
这是原子操作中最安全的内存序,代价为性能较低,它保证了:所有线程看到的所有原子操作,具有全局统一的执行顺序(因为会更新全部核的缓存),且同一线程内的执行顺序与代码中写的顺序一致。直接看示例:
|
|
在上面的代码中,合法的执行顺序只可能为下面的6种:
|
|
seq_cst
规定了A必须在B前面,C必须在D前面(执行顺序与代码中写的顺序一致),且当某个原子操作执行完成后,其他线程也能马上看到该操作。所以,不存在输出r1: 0, r2: 0
的情况,因为 r1
为 0 说明A已经执行(线程2能马上看到A操作的结果,即x
的值被修改为了1),r2
不可能为0;同理 r2
为 0 说明C已经执行,r1
不可能为0。
==注意:==内存序十分重要和复杂,刚开始使用时推荐拷打AI老师(建议同时拷打3个),逐步积累使用经验。
std::async
std::async
是一个 C++11 引入的函数模板,它以一种简单直接的方式异步地运行一个可调用对象(如函数、lambda 表达式),并返回一个 std::future
,该 std::future
将在未来的某个时刻持有该异步任务的计算结果。
作用
- 简化异步编程:
std::async
是启动异步任务最简单的方法之一。它封装了线程的创建和管理,让开发者可以像调用普通函数一样启动一个并发任务,而无需手动创建std::thread
对象。 - 获取返回值和异常:它自动将任务的返回值或抛出的异常捕获并存入返回的
std::future
对象中,使得在主线程中获取结果和处理异常变得非常方便。 - 抽象化线程管理:它允许运行时库(Runtime Library)根据系统负载和可用资源来决定任务的执行方式,例如是在新线程中运行还是延迟执行,从而可能实现更好的性能和资源利用。
用法
std::async
的基本用法是传入一个可调用对象和其所需的参数。
|
|
常用操作
std::async
本身是一个函数,其操作就是调用它。关键在于如何配置它的行为,这通过启动策略(launch policy)参数来控制:
-
std::async(std::launch::async, f, args...)
:- 行为: 保证可调用对象
f
在一个新的线程上异步执行,类似于创建了一个std::thread
。 - 效果: 任务会立即开始(或尽快由操作系统调度),与调用者并发执行。
- 行为: 保证可调用对象
-
std::async(std::launch::deferred, f, args...)
:- 行为: 任务被延迟执行。它不会立即启动,而是在返回的
std::future
上首次调用get()
或wait()
时,才会在调用get()
或wait()
的那个线程上同步执行。 - 效果: 没有并发,只是将函数的执行推迟了。
- 行为: 任务被延迟执行。它不会立即启动,而是在返回的
-
std::async(f, args...)
(不指定策略,使用默认值):- 行为: 这是默认策略,等价于
std::launch::async | std::launch::deferred
。实现可以自由选择是在新线程中立即执行,还是延迟执行。 - 效果: 行为不确定,取决于具体的标准库实现和当时的系统状态。这可能导致性能问题或意外的阻塞。
- 行为: 这是默认策略,等价于
注意事项
-
显式指定启动策略:由于默认策略的行为不确定,强烈建议总是显式指定
std::launch::async
或std::launch::deferred
。std::launch::async
是最常用的,因为它能保证真正的并发。 -
future
的析构函数行为:如果由std::async
返回的std::future
在其关联的任务尚未完成时被销毁,该future
的析构函数会阻塞,直到任务执行完毕。这是一种安全机制,防止程序在后台线程仍在运行时意外退出,但也可能导致非预期的等待。1 2 3 4 5 6 7 8
void might_block() { // fut 是局部变量,在函数返回时会被销毁 // 如果任务还在运行,析构函数会阻塞在此处 std::future<void> fut = std::async(std::launch::async, []{ std::this_thread::sleep_for(std::chrono::seconds(5)); }); } // <- 此处可能阻塞
-
参数传递:传递给
std::async
的参数会被复制或移动到任务的内部存储中。如果想通过引用传递参数以避免拷贝,并希望在任务中修改原始值,需要使用std::ref
或std::cref
进行包装。
std::packaged_task
std::packaged_task
是一个 C++11 引入的模板类,它将一个可调用对象(函数、lambda 等)包装起来,使其可以被异步调用。它的核心功能是将任务的定义与任务的执行以及其未来结果分离开来。
作用
- 解耦任务与线程:
std::packaged_task
允许你创建一个“任务包”,这个包内含一个要执行的操作。你可以稍后决定这个任务在哪个线程上执行,甚至可以将其存储在一个队列中,由一个线程池来处理。 - 连接可调用对象与
std::future
:它像一个桥梁,一端连接着你的函数代码,另一端通过get_future()
方法提供一个std::future
对象。当这个任务包被执行时,其返回值或异常会自动被存入关联的std::future
中。 - 构建线程池和任务队列:它是实现线程池、任务调度器等高级并发模式的关键组件。你可以创建一堆
packaged_task
,把它们放入一个队列,然后让工作线程从队列中取出任务并执行。
用法
基本流程分为三步:包装任务、获取 future
、执行任务。
|
|
常用操作
- 构造函数
std::packaged_task<Signature>(callable)
:- 用一个可调用对象
callable
来创建一个任务包。Signature
是函数的签名,例如int(int, double)
。
- 用一个可调用对象
get_future()
:- 返回一个与该任务关联的
std::future
对象。注意:get_future()
只能调用一次。
- 返回一个与该任务关联的
operator()
:- 执行被包装的任务。调用
task(args...)
就会执行内部的可调用对象。任务的返回值(或异常)会被自动存入future
。
- 执行被包装的任务。调用
make_ready_at_thread_exit()
:- 在任务执行后,调用此方法可以使任务的共享状态在线程退出时才变为
ready
。这在处理线程局部变量(thread-local variables)时非常有用。
- 在任务执行后,调用此方法可以使任务的共享状态在线程退出时才变为
reset()
:- 重置
packaged_task
的状态,使其可以被再次执行。它会释放之前的结果,并重新与一个新的std::future
关联(需要再次调用get_future()
)。
- 重置
注意事项
- 移动语义(Move-Only):
std::packaged_task
是一个只移类型(Move-Only),它不可复制。这意味着你不能将其拷贝到多个地方,只能通过std::move
来转移其所有权,例如将其存入容器或传递给线程。 - 一次性的
get_future()
:与std::promise
类似,一个packaged_task
实例的get_future()
方法只能被成功调用一次。再次调用会抛出std::future_error
。
std::future
std::future
提供了一种访问异步操作结果的机制。当你启动一个异步任务时(例如,通过 std::async
,std::packaged_task
或 std::promise
),你会得到一个 std::future
对象。这个 future
对象在未来的某个时刻会持有该异步任务的计算结果或抛出的异常。
作用
- 获取异步结果:主线程可以随时通过
future
对象查询异步任务是否完成,并获取其返回值,而无需自己手动管理线程和共享状态。 - 线程同步:
future
的.get()
或.wait()
方法会阻塞当前线程,直到异步任务完成。这是一种简单的线程同步方式。 - 异常传递:如果异步任务在执行过程中抛出异常,该异常会被捕获并存储在
future
对象中。当主线程调用.get()
时,该异常会被重新抛出。
用法
最常见的用法是与 std::async
配合使用:
|
|
常用操作
get()
: 等待异步任务完成并返回其结果。如果任务抛出异常,get()
会重新抛出该异常。注意:get()
只能调用一次。wait()
: 等待异步任务完成,但不获取结果。wait_for()
/wait_until()
: 等待一段时间或直到某个时间点。valid()
: 检查future
对象是否与一个共享状态关联。调用get()
后,future
会变为 invalid。
注意事项
get()
的一次性:std::future
的get()
成员函数只能被调用一次。因为结果(或异常)可能会被移动出来,而不是复制。再次调用会抛出异常。如果需要多次访问结果,应该使用std::shared_future
。std::async
的析构行为:如果一个由std::async
返回的std::future
对象在析构时,其关联的异步任务还未完成,那么这个析构函数会阻塞,直到任务完成。这被称为“延迟销毁”,有时会造成意想不到的阻塞。
std::invoke_result_t
std::invoke_result_t
是 C++17 中引入的类型别名,用于在编译期推断一个可调用对象(函数、函数指针、lambda 表达式、成员函数指针、函数对象等)在以特定参数调用时,其返回值的类型。它是 std::invoke_result<F, Args...>::type
的简写形式。
作用
- 编译期类型推导:它允许你在编写模板或泛型代码时,能够提前知道一个函数调用将返回什么类型,而无需实际执行该调用。
- SFINAE 和元编程:在模板元编程中,它可以用于根据函数返回类型进行 SFINAE(Substitution Failure Is Not An Error,替换失败并非错误)判断,或者定义依赖于函数返回类型的变量和数据结构。
用法
|
|
注意事项
- 非静态成员函数:对于非静态的成员函数,
invoke_result_t
只能使用其函数指针才能进行返回值类型推导,因为类名::成员函数名
不是完整的成员限定表达式,只有加了&
成为成员函数指针,才能脱离类定义被使用。 - C++17标准:
std::invoke_result_t
是 C++17 的特性,它在处理某些复杂情况(如成员函数指针)时不如std::invoke_result
强大且已被 C++20 废弃。
std::move
std::move
是一个标准库函数,它的主要作用是声明一个对象可以被“移动”,即其资源可以被窃取。它本身并不执行任何移动操作,真正的移动操作是由移动构造函数或移动赋值运算符完成的。
作用
- 启用移动语义:将左值(有名字、可以取地址的对象)转换为右值,从而使得可以调用对象的移动构造函数或移动赋值运算符,避免不必要的深拷贝,提升性能。
- 转移所有权:对于像
std::unique_ptr
、std::thread
这种只移类型(Move-Only Type),std::move
是转移其所有权的唯一方式。
用法
|
|
注意事项
- 被移动后的对象状态:使用
std::move
后,原对象处于“有效的、但未指定的状态”(valid but unspecified state)。这意味着你不能对其状态做任何假设(例如,它可能为空,也可能不是),但你可以安全地对其进行销毁或重新赋值。在对其重新赋值之前,不应再使用它。 - 不要对
const
对象使用std::move
:对const
对象std::move
会得到一个const
类型的右值引用,这通常会退化为一次拷贝操作,因为const
对象是不可修改的,其资源无法被移动。 - 仅在需要转移所有权时使用:
std::move
的名字有一定误导性,它更像是一个“转换”或“允许移动”的请求。只有当你确定不再需要原对象,并希望将其资源转移给新对象时,才应该使用它。
std::forward
std::forward
是一个条件转换函数,主要用于模板编程中的“完美转发”(Perfect Forwarding)。它根据模板参数的类型(左值引用或右值引用),将函数参数以原始的值类别(左值或右值)转发给另一个函数。
作用
- 保持值类别:在模板函数中,函数参数会丢失其原始的值类别信息(都变成了左值)。
std::forward
的作用是恢复这个信息,如果原始传入的是右值,就转发为右值;如果原始传入的是左值,就转发为左值。 - 实现完美转发:这使得我们可以编写一个模板函数作为“中转站”,它能接收任意类型的参数,并将其无损地(不改变值类别,不产生额外拷贝)传递给下一个函数。
用法
它通常用在接受转发引用(Forwarding Reference,也叫万能引用 Universal Reference)的模板函数中。转发引用是一种特殊的模板参数形式 T&&
,其中 T
是一个需要推导的类型。
|
|
上面的示例中,T的类型被推导出来后,wrapper
函数的参数就为 推导的类型 + &&
,这时会发生引用坍缩,参数类型会变为正确的类型。
注意事项
- 必须与转发引用配合:
std::forward
只应在接受转发引用(T&&
)的上下文中使用。如果用在普通的值传递或非转发引用的参数上,行为可能不符合预期。 - 模板参数
T
是关键:std::forward<T>
的模板参数T
必须是函数模板推导出的类型,它包含了原始参数是左值还是右值的信息。
std::apply
std::apply
是 C++17 引入的函数,它允许你使用一个元组(std::tuple
)或类似元组的对象(如 std::pair
, std::array
)的元素作为参数来调用一个可调用对象。
作用
- 解包元组:将一个元组中的元素“解开”,并按顺序作为独立的参数传递给一个函数。这在泛型编程和处理可变参数模板时非常有用。
- 简化函数调用:当你需要调用的函数的参数被存储在一个元组中时,
std::apply
提供了一种简洁、优雅的方式来执行调用,而无需手动索引元组的每个元素。
用法
|
|
注意事项
- C++17 标准:
std::apply
是 C++17 的特性。在之前的版本中,需要手动编写复杂的模板元编程代码(通常涉及索引序列std::index_sequence
)来实现类似的功能。 - 参数顺序:元组中的元素会严格按照从 0 开始的索引顺序,依次映射到函数的第一个、第二个、第三个…参数。
- 兼容性:任何支持
std::get
和std::tuple_size
的类(Tuple-like-object)都可以与std::apply
一起使用。这包括std::tuple
,std::pair
和std::array
。
互斥量与锁包装器
在C++17标准下,常用的互斥量如下所示:
互斥锁类型 | 简介 | 使用场景 |
---|---|---|
std::mutex |
最基本的互斥锁,不能递归锁定,一个线程加锁后,其他任何线程(包括自己)都不能再次加锁 | 通用互斥锁,保护绝大多数常规的临界区 |
std::recursive_mutex |
允许同一个线程多次加锁的互斥锁,需手动解锁相同次数后,其他线程才能获取 | 需要在递归函数或同一线程中多次加锁的场景 |
std::timed_mutex |
带计时的互斥锁,超时自动解锁 | 当你不希望线程无限期地等待一个锁时,如线程池任务调度、定时任务等 |
std::recursive_timed_mutex |
结合递归锁和超时锁的特性的互斥锁 | 既需要递归锁定又要求支持超时的场景,如复杂嵌套逻辑下的任务控制 |
std::shared_mutex |
读写锁,多个线程可共享读,写互斥 | 读多写少场景,如缓存、配置、数据库读接口等 |
直接调用锁的lock()
和 unlock()
函数是危险的,因为异常或复杂的逻辑分支可能导致 unlock()
被跳过。因此,推荐使用锁包装器来管理锁,锁包装器采用RAII机制,能够确保安全的使用锁。常用的锁包装器有以下几种:
std::lock_guard
最简单、高效的锁包装器。
- 工作方式: 在构造时,它接收一个或多个互斥量(C++17 开始支持多个)并立即对它们加锁。在析构时(离开作用域),它会自动解锁。
- 特点:
- 简单粗暴: 一旦创建就加锁,没有其他多余操作。
- 不可移动,不可复制: 它的所有权与作用域绑定。
- 无额外开销: 通常会被编译器优化掉,性能与手动
lock/unlock
几乎无异,但更安全。 - 独占访问: 可以确保某一时刻只有一个线程可以访问被保护的共享资源。
- 适用场景: 当你需要在一个完整的作用域内锁定一个或多个互斥量,且不需要任何高级的锁操作时。这是最常用的选择。
|
|
std::unique_lock
最灵活的锁包装器,相比与std::lock_guard
提供了更多的灵活性,但也是独占访问。
- 工作方式: 提供了对互斥量所有权的完全控制。
- 特点:
- 所有权管理:
std::unique_lock
对象拥有其管理的互斥量的锁。这个所有权可以被转移(通过移动构造或移动赋值),也可以被临时释放和重新获取。 - 可移动,不可复制: 可以作为函数返回值,或存入容器中。
- 支持延迟加锁: 可以在构造时不加锁(使用
std::defer_lock
),稍后手动调用lock()
。 - 支持尝试加锁: 可以使用
try_lock()
、try_lock_for()
、try_lock_until()
。 - 支持所有权转移:
release()
方法会返回底层互斥量的指针并放弃所有权,但不会解锁。std::move
可以将所有权转移给另一个unique_lock
。
- 所有权管理:
- 适用场景:
- 与条件变量(
std::condition_variable
)配合使用:这是unique_lock
最核心的用途。条件变量的wait
系列函数必须接收一个std::unique_lock
。 - 需要提前解锁: 在临界区结束前,如果某些操作不再需要锁的保护,可以调用
unlock()
提前释放锁。 - 锁的作用域不固定: 当锁的生命周期需要跨越多个作用域或在函数间传递时。
- 与条件变量(
|
|
std::shared_lock
std::shared_mutex
的共享模式(读模式)锁包装器。
- 工作方式: 类似于
unique_lock
,但它在构造时获取的是共享锁。 - 特点:
- 与
unique_lock
类似,它也是可移动、可延迟加锁、可尝试加锁的。 - 与
std::unique_lock<std::shared_mutex>
(写锁) 配合使用。
- 与
- 适用场景: 用于
std::shared_mutex
的读锁定。
|
|
std::scoped_lock
std::lock_guard
的终极进化版,也是 C++17 中处理多互斥量锁定的最佳实践。
- 工作方式: 这是一个可变参数模板,可以同时接收任意数量的互斥量,并以避免死锁的方式将它们全部锁定。
- 特点:
- 死锁避免算法: 它内部使用了一种死锁避免算法(如
std::lock
函数),确保在锁定多个互斥量时不会因为加锁顺序不同而导致死锁。 - RAII 封装: 与
lock_guard
一样,它在构造时加锁,析构时按加锁相反的顺序解锁。 - 语法简洁: 是 C++17 中锁定多个互斥量的首选方式。
- 死锁避免算法: 它内部使用了一种死锁避免算法(如
- 适用场景: 当你需要同时锁定两个或更多互斥量时。
条件变量
互斥量解决了“访问”的互斥问题,确保同一时间只有一个线程能进入临界区。但它无法解决“等待”的同步问题,例如:
- 生产者-消费者问题:当队列为空时,消费者线程如何高效地等待生产者放入数据,而不是通过一个
while(true)
循环不停地检查队列(这被称为“忙等待”,会浪费大量 CPU 资源)? - 任务完成通知:一个主线程如何等待多个工作线程全部完成其初始化工作后,再继续执行?
条件变量正是为了解决这类问题而生的。
核心作用
- 线程阻塞与唤醒: 允许线程在某个条件不满足时,原子地释放互斥锁并进入阻塞(睡眠)状态,从而避免了“忙等待”。当其他线程改变了该条件后,可以发送通知来唤醒等待的线程。
- 线程间的同步信令: 作为一种线程间通信机制,用于同步执行流程,一个线程等待(
wait
),另一个线程通知(notify
)。
C++ 标准库提供了两种条件变量:
std::condition_variable
: 最高效,但它必须与std::unique_lock<std::mutex>
配合使用。这是最常用的条件变量。std::condition_variable_any
: 更通用,可以与任何满足BasicLockable
要求的锁类型配合(例如std::shared_lock
),但可能会有额外的性能开销。除非有特殊需求,否则应优先使用std::condition_variable
。
工作流程
条件变量的典型工作流程如下,以一个等待线程(消费者)和一个通知线程(生产者)为例:
等待线程 (Waiting Thread):
- 获取
std::mutex
的std::unique_lock
。 - 检查条件。通常在一个循环中(
while
或if
)检查某个共享状态(如queue.empty()
)。 - 如果条件不满足: a. 调用
cv.wait(lock)
。这个调用会原子地做三件事: i. 解锁互斥量lock
。 ii. 阻塞当前线程,使其进入睡眠状态。 iii. (当被唤醒后)重新加锁互斥量lock
,然后wait
函数返回。 b.wait
返回后,循环会再次检查条件。 - 如果条件满足:线程跳出循环,继续持有锁并执行后续操作(如从队列中取数据)。
- 在离开作用域时,
unique_lock
的析构函数会自动解锁互斥量。
通知线程 (Notifying Thread):
- 获取与等待线程相同的
std::mutex
的std::lock_guard
或std::unique_lock
。 - 修改共享状态,使得等待线程的条件得以满足(如向队列中添加数据)。
- (可选但推荐)在修改完共享状态后,并且在解锁互斥量之前,调用
cv.notify_one()
或cv.notify_all()
来唤醒一个或所有正在等待的线程。 - 离开作用域,锁被释放。
std::condition_variable
主要成员函数
wait(std::unique_lock<std::mutex>& lock)
: 使当前线程阻塞,直到被notify
。如上所述,它会自动释放锁并在此后重新获取。wait(std::unique_lock<std::mutex>& lock, Predicate pred)
: 这是一个重载版本,是强烈推荐的使用方式,可以有效避免虚假唤醒。pred
一个可以调用的对象或者函数,函数或者对象没有参数并且需要返回一个bool类型的值,线程将会不停的调用wait
函数直到该返回值为true
。该版本的行为分为两部分:- 初次进入 wait() 时:
- 自动调用
predicate()
; - 若返回
true
,直接跳出,不阻塞; - 若返回
false
,释放锁并阻塞线程(让出 CPU)。
- 自动调用
- 线程被唤醒后(收到 notify):
- 先重新获得锁;
- 然后再次调用
predicate()
判断; - 如果为
true
:继续执行; - 如果为
false
:继续阻塞(避免虚假唤醒);
- 初次进入 wait() 时:
|
|
wait_for(lock, duration)
/wait_for(lock, duration, pred)
: 带超时的等待。如果在指定的duration
时间段内没有被唤醒(或pred
未满足),wait
也会返回。可以通过其返回值判断是正常唤醒还是超时返回。wait_until(lock, time_point)
/wait_until(lock, time_point, pred)
: 与wait_for
类似,但等待直到一个指定的时间点time_point
。notify_one()
: 唤醒一个正在等待的线程。如果有多个线程在等待,操作系统会选择其中一个来唤醒。如果没有线程在等待,此调用无效。notify_all()
: 唤醒所有正在等待的线程。这些被唤醒的线程会竞争同一个互斥锁,最终只有一个能立即执行,其他线程会继续阻塞在锁上。
设计概览
classDiagram namespace 线程池模块 { class ThreadPool { -string m_name -atomic~Status~ m_status -mutex m_status_mutex -queue~Task~ m_task_queue -mutex m_task_queue_mutex -condition_variable m_task_queue_cv -list~WorkerThread~ m_worker_list -list~WorkerThread~ m_zombie_workers -thread m_monitor_thread +explicit ThreadPool(const ThreadPoolConfig& config) +~ThreadPool() +pause() +resume() +shutdown() +increaseThreadCount(size_t count) +decreaseThreadCount(size_t count) +setMaxTaskCount(size_t count) +getThreadCount() : size_t +getStatus() : string +getThreadPoolConfig() : ThreadPoolConfig +submit(Task) : future~Result~ -monitorLoop() -adjustThreadCount() } class WorkerThread { -ThreadPool* m_pool_ptr -atomic~Status~ m_status -BinarySemaphore m_pause_sem -thread m_thread -atomic~time_point~ m_last_active_time +explicit WorkerThread(ThreadPool* pool_ptr) +~WorkerThread() +terminate() +pause() +resume() +getLastActiveTime() : time_point -isWake() : bool -run() } class ThreadPoolConfig { +size_t max_task_count +size_t core_thread_count +size_t max_thread_count +milliseconds keep_alive_time +milliseconds monitor_interval +bool enable_dynamic_scaling +operator_equals(const ThreadPoolConfig& oth) : bool } class BinarySemaphore { -mutex m_mutex -condition_variable m_cv -bool m_flag +explicit BinarySemaphore(bool initially_available) +acquire() +release() } } ThreadPool "1" *-- "N" WorkerThread : contains ThreadPool ..> ThreadPoolConfig : uses WorkerThread ..> ThreadPool : uses WorkerThread "1" *-- "1" BinarySemaphore : containsclassDiagram namespace 线程池模块 { class ThreadPool { -string m_name -atomic~Status~ m_status -mutex m_status_mutex -queue~Task~ m_task_queue -mutex m_task_queue_mutex -condition_variable m_task_queue_cv -list~WorkerThread~ m_worker_list -list~WorkerThread~ m_zombie_workers -thread m_monitor_thread +explicit ThreadPool(const ThreadPoolConfig& config) +~ThreadPool() +pause() +resume() +shutdown() +increaseThreadCount(size_t count) +decreaseThreadCount(size_t count) +setMaxTaskCount(size_t count) +getThreadCount() : size_t +getStatus() : string +getThreadPoolConfig() : ThreadPoolConfig +submit(Task) : future~Result~ -monitorLoop() -adjustThreadCount() } class WorkerThread { -ThreadPool* m_pool_ptr -atomic~Status~ m_status -BinarySemaphore m_pause_sem -thread m_thread -atomic~time_point~ m_last_active_time +explicit WorkerThread(ThreadPool* pool_ptr) +~WorkerThread() +terminate() +pause() +resume() +getLastActiveTime() : time_point -isWake() : bool -run() } class ThreadPoolConfig { +size_t max_task_count +size_t core_thread_count +size_t max_thread_count +milliseconds keep_alive_time +milliseconds monitor_interval +bool enable_dynamic_scaling +operator_equals(const ThreadPoolConfig& oth) : bool } class BinarySemaphore { -mutex m_mutex -condition_variable m_cv -bool m_flag +explicit BinarySemaphore(bool initially_available) +acquire() +release() } } ThreadPool "1" *-- "N" WorkerThread : contains ThreadPool ..> ThreadPoolConfig : uses WorkerThread ..> ThreadPool : uses WorkerThread "1" *-- "1" BinarySemaphore : contains
库架构
线程池采用了模块化设计,主要由下面三个核心组件构成:
ThreadPool
类:作为用户直接交互的接口,负责任务调度、线程管理等核心功能。用户通过创建ThreadPool
实例来提交任务、控制线程池状态、并获取线程池相关信息。ThreadPool::WorkerThread
类:作为线程池内部的工作单元,每个ThreadPool::WorkerThread
对象代表一个独立的工作线程,负责从任务队列中取出任务并执行。- 辅助工具:包括同步原语(如互斥锁、条件变量、信号量等)以及状态管理机制,它们为线程池和工作线程之间的通信、任务同步、状态变更等操作提供了必要的支撑。
各组件间的关系如下:
ThreadPool
类维护一个工作线程列表std::list<WorkerThread>
,并通过同步原语控制任务队列的访问与状态变更。并通过ThreadPool::WorkerThread
提供的接口,对工作线程发出各种指令(如暂停、恢复、终止等)。ThreadPool::WorkerThread
类定义一系列接口供ThreadPool
类使用,从而能够获取待执行任务、更新自身状态。- 辅助工具贯穿于整个库的设计与实现中,确保并发环境下的数据一致性与操作安全性。
状态转换
线程池的状态转换如下所示:
stateDiagram-v2 direction LR [*] --> RUNNING: 初始化 RUNNING --> PAUSED: 调用 pause() PAUSED --> RUNNING: 调用 resume() RUNNING --> SHUTDOWN: 调用 shutdown() PAUSED --> RUNNING: (1)shutdown()内部先调用resumeWithoutStatusLock() RUNNING --> SHUTDOWN: (2)再自动进入 SHUTDOWN --> TERMINATING: 任务队列为空 TERMINATING --> TERMINATED: 全部工作线程 join TERMINATED --> [*]stateDiagram-v2 direction LR [*] --> RUNNING: 初始化 RUNNING --> PAUSED: 调用 pause() PAUSED --> RUNNING: 调用 resume() RUNNING --> SHUTDOWN: 调用 shutdown() PAUSED --> RUNNING: (1)shutdown()内部先调用resumeWithoutStatusLock() RUNNING --> SHUTDOWN: (2)再自动进入 SHUTDOWN --> TERMINATING: 任务队列为空 TERMINATING --> TERMINATED: 全部工作线程 join TERMINATED --> [*]
工作线程的状态转换如下所示:
stateDiagram-v2 direction LR [*] --> RUNNING: 初始化 RUNNING --> PAUSED: 调用 pause() PAUSED --> RUNNING: 调用 resume() RUNNING --> TERMINATING: 调用 terminate() PAUSED --> TERMINATING: 调用 terminate() TERMINATING --> TERMINATED: run() 循环结束 TERMINATED --> [*]stateDiagram-v2 direction LR [*] --> RUNNING: 初始化 RUNNING --> PAUSED: 调用 pause() PAUSED --> RUNNING: 调用 resume() RUNNING --> TERMINATING: 调用 terminate() PAUSED --> TERMINATING: 调用 terminate() TERMINATING --> TERMINATED: run() 循环结束 TERMINATED --> [*]
线程数量增减
本线程池实现了两种方式的线程池数量增减,第一种为用户手动调用increaseThreadCount
和decreaseThreadCount
函数来进行线程数量的增减:
sequenceDiagram actor User as 用户 participant TP as ThreadPool participant WT as WorkerThread User->>TP: increaseThreadCount(N) activate TP TP->>TP: 1. 锁定状态锁 (m_status_mutex) TP->>TP: 2. 检查状态是否为 RUNNING/PAUSED loop N 次 TP->>WT: 3. new WorkerThread(this) activate WT note right of WT: 构造函数启动 std::thread,sequenceDiagram actor User as 用户 participant TP as ThreadPool participant WT as WorkerThread User->>TP: increaseThreadCount(N) activate TP TP->>TP: 1. 锁定状态锁 (m_status_mutex) TP->>TP: 2. 检查状态是否为 RUNNING/PAUSED loop N 次 TP->>WT: 3. new WorkerThread(this) activate WT note right of WT: 构造函数启动 std::thread,
线程进入 RUNNING 状态 WT-->>TP: 构造完成 deactivate WT end TP->>TP: 4. 解除状态锁 deactivate TP User->>TP: decreaseThreadCount(N) activate TP TP->>TP: 1. 锁定状态锁 (m_status_mutex) TP->>TP: 2. 检查状态是否为 RUNNING/PAUSED loop N 次 TP->>WT: 3. worker.terminate() activate WT note right of WT: 状态变为 TERMINATING,
并唤醒自己准备退出 WT-->>TP: terminate() 返回 deactivate WT TP->>TP: 4. 将 WorkerThread 移入僵尸列表 end TP->>TP: 5. 解除状态锁 deactivate TP
线程进入 RUNNING 状态 WT-->>TP: 构造完成 deactivate WT end TP->>TP: 4. 解除状态锁 deactivate TP User->>TP: decreaseThreadCount(N) activate TP TP->>TP: 1. 锁定状态锁 (m_status_mutex) TP->>TP: 2. 检查状态是否为 RUNNING/PAUSED loop N 次 TP->>WT: 3. worker.terminate() activate WT note right of WT: 状态变为 TERMINATING,
并唤醒自己准备退出 WT-->>TP: terminate() 返回 deactivate WT TP->>TP: 4. 将 WorkerThread 移入僵尸列表 end TP->>TP: 5. 解除状态锁 deactivate TP
第二种为启用监控线程(需要在初始化线程池时指定参数enable_dynamic_scaling
参数为true
),监控线程将会根据线程池的负载自动增减工作线程:
graph TD subgraph MonitorThread [监控线程: monitorLoop] A(启动监控线程) --> B(循环开始); B --> C{"m_terminating_flag为true?"}; C -- 是 --> Z([线程终止]); C -- 否 --> D[等待monitor_interval时间或被唤醒]; D --> F{"should_terminate为true?"}; F -- 是 --> Z; F -- 否 --> I["调用adjustThreadCount()"]; I --> B; end subgraph adjustThreadCount [调整线程数逻辑] I --> J[获取线程池状态锁]; J --> K{"当前状态是RUNNING或PAUSED?"}; K -- 否 --> L[释放status_lock锁]; L --> M(调整结束); K -- 是 --> N[获取相关信息]; N --> O{"满足扩容条件?"}; O -- 是 --> P["调用increaseThreadCountWithoutStatusLock(1)"]; P --> L; O -- 否 --> Q{"满足缩容条件?"}; Q -- 否 --> L; Q -- 是 --> R[计算超时空闲线程数]; R --> W{"超时空闲线程数 > 0?"}; W -- 是 --> X["调用decreaseThreadCountWithoutStatusLock(N)"]; X --> L; W -- 否 --> L; endgraph TD subgraph MonitorThread [监控线程: monitorLoop] A(启动监控线程) --> B(循环开始); B --> C{"m_terminating_flag为true?"}; C -- 是 --> Z([线程终止]); C -- 否 --> D[等待monitor_interval时间或被唤醒]; D --> F{"should_terminate为true?"}; F -- 是 --> Z; F -- 否 --> I["调用adjustThreadCount()"]; I --> B; end subgraph adjustThreadCount [调整线程数逻辑] I --> J[获取线程池状态锁]; J --> K{"当前状态是RUNNING或PAUSED?"}; K -- 否 --> L[释放status_lock锁]; L --> M(调整结束); K -- 是 --> N[获取相关信息]; N --> O{"满足扩容条件?"}; O -- 是 --> P["调用increaseThreadCountWithoutStatusLock(1)"]; P --> L; O -- 否 --> Q{"满足缩容条件?"}; Q -- 否 --> L; Q -- 是 --> R[计算超时空闲线程数]; R --> W{"超时空闲线程数 > 0?"}; W -- 是 --> X["调用decreaseThreadCountWithoutStatusLock(N)"]; X --> L; W -- 否 --> L; end
任务调度与执行
完整的任务生命周期如下所示:
sequenceDiagram actor Client as "任务提交者" participant TP as "ThreadPool" participant TQ as "任务队列" participant Future as "std::future" participant WT as "WorkerThread (空闲)" Client->>TP: submit(function, args...) activate TP TP->>TP: 1. 封装为 packaged_task TP->>Future: 2. task.get_future() activate Future Future-->>TP: future 对象 deactivate Future TP->>TP: 3. 再次封装为 std::function<void()> TP->>TQ: 4. 将任务加入任务队列 deactivate TP TP->>WT: 5. m_task_queue_cv.notify_one() TP-->>Client: 6. 返回 future 对象 activate WT WT->>TQ: 7. lock(), task = task_queue.front() activate TQ TQ-->>WT: task WT->>TQ: 8. task_queue.pop(), unlock() deactivate TQ WT->>WT: 9. 执行任务 note over WT, Future: 任务结果/异常被存入sequenceDiagram actor Client as "任务提交者" participant TP as "ThreadPool" participant TQ as "任务队列" participant Future as "std::future" participant WT as "WorkerThread (空闲)" Client->>TP: submit(function, args...) activate TP TP->>TP: 1. 封装为 packaged_task TP->>Future: 2. task.get_future() activate Future Future-->>TP: future 对象 deactivate Future TP->>TP: 3. 再次封装为 std::function<void()> TP->>TQ: 4. 将任务加入任务队列 deactivate TP TP->>WT: 5. m_task_queue_cv.notify_one() TP-->>Client: 6. 返回 future 对象 activate WT WT->>TQ: 7. lock(), task = task_queue.front() activate TQ TQ-->>WT: task WT->>TQ: 8. task_queue.pop(), unlock() deactivate TQ WT->>WT: 9. 执行任务 note over WT, Future: 任务结果/异常被存入
与 future 关联的共享状态中 deactivate WT Client->>Future: get() [阻塞等待结果] activate Future Future-->>Client: 返回任务结果或抛出异常 deactivate Future
与 future 关联的共享状态中 deactivate WT Client->>Future: get() [阻塞等待结果] activate Future Future-->>Client: 返回任务结果或抛出异常 deactivate Future
主要实现
submit
函数
线程池设计的基本理念就是:任务入队时就应该已经具备所有执行信息(函数体 + 参数),工作线程只负责调用。
|
|
代码中的注释已经详细的说明了每一行代码的作用,那么下面我们将详细说明一下这个函数的设计思路:
-
如何设计任务队列? 首先,线程池的任务队列必须能够存储各种各样的任务,那么就意味着我们只能往里面塞
void()
类型的任务对象,换句话说,所有任务都必须 “类型擦除” 为一个void()
可调用对象。因此,任务队列存储的类型就应该为std::function<void()>
,这样就能够存储任何任务了,且工作线程也只管执行任务,无需关心任务的参数和返回值。 -
如何将原始函数封装成
std::function<void()>
呢? 任务提交者肯定都是想要知道任务执行结果的,而工作线程执行的是void()
,有什么优雅的方式能够获取执行结果呢?那就是使用std::packaged_task
来进行封装! 因为该类的get_future()
方法会返回一个std::future
对象,可以获取任务的执行结果。所以,我们先使用一个
lambda
将原始任务封装成一个无参任务,再使用std::packaged_task
对无参任务进行封装,最后在入队时将其封装成一个void()
对象。这里有三个需要注意的地方:
1. 为什么
std::packaged_task
不直接将无参任务封装成void()
?因为当任务有返回值时,我们想要能够获取返回值。用
packaged_task<void()>
那么返回的future
类型就是std::future<void>
,任务提交者就无法获取任务执行结果了。2. 为什么要用
lambda
将原始任务封装成一个无参任务再由std::packaged_task
封装?因为,这种做法是标准做法,其提供了更直接的参数绑定,可以保持任务签名一致性 (
void()
),代码结构更加清晰健壮。当然也可以使用std::packaged_task
封装原始函数,在后续再把task封装成void()
,但是不推荐这样做。3. 为什么使用
shared_ptr
管理packaged_task
?最主要的原因是
packaged_task
对象必须在submit
函数返回后继续存活,直到某个工作线程最终执行它。如果使用局部变量
auto task = ...
,这个task
对象会在submit
函数结束时被销毁,当工作线程调用task()
时,它访问的是一个已经被销毁的对象。这是典型的悬挂指针/引用问题,会导致未定义行为,程序很可能会立即崩溃。此外,
std::function
要求其内部存储的可调用对象是可拷贝的,而一个捕获了std::unique_ptr
的 lambda 自身是不可拷贝的,所以不能使用std::unique_ptr
。
注意: 如果提交的任务有引用参数的话,要使用 std::ref(data)
的形式来传,因为Args&&... args
是值传递,实参会被 按值 传递进任务中(即复制),除非你明确告诉它传“引用”。
工作线程执行逻辑
工作线程就是不断执行定义的循环逻辑,直到被线程池终止。
|
|
使用示例
|
|