接上篇 c++标准库:并发(三) —— 锁 mutex 和 lock

在上篇关于 std::unique_lock 的小节中,有一个用 std::unique_lock 不断循环加锁、检查条件、释放锁、sleep 来判断一个前置条件是否准备完成的例子。
例子中有一个很关键的点,在准备工作时间无法准确预估的情况下,sleep的时间到底设置多少为合适呢? 设置短了太娘炮,设置长了扯着蛋… 要么sleep过久造成程序执行反应很慢, 要么sleep过短而将CPU性能大量的浪费在检查工作上。
基于这个需求,标准库提供了 std::condition_variable 条件变量,来让我们:

  • 初始化一个条件变量 condVar
  • 通过 condVar.wait 操作使线程进入无限期的阻塞(wait)状态,和sleep状态一样不浪费CPU在检查工作上。多个线程可使用同一个条件变量进行 wait
  • 通过 condVar.notify 操作让一个或所有使用此条件变量的线程,被唤醒而进入工作状态

上面是条件变量的大致执行逻辑,《C++标准库》这本书中并没有对细节进行解释。下面我写了一些demo,可以来看看细节。

wait

下面是一个最基本的使用condition_variable的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
bool readyFlag = false;
std::mutex readyFlagMutex;
std::condition_variable readyCondVar;

//sleep一段时间,将readyFlag设置为true,然后触发一次notify_one
void prepareThread(){
std::this_thread::sleep_for(std::chrono::seconds(5));
{
std::lock_guard<std::mutex> lg(readyFlagMutex);
std::cout << " 1. prepare get lock" << std::endl;
readyFlag = true;
readyCondVar.notify_one();
std::cout << " 2. prepare call notify and sleep " << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(10));
}

}

//通过条件变量的wait让线程进入阻塞状态,通过一个unique_lock,来实现收到notify之后执行的 锁定-检查-解锁逻辑
void waitForPrepareThread(){
{
//这里一定要使用 unique_lock,不能使用lock_guard
std::unique_lock<std::mutex> ul(readyFlagMutex);
readyCondVar.wait(ul);
std::cout << " 3. condition_variable check pass... " << std::endl;
}
std::cout << " 4. prepare job done, run something! " << std::endl;
}

void testConditionVariable(){
std::thread t1(prepareThread);
std::thread t2(waitForPrepareThread);
t1.join();
t2.join();
};

//输出:
// 1. prepare get lock
// 2. prepare call notify and sleep
// 3. condition_variable check pass...
// 4. prepare job done, run something!

分析这个case,普通的readyCondVar.wait(ul)执行的过程可以理解为下面几个步骤:

  • 等待获取锁
  • 进入阻塞状态并立即释放锁
  • 阻塞,直到线程被一个 notify_* 唤醒
  • 等待获取锁
  • 执行后续逻辑

wait(ul, predicate)

用到条件变量的场景,一把是像上面的例子中一样,某个条件ready后就notify一下,通知另一个线程继续执行。看起来很美,对吧?
很不幸的是,实际程序执行的时候,某些情况下,条件变量会出现“假唤醒”的状态。也就是说,会存在莫名其妙程序自动调了一下 notify_*。此时如果程序直接执行下去,就很容易因为条件还未就绪而崩掉。
所以我们需要执行一个检查条件是否真的就绪的工作,而且这个检查工作本身也得加锁,以防程序因假唤醒挂掉。例如:

1
2
3
4
5
6
7
8
9
{
//加锁、判断
std::unique_lock<std::mutex> ul(readyFlagMutex);
while(!readyFlag){
//readyFlag为false时,由wait进入休眠
readyCondVar.wait(ul);
}
//readyFlag为true时,自动释放锁
}

上面的语法稍显麻烦,标准库为wait方法提供了第二个参数,可以传入一个表达式,只有它的返回结果为false时让线程进入休眠,否则直接释放锁并进入后面的流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//(改写上例子中的 waitForPrepareThread )
//通过条件变量的wait让线程进入休眠状态,通过一个unique_lock,来实现收到notify之后执行的 锁定-检查-解锁逻辑
void waitForPrepareThread(){
{
//这里一定要使用 unique_lock,不能使用lock_guard
std::unique_lock<std::mutex> ul(readyFlagMutex);
//传入条件:为了处理假唤醒,多传入一个判断函数,加锁后,判断函数返回false的时候才释放锁并执行后面的逻辑,否则释放锁并进入休眠状态
readyCondVar.wait(ul, []{
//wait一般情况下只会被 condition_variable的 notify_one、 notify_all 唤醒,但是也存在一定可能性误唤醒
//这里不传入lamda做判断的话,误唤醒时实际上readyFlag并没有变成true
//所以这里再加一道check
std::cout << " condition_variable notified, check readyFlag... " << std::endl;
return readyFlag;
});
}
std::cout << " prepare job done, run something! " << std::endl;
}

//执行结果:
// condition_variable notified, check readyFlag...
// condition_variable notified, check readyFlag...
// prepare job done, run something!

