[toc]

5.1 线程

  1. 线程创建
    • 直接创建:thread t(worker);
    • 使用移动语义:thread t2(move(t));,线程被move之后,对象t不再代表任何线程
    • 使用bind:thread t3(bind(worker, 1, 2));
    • lambda表达式:thread t4([](int a, doubnle b){}, 1, 2);
  2. 等待线程结束:t.join();
  3. 分离线程:t.detach();,后台执行,但是与主线程失去联系

对每个线程必须进行join或detach,保证线程对象生命周期到主调函数结束之后仍然存在。

线程基本用法

  • 获取线程ID:t.get_id();
  • 获取CPU核数:thread::hardware_concurrency();
  • 线程休眠:this_thread::sleep_for(chrono::seconds(3));

Ref

thread基础:C++ 并发编程(一):创建线程 - SegmentFault 思否:有一个系列


5.2 互斥量

保护多线程同时访问的共享数据

4种锁

  • mutex:独占,不能递归使用
  • timed_mutex:带超时的独占锁,设置超时等待时间,在超时后可以做其他事,使用while循环获取互斥量
  • recursive_mutex:可递归使用,不带超时
  • recursive_timed_mutex:带超时的递归锁

使用锁的方法

  • lock(), unlock():手动加锁解锁,t.lock(); t.unlock();
  • lock_guard<>:析构时自动解锁,lock_guard<mutex> lock(my_mutex);
  • unique_lock<>:可手动解锁,析构时检查是否解锁并自动解锁,unique_lock<mutex> lock(my_mutex);;手动解锁:lock.unlock();;可配合condition_variable使用

timed_mutex比mutex多了两个接口:try_lock_for()try_lock_until()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* @brief 带超时的互斥量
*/

timed_mutex g_timed_mutex;

void worker_timed_mutex() {
chrono::milliseconds timeout(100);

while(true) {
if(g_timed_mutex.try_lock_for(timeout)) {
cout << "获得锁" << endl;
g_timed_mutex.unlock(); // 由于已经获得锁并加锁,需要在此处解锁
} else {
cout << "未获得锁,处理其他事情" << endl;
}
}
}

5.3 条件变量

要配合condition_variable使用,必须使用unique_lock,而不能使用lock_guard,且unique_lock支持手动unlock,避免在整个函数期间占用资源

流程:首先对mutex加锁,若没有获得mutex的访问权限,则解锁,同时wait()阻塞直至等待的信号发生,再获取mutex的访问权限;最后调用notify_onenotify_all唤醒其他线程

两种条件变量

  • condition_variable:配合unique_lock进行wait()操作
  • condition_variable_any:和任意具有lock、unlock语义的mutex搭配使用,效率比condition_variable差

两种调用方式

  • 循环中调用wait
1
2
3
4
5
6
7
8
9
10
condition_variable cv;
mutex g_mutex;

void func() {
unique_locK<mutex> lock(g_mutex);
while( 等待条件 ) {
cv.wait(lock);
}
}

  • wait第二个参数使用lambda表达式,第二个参数相当于循环调用等待条件,返回false则wait函数会阻塞等待至被唤醒
1
2
3
4
5
6
7
condition_variable cv;
mutex g_mutex;

void func() {
unique_locK<mutex> lock(g_mutex);
cv.wait(lock, [](){return 等待条件;});
}

使用while循环等待的原因:线程可能因为超时或虚假唤醒,造成假醒,此时应继续阻塞等待

代码 - 同步队列

  • c++代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* ===========================================================================
* 同步队列
* 使用 mutex 与 condition_variable 实现
* ===========================================================================
*/
template <typename T> class SyncQueue {
private:
inline bool is_full() { return queue_.size() == max_size_; }
inline bool is_empty() { return !queue_.size(); }

public:
SyncQueue(int size = 0) : max_size_(size) {}

void put(const T& x) {
unique_lock<mutex> locker(mutex_);
cv_notfull_.wait(locker, [this]() {
cout << "缓冲区已满,需要等待" << endl;
return !this->is_full();
});
queue_.emplace_back(x);
cv_notempty_.notify_one();
}

void take(T& x) {
unique_lock<mutex> locker(mutex_);
cv_notempty_.wait(locker, [this](){
cout << "缓冲区空了,需要等待" << endl;
return !this->is_empty();
});
x = queue_.front();
queue_.pop_front();
cv_notfull_.notify_one();
}

bool empty() {
lock_guard<mutex> locker(mutex_);
queue_.empty();
}

bool full() {
lock_guard<mutex> locker(mutex_);
return queue_.size() == max_size_;
}

size_t get_size() {
lock_guard<mutex> locker(mutex_);
return queue_.size();
}

private:
list<T> queue_; // 缓冲区
mutex mutex_; // 结合条件变量使用
condition_variable cv_notempty_; // 非空的条件变量
condition_variable cv_notfull_; // 非满的条件变量
int max_size_; // 队列的最大长度
};

5.4 原子变量

使用atomic<T>定义

原子变量定义的数据无需使用mutex限制线程间的互斥访问

