接上篇 c++标准库:c++标准库:并发(二) —— 低层接口 thread 和 promise

在(一)和(二)中,说了一些方法可以在c++中做到并发执行任务,然而并发执行任务时某些情况可能会导致未知错误:

  • 未同步化的数据:多个并发同时读,不知道哪个先来
  • 写到一半的数据:另一个线程读取到写了一半的数据
  • 重排的语句:语句执行顺序被编译器重新安排,在多线程中造成的异常。

这三个场景 【很关键】 ,是此篇与后面几篇为什么要做线程同步的基础问题来源。

其中第三点有个说明,是因为编译器在保证单线程中执行的结果不受影响的前提下,允许对语句的执行顺序进行重排优化。例如

1
2
data = 42;
readyFlag = true;

在单线程中执行时,真实执行的顺序可能是

1
2
readyFlag = true;
data = 42;

所以当另一个线程通过

1
2
while(!readyFlag) {}
foo(data);

来访问data的值时,可能拿到的是 data=42 被执行之前的值。因为赋值顺序的改变不影响他们在各自单线程中的结果,编译器认为执行语句可以重排。

标准库提供了一些api,来保证任务的执行顺序是可控的,或者是原子性的,来规避上面的问题。

mutex互斥锁

首先定义一个 std::mutex 互斥锁对象,和两个函数。其中一个函数直接执行,另一个函数在执行开始前加互斥锁,执行结束后释放互斥锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
std::mutex mutex;
void run(const std::string msg) {
int i = 5;
while (i>0)
{
i--;
std::cout << "thread id:" << std::this_thread::get_id() << " msg:" << msg << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 1000));
}
}

void runWithLock(const std::string msg){
mutex.lock();
run(msg);
mutex.unlock();
}

测试不用mutex和用mutex的函数在独立线程中并发执行,t1和t2交替输出,t3和t4顺序输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
void TestMutex(){

std::thread t1(run, "test mutex1");
std::thread t2(run, "test mutex2");
t1.join();
t2.join();
std::cout << "no mutex job done!" << std::endl;
std::thread t3(runWithLock, "test mutex3");
std::thread t4(runWithLock, "test mutex4");
t3.join();
t4.join();
std::cout << " mutex job done!" << std::endl;
}

std::lock_guard自动管理生命周期

上面的例子中可以看到,加锁的代码块执行完毕后需要手动释放互斥锁,当代码在执行过程中抛出异常后,可能锁一直都不会被释放。
为了避免编码上的失误,可以使用 std::lock_guard 来自动管理锁的生命周期,在析构时自动unlock。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void runWithLockGuard(const std::string msg){
//lock_guard可以在局部作用于内自动管理对mutex的lock,结束后自动释放
run(msg);
{
std::lock_guard<std::mutex> lg(mutex);
run(msg);
}
}
void TestLockGuard(){
std::thread t1(runWithLockGuard, "test mutex1");
std::thread t2(runWithLockGuard, "test mutex2");
t1.join();
t2.join();
std::cout << " lock_guard job done!" << std::endl;
}

std::recursive_mutex 递归锁 — 防止同一线程死锁

下面的例子中,获取了一次锁,然后在获取锁后尝试第二次获取锁,就会造成死锁的情况。
可以使用std::recursive_mutex,允许同一个线程内对锁多次lock。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
std::recursive_mutex r_mutex;
void runWithRecursiveMutex(){
std::lock_guard<std::recursive_mutex> lg(r_mutex);
std::cout << "runWithRecursiveMutex" << std::endl;
}

//recursive_mutex 递归锁,解决递归死锁问题
void TestRecursiveMutex(){

// 这里存在对mutex的递归lock,当runWithLockGuard运行到需要获取锁的地方时,会形成死锁
// std::lock_guard<std::mutex> lg(mutex);
// std::cout << "test no recursive mutex" << std::endl;
// runWithLockGuard("test mutex");


//rescursive_mutex,允许 !同一线程! 对rescursive_mutex多次lock,在最后一次的unlcok处释放
// 跨线程还是会死锁的
std::lock_guard<std::recursive_mutex> lg(r_mutex);
std::cout << "test recursive mutex" << std::endl;
runWithRecursiveMutex();
// 这里即使用rescursive_mutex,也会形成死锁,因为跨线程了
// std::thread t1(runWithRecursiveMutex);
// t1.join();
std::cout << "job done!" << std::endl;
}

try_lock / try_lock_for / try_lock_until 控制获取锁失败的情况

可以使用 try_lock / try_lock_for 来处理获取锁失败 / 一段时间内获取锁失败的情况。

  • try_lock 成功了,还想使用 lock_guard 来管理生命周期的话,需要给 lock_guard 传入一个 std::adopt_lock,告诉 lock_guard 不要重复获取锁了。
  • try_lock 可能存在 “假性失败” 的情况,可以考虑用重试来处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//try_lock 和 lock_for,控制获取锁失败的情况