从执行结果中可以发现,条件判断函数被执行了两次,这是因为prepare函数是sleep了5s才获取锁的,所以 readyCondVar.wait在第一时间就获取了锁,然后进行了一次判断,结果此时 readyFlag 是 false,所以条件变量让当前线程释放锁并进入阻塞状态等待。
prepare线程sleep 5s后,将readyFlag修改为true并执行notify_one。 waitForPrepareThread被notify唤醒后,再次执行条件判断函数,此时readyFlag为true,判断函数返回true,所以不再进入阻塞,直接去执行后面的逻辑。总结起来,带条件的wait,工作流程是:

  • 等待获取锁,然后进行判断
  • 判断条件: 若条件为 false
    – 进入阻塞状态并立即释放锁
    – 阻塞,直到线程被一个 notify_* 唤醒
    – 等待获取锁
    – 执行后续逻辑
  • 判断条件: 若条件为true
    – 执行后续逻辑

带条件的wait与不带条件的wait相比:

  • wait(ul) 一定会让线程阻塞;wait(ul, predicate) 不一定会让线程进入阻塞
  • 如果在循环中使用,wait(ul) 的线程被唤醒后,执行一次工作再次wait又会进入阻塞,它被唤醒后是“单发”的;wait(ul, predicate) 如果条件是true,会一直执行任务不阻塞,是 “连发” 的。

notify_one和notify_all

上面的例子中,条件准备就绪后是notify_one唤醒一个阻塞的线程。调用notify_one时,若此时没有用此条件变量进入阻塞状态的线程,那么notify_one没有任何作用。
标准库还提供了一个notify_all来唤醒所有阻塞的线程,我目前理解的,有两种常见的场景需要用到 notify_all 。

事件驱动模型的场景

假如用 condition_variable 来做事件订阅,当某事件发生时,可以通过 notify_all 来通知给所有通过 wait 此 condition_variable 来阻塞的线程。
这个使用场景比较简单,就不写代码验证了。

部分生产者消费者模型

设想有一个场景: 一个生产者,在生产了100个耗时任务,有10个消费者线程通过条件变量等待任务去执行。那么wait和notify不同选择时会产生不同效果:

生产者\消费者 wait(ul) wait(ul, predicate)
notify_one 1次 1个线程执行1个任务 1个线程执行完所有任务
notify_one 100次 10个线程各执行1个任务 10个线程共同执行完所有任务
notify_all 10个线程各执行1个任务 10个线程共同执行完所有任务

如上面的表格,要想让所有线程共同来执行所有任务,可以选择多次执行notify_one或者执行一次notify_all。
其中notify_all因为会唤醒所有线程,在生产者生产较慢而消费者较多时,容易产生不必要的锁竞争和条件判断,所以最优解是多次notify_one,但是生产者生产特征是比较集中,需要消费者共同处理的场景,直接用notify_all也可以。

通过条件变量实现一个线程队列

理解了上面各种模式的wait和notify,下面线程队列的例子就比较简单,没什么好说的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
std::queue<int> queue;
void provider(int val){
std::this_thread::sleep_for(std::chrono::seconds(5));
for(int i=0;i<50;i++)
{
{
std::lock_guard<std::mutex> lg(readyFlagMutex);
queue.push(val+i);
}
//用notify_one,只会唤醒一个 condition_variable.wait
readyCondVar.notify_one();
}
//用notify_all,会唤醒全部的 condition_variable.wait。
//readyCondVar.notify_all();
}

void consumer(std::string name){
int val;
while (true)
{
{
std::unique_lock<std::mutex> ul(readyFlagMutex);
//readyCondVar.wait(ul); //如果用 wait(ul)+notify_*,那么可能有几个 consunmer线程,任务就只会被执行几个
//wait(ul, predicate)可以清空队列
readyCondVar.wait(ul, [&]{
return !queue.empty();
});
std::cout << name << " notified, queue size: " << queue.size() << std::endl;
//对queue的操作需要在ul的保护范围内进行,以免造成线程安全未知错误
val = queue.front();
queue.pop();
//ul生命周期结束,锁自动释放
}
std::cout << name << " notified, get value: " << val << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
};

void testQueue(){
std::thread pro1(provider, 1);
std::thread consum1(consumer, "tom");
std::thread consum2(consumer, "lily");
std::thread consum3(consumer, "lihua");
std::thread consum4(consumer, "cat");
std::thread consum5(consumer, "dog");
pro1.join();
consum1.join();
consum2.join();
consum3.join();
consum4.join();
consum5.join();
};

std::condition_variable_any

点开标准库的头文件可以看到,condition_variablewait 的定义是

1
wait(unique_lock<mutex>& __lock);

也就是说,wait只能接收一个基于标准库互斥锁实现的 unique_lock。
假如我们想基于自己的业务特点实现一些自定义的锁,condition_variable 就有点无能为力了,此时可以使用 std::condition_variable_any
如下面的例子,将上面的互斥锁替换为基于原子量实现的自旋锁来实现demo。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <atomic>

class SpinLock {

public:
SpinLock() : flag_(false)
{}

void lock()
{

bool expect = false;
while (!flag_.compare_exchange_weak(expect, true))
{
//这里一定要将expect复原,执行失败时expect结果是未定的
expect = false;
}
}

void unlock()
{

flag_.store(false);
}

private:
std::atomic<bool> flag_;
};

//#define LOCK_TYPE std::mutex
#define LOCK_TYPE SpinLock

如上,将 std::mutex 用自定义的自旋锁替换掉,然后使用 std::condition_variable_any 来做条件变量,可以接受 std::unique_lock<SpinLock> 来作为参数。

☞ 参与评论