原子变量示例 - 原子计数器

1
2
3
4
5
6
7
8
9
10
11
class AtomicCounter {
public:
void increment() { ++value; }

void decrement() { --value; }

int get() { return value.load(); }

private:
atomic<int> value;
};

5.5 call_once / once_flag

call_once可以保证函数在多线程环境下仅被调用一次。使用call_once时需要一个once_flag作为call_once的参数。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* ===========================================================================
* call_once / once_flag
* ===========================================================================
*/
once_flag g_once_flag;

void do_once() {
call_once(g_once_flag, [](){cout << "called once" << endl;});
}

void test_call_once() {
thread t1(do_once);
thread t2(do_once);
thread t3(do_once);

t1.join();
t2.join();
t3.join();
}


输出:
called once


5.6 异步操作类

包括futurepromisepackaged_task。future作为异步结果的传输通道,可以方便地获取线程函数的返回值,promise可以将数据与future绑定,方便线程赋值(如获取线程的返回值,无法直接通过join获取),packaged_task可用来包装可调用对象,将函数与future绑定。

1. future

future提供了获取异步结果的传输通道。可以通过future_status获取异步操作的状态。

future不可拷贝,只能被移动;shared_future可以拷贝,放到容器中时需要使用shared_future。

  • deferred:异步操作未开始
  • ready:异步操作已完成
  • timeout:异步操作超时

获取future结果的方法:

  • get():等待异步操作结束并返回结果
  • wait():等待异步操作结束,没有返回值
  • wait_for():超时等待返回结果

future用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void test_future_status() {
future_status status;
future<int> res = async(launch::async, []() { return 10; });

do {
status = res.wait_for(chrono::seconds(1));
if (status == future_status::deferred) {
cout << "deferred" << endl;
} else if (status == future_status::ready) {
cout << "ready" << endl;
} else if (status == future_status::timeout) {
cout << "timeout" << endl;
}
} while (status != future_status::ready);

cout << "res = " << res.get() << endl;
}

2. promise

ref:C++11多线程-异步运行(1)之std::promise - 简书 (jianshu.com)

将数据与future绑定,便于获取线程中的某个值,在线程中为传进来的promise赋值,线程结束后可通过promise的future获取值。取值是通过promise内部提供的future间接获取的。

promise用法

1
2
3
4
5
6
7
8
9
10
11
12
13
void test_promise() {
promise<int> prom;

// 将promise作为参数传入
thread t([](promise<int>& p) { p.set_value_at_thread_exit(9); }, ref(prom));

// 通过promise获取内部的future并取值
future<int> f = prom.get_future();
auto r = f.get();
cout << "res = " << r << endl;

t.join();
}

3. packaged_task

将函数与future绑定,类似于promise,promise保存的是共享状态的值,而packaged_task保存的是函数。

packaged_task用法

1
2
3
4
5
6
7
8
void test_packaged_task() {
packaged_task<int()> task([]() { return 7; });
thread t(ref(task));
future<int> f = task.get_future();
cout << "res = " << f.get() << endl;

t.join();
}

shared_future用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int worker_test_shared_future(int x) { return x + 2; }

void test_shared_future() {
packaged_task<int(int)> task(worker_test_shared_future);
future<int> fut = task.get_future();

thread(ref(task), 2).detach();

int val = fut.get();
cout << "res = " << val << endl;

// future不能拷贝,vector中只能使用shared_future
vector<shared_future<int>> v;
auto f = async(launch::async, [](int a, int b){return a + b;}, 2, 3);
v.push_back(move(f)); // 需要传入右值
cout << "shared_future res = " << v[0].get() << endl;
}

5.7 线程异步操作函数async

直接创建异步的task,并将任务返回的结果存储在future中。

获取结果可用future.get();,仅等待任务完成使用future.wait();

async的函数原型:async(std::launch::async | std::launch::deferred, f, args...)

  • 第一个参数:线程的创建策略
    • std::launch::async:调用async时便创建线程
    • std::launch::deferred:延迟加载方式创建线程,知道调用get()wait()时才创建线程

async用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void test_async() {
future<int> f1 = async(launch::async, []() { return 1; });
cout << "f1 = " << f1.get() << endl;

future<int> f2 = async(launch::async, []() { return 2; });
f2.wait();

future<int> f3 = async(launch::async, []() {
this_thread::sleep_for(chrono::seconds(3));
return 3;
});
cout << "waiting...\n";
future_status status;

do {
status = f3.wait_for(chrono::seconds(1));
if (status == future_status::deferred) {
cout << "deferred" << endl;
} else if (status == future_status::ready) {
cout << "ready" << endl;
} else if (status == future_status::timeout) {
cout << "timeout" << endl;
}
} while (status != future_status::ready);
cout << "f3 = " << f3.get() << endl;
}

相关代码

  1. 同步队列
  2. 线程池 - c++11实现:ThreadPool.h - ThreadPool [GitHub]
  3. 原子变量示例 - 原子计数器
  4. future用法
  5. promise用法
  6. packaged_task用法
  7. shared_future用法