void TestTryLockAndLockFor(){
//加锁失败放弃执行
{
mutex.lock();
if(mutex.try_lock() == true){
run("try_lock success");
} else {
std::cout << "try_lock fail" << std::endl;
}
mutex.unlock();
}

//加锁成功,用std::adopt_lock使用lock_guard
{
if(mutex.try_lock() == true){
//这里因为在try_lock的时候已经lock了,所以用lock_guard要传入一个adopt_lock
std::lock_guard<std::mutex>(mutex, std::adopt_lock);
run("try_lock success");
//std::cout << "try_lock success" << std::endl;
} else {
std::cout << "try_lock fail" << std::endl;
}
}

// try_lock_for/try_lock_until等待一定时间,需要使用 timed_mutex或者 rescursive_timed_mutex
// 这里要注意的是 gcc4.8下运行此代码, t_mutex.try_lock_for总是会立即返回失败,不符合预期中的等待30s
// 据说高版本gcc上才能比较好的支持c++11的特性 https://bbs.csdn.net/topics/397728258
{
std::thread t(runWithTimedLock, "lock t_mutex");
//sleep 1s,确保lock线程先起来,好让waitfor真的进入等待状态
std::this_thread::sleep_for(std::chrono::seconds(1));
std::thread t2([]{
if(t_mutex.try_lock_for(std::chrono::seconds(30)) == true){
std::lock_guard<std::timed_mutex> lg(t_mutex, std::adopt_lock);
run("try_lock_for success");
} else {
std::cout << "try_lock_for fail" << std::endl;
}
});
t.join();
t2.join();
}
}

同时锁定多个

有时候需要在一个代码块里边获取多个锁后执行逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
std::mutex m1, m2;
// 同时锁定多个mutex
void TestMultiMutex(){
//上面的lock、try_lock,都能应用于多个mutex,只有都成功时才一起lock所有的mutex。其中try_lock在都lock成功时返回-1,若不成功,则哪个不成功就返回其索引
std::lock(m1, m2);
std::lock_guard<std::mutex>(m1, std::adopt_lock);
std::lock_guard<std::mutex>(m2, std::adopt_lock);
//Do Something
//使用try_lock的话,会返回第一个获取锁失败的index,如果全部成功,会返回-1
int failIndex = std::try_lock(m1, m2);
if(failIndex < 0){
std::lock_guard<std::mutex>(m1, std::adopt_lock);
std::lock_guard<std::mutex>(m2, std::adopt_lock);
} else {
// failIndex 就是第一个失败的mutex的索引
// 这里不需要手动释放锁定成功的mutex,因为一旦有锁定失败的,之前锁定成功的也会自动释放
}
}

std::unique_lock

std::unique_lock作用与std::lock_guard类似,也会在生命周期结束时自动unlock。
区别是可控性更强:

  • 生命周期期间不一定lock住了一个mutex,可以用owns_lock(),或者bool()来查询当前是否锁住了
  • 多了一个构造函数:可以与try_to_lock配合使用, std::unique_lock< std::mutex> ul(mutex, std::try_to_lock),尝试获取lock,但是不阻塞
  • 多了一个构造函数:可以传入一个时间,类似try_lock_for,尝试在一段时间内锁定(待验证,也许gcc4.8不行)std::unique_lock< std::timed_mutex> ul(t_mutex, std::chrono::seconds(1))
  • 多了一个构造函数:可以传入defer_lock,初始化但不锁住,后续手动锁,std::unique_lock< std::mutex> ul(mutex, std::defer_lock), 后续ul.lock()
  • 生命周期结束时,若是获得锁的状态,则自动释放锁;若没有获得,则啥都不做
  • 提供release来释放,或者转移lock,此函数返回一个指向关联的mutex,并且unlock它;与之相对,ul.mutex()返回mutex的指针,但是不unlock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
std::mutex readyFlagMutex;
bool readyFlag;
void prepareThread(){
//做某种准备工作,花费5秒钟
std::this_thread::sleep_for(std::chrono::seconds(5));
//加锁然后将readyFlag设置为true,代表已经准备好了
std::lock_guard<std::mutex> lg(readyFlagMutex);
readyFlag = true;
//设置完毕后,lock_guard生命周期结束,自动释放锁
}

void waitForPrepareThread(){
{
//在此作用域内,声明一个unique_lock,阻塞并尝试获取锁
std::unique_lock<std::mutex> ul(readyFlagMutex);
//死循环等待readyFlag变为true
while (!readyFlag)
{
//如果还没有变为true,就sleep 500ms,把cpu让出来让程序去做准备工作或者其它事情
//这里注意在等待之前,一定要把锁释放掉,不然prepare thread可能因为此处占着锁而没办法修改readyFlag,造成死锁
ul.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << " wait for prepare... " << std::endl;
//sleep完成后,再次加锁判断,直到readyFlag变为true
ul.lock();
}
//注意:当readyFlag变为true时,ul到这里还是拥有锁的状态,随着它生命周期结束,锁会被自动释放
}
std::cout << " prepare job done, run something! " << std::endl;
};

void TestUniqueLock(){
std::thread t1(prepareThread);
std::thread t2(waitForPrepareThread);
t1.join();
t2.join();
}

上面实现了一个利用unique_lock拥有一个不一定处于锁住状态的互斥锁的特性,实现了一个线程等待另一个线程的操作。
但是这种实现还是听不优雅的,主要在于这个等待sleep的时间,步子小了太娘炮,步子大了扯着蛋。
后面会提到的基于事件驱动的 condition_variable 才是专业做这种事情的。

std::once_flag 只执行一次

多线程中,用下面的方式来判断数据初始化,可能因并发读而引起未知异常

1
2
3
4
5
bool initialized = false;
if(!initialized){
initialized();
initialized = true;
}

可以用来 std::once_flag 控制多线程环境下的数据初始化,避免出现数据并发读冲突问题。

1
2
std::once_flag oc;
std::call_once(oc, callable);

☞ 参与评论