接上篇 c++标准库:并发(三) —— 锁 mutex 和 lock
在上篇关于 std::unique_lock
的小节中,有一个用 std::unique_lock
不断循环加锁、检查条件、释放锁、sleep 来判断一个前置条件是否准备完成的例子。
例子中有一个很关键的点,在准备工作时间无法准确预估的情况下,sleep的时间到底设置多少为合适呢? 设置短了太娘炮,设置长了扯着蛋… 要么sleep过久造成程序执行反应很慢, 要么sleep过短而将CPU性能大量的浪费在检查工作上。
基于这个需求,标准库提供了 std::condition_variable
条件变量,来让我们:
- 初始化一个条件变量
condVar
- 通过
condVar.wait
操作使线程进入无限期的阻塞(wait)状态,和sleep状态一样不浪费CPU在检查工作上。多个线程可使用同一个条件变量进行wait
- 通过
condVar.notify
操作让一个或所有使用此条件变量的线程,被唤醒而进入工作状态
上面是条件变量的大致执行逻辑,《C++标准库》这本书中并没有对细节进行解释。下面我写了一些demo,可以来看看细节。
wait
下面是一个最基本的使用condition_variable的例子1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41bool readyFlag = false;
std::mutex readyFlagMutex;
std::condition_variable readyCondVar;
//sleep一段时间,将readyFlag设置为true,然后触发一次notify_one
void prepareThread(){
std::this_thread::sleep_for(std::chrono::seconds(5));
{
std::lock_guard<std::mutex> lg(readyFlagMutex);
std::cout << " 1. prepare get lock" << std::endl;
readyFlag = true;
readyCondVar.notify_one();
std::cout << " 2. prepare call notify and sleep " << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}
//通过条件变量的wait让线程进入阻塞状态,通过一个unique_lock,来实现收到notify之后执行的 锁定-检查-解锁逻辑
void waitForPrepareThread(){
{
//这里一定要使用 unique_lock,不能使用lock_guard
std::unique_lock<std::mutex> ul(readyFlagMutex);
readyCondVar.wait(ul);
std::cout << " 3. condition_variable check pass... " << std::endl;
}
std::cout << " 4. prepare job done, run something! " << std::endl;
}
void testConditionVariable(){
std::thread t1(prepareThread);
std::thread t2(waitForPrepareThread);
t1.join();
t2.join();
};
//输出:
// 1. prepare get lock
// 2. prepare call notify and sleep
// 3. condition_variable check pass...
// 4. prepare job done, run something!
分析这个case,普通的readyCondVar.wait(ul)
执行的过程可以理解为下面几个步骤:
- 等待获取锁
- 进入阻塞状态并立即释放锁
- 阻塞,直到线程被一个
notify_*
唤醒 - 等待获取锁
- 执行后续逻辑
wait(ul, predicate)
用到条件变量的场景,一把是像上面的例子中一样,某个条件ready后就notify一下,通知另一个线程继续执行。看起来很美,对吧?
很不幸的是,实际程序执行的时候,某些情况下,条件变量会出现“假唤醒”的状态。也就是说,会存在莫名其妙程序自动调了一下 notify_*
。此时如果程序直接执行下去,就很容易因为条件还未就绪而崩掉。
所以我们需要执行一个检查条件是否真的就绪的工作,而且这个检查工作本身也得加锁,以防程序因假唤醒挂掉。例如:
1 | { |
上面的语法稍显麻烦,标准库为wait方法提供了第二个参数,可以传入一个表达式,只有它的返回结果为false时让线程进入休眠,否则直接释放锁并进入后面的流程。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22//(改写上例子中的 waitForPrepareThread )
//通过条件变量的wait让线程进入休眠状态,通过一个unique_lock,来实现收到notify之后执行的 锁定-检查-解锁逻辑
void waitForPrepareThread(){
{
//这里一定要使用 unique_lock,不能使用lock_guard
std::unique_lock<std::mutex> ul(readyFlagMutex);
//传入条件:为了处理假唤醒,多传入一个判断函数,加锁后,判断函数返回false的时候才释放锁并执行后面的逻辑,否则释放锁并进入休眠状态
readyCondVar.wait(ul, []{
//wait一般情况下只会被 condition_variable的 notify_one、 notify_all 唤醒,但是也存在一定可能性误唤醒
//这里不传入lamda做判断的话,误唤醒时实际上readyFlag并没有变成true
//所以这里再加一道check
std::cout << " condition_variable notified, check readyFlag... " << std::endl;
return readyFlag;
});
}
std::cout << " prepare job done, run something! " << std::endl;
}
//执行结果:
// condition_variable notified, check readyFlag...
// condition_variable notified, check readyFlag...
// prepare job done, run something!
从执行结果中可以发现,条件判断函数被执行了两次,这是因为prepare函数是sleep了5s才获取锁的,所以 readyCondVar.wait
在第一时间就获取了锁,然后进行了一次判断,结果此时 readyFlag
是 false,所以条件变量让当前线程释放锁并进入阻塞状态等待。
prepare线程sleep 5s后,将readyFlag修改为true并执行notify_one。 waitForPrepareThread被notify唤醒后,再次执行条件判断函数,此时readyFlag为true,判断函数返回true,所以不再进入阻塞,直接去执行后面的逻辑。总结起来,带条件的wait,工作流程是:
- 等待获取锁,然后进行判断
- 判断条件: 若条件为 false
– 进入阻塞状态并立即释放锁
– 阻塞,直到线程被一个notify_*
唤醒
– 等待获取锁
– 执行后续逻辑 - 判断条件: 若条件为true
– 执行后续逻辑
带条件的wait与不带条件的wait相比:
wait(ul)
一定会让线程阻塞;wait(ul, predicate)
不一定会让线程进入阻塞- 如果在循环中使用,
wait(ul)
的线程被唤醒后,执行一次工作再次wait又会进入阻塞,它被唤醒后是“单发”的;wait(ul, predicate)
如果条件是true,会一直执行任务不阻塞,是 “连发” 的。
notify_one和notify_all
上面的例子中,条件准备就绪后是notify_one唤醒一个阻塞的线程。调用notify_one时,若此时没有用此条件变量进入阻塞状态的线程,那么notify_one没有任何作用。
标准库还提供了一个notify_all来唤醒所有阻塞的线程,我目前理解的,有两种常见的场景需要用到 notify_all 。
事件驱动模型的场景
假如用 condition_variable 来做事件订阅,当某事件发生时,可以通过 notify_all
来通知给所有通过 wait 此 condition_variable 来阻塞的线程。
这个使用场景比较简单,就不写代码验证了。
部分生产者消费者模型
设想有一个场景: 一个生产者,在生产了100个耗时任务,有10个消费者线程通过条件变量等待任务去执行。那么wait和notify不同选择时会产生不同效果:
生产者\消费者 | wait(ul) | wait(ul, predicate) |
---|---|---|
notify_one 1次 | 1个线程执行1个任务 | 1个线程执行完所有任务 |
notify_one 100次 | 10个线程各执行1个任务 | 10个线程共同执行完所有任务 |
notify_all | 10个线程各执行1个任务 | 10个线程共同执行完所有任务 |
如上面的表格,要想让所有线程共同来执行所有任务,可以选择多次执行notify_one或者执行一次notify_all。
其中notify_all因为会唤醒所有线程,在生产者生产较慢而消费者较多时,容易产生不必要的锁竞争和条件判断,所以最优解是多次notify_one,但是生产者生产特征是比较集中,需要消费者共同处理的场景,直接用notify_all也可以。
通过条件变量实现一个线程队列
理解了上面各种模式的wait和notify,下面线程队列的例子就比较简单,没什么好说的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52std::queue<int> queue;
void provider(int val){
std::this_thread::sleep_for(std::chrono::seconds(5));
for(int i=0;i<50;i++)
{
{
std::lock_guard<std::mutex> lg(readyFlagMutex);
queue.push(val+i);
}
//用notify_one,只会唤醒一个 condition_variable.wait
readyCondVar.notify_one();
}
//用notify_all,会唤醒全部的 condition_variable.wait。
//readyCondVar.notify_all();
}
void consumer(std::string name){
int val;
while (true)
{
{
std::unique_lock<std::mutex> ul(readyFlagMutex);
//readyCondVar.wait(ul); //如果用 wait(ul)+notify_*,那么可能有几个 consunmer线程,任务就只会被执行几个
//wait(ul, predicate)可以清空队列
readyCondVar.wait(ul, [&]{
return !queue.empty();
});
std::cout << name << " notified, queue size: " << queue.size() << std::endl;
//对queue的操作需要在ul的保护范围内进行,以免造成线程安全未知错误
val = queue.front();
queue.pop();
//ul生命周期结束,锁自动释放
}
std::cout << name << " notified, get value: " << val << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
};
void testQueue(){
std::thread pro1(provider, 1);
std::thread consum1(consumer, "tom");
std::thread consum2(consumer, "lily");
std::thread consum3(consumer, "lihua");
std::thread consum4(consumer, "cat");
std::thread consum5(consumer, "dog");
pro1.join();
consum1.join();
consum2.join();
consum3.join();
consum4.join();
consum5.join();
};
std::condition_variable_any
点开标准库的头文件可以看到,condition_variable
的 wait
的定义是1
wait(unique_lock<mutex>& __lock);
也就是说,wait只能接收一个基于标准库互斥锁实现的 unique_lock。
假如我们想基于自己的业务特点实现一些自定义的锁,condition_variable
就有点无能为力了,此时可以使用 std::condition_variable_any
。
如下面的例子,将上面的互斥锁替换为基于原子量实现的自旋锁来实现demo。
1 | #include <atomic> |
如上,将 std::mutex
用自定义的自旋锁替换掉,然后使用 std::condition_variable_any
来做条件变量,可以接受 std::unique_lock<SpinLock>
来作为参数。
本文链接:https://www.zoucz.com/blog/2021/06/09/83619610-c87d-11eb-9fe7-534bbf9f369d/