[toc]
5.1 线程
- 线程创建
- 直接创建:
thread t(worker);
- 使用移动语义:
thread t2(move(t));
,线程被move
之后,对象t
不再代表任何线程
- 使用bind:
thread t3(bind(worker, 1, 2));
- lambda表达式:
thread t4([](int a, doubnle b){}, 1, 2);
- 等待线程结束:
t.join();
- 分离线程:
t.detach();
,后台执行,但是与主线程失去联系
对每个线程必须进行join或detach,保证线程对象生命周期到主调函数结束之后仍然存在。
线程基本用法
- 获取线程ID:
t.get_id();
- 获取CPU核数:
thread::hardware_concurrency();
- 线程休眠:
this_thread::sleep_for(chrono::seconds(3));
Ref
thread基础:C++ 并发编程(一):创建线程 - SegmentFault 思否:有一个系列
5.2 互斥量
保护多线程同时访问的共享数据
4种锁
mutex
:独占,不能递归使用
timed_mutex
:带超时的独占锁,设置超时等待时间,在超时后可以做其他事,使用while循环获取互斥量
recursive_mutex
:可递归使用,不带超时
recursive_timed_mutex
:带超时的递归锁
使用锁的方法
lock()
, unlock()
:手动加锁解锁,t.lock();
t.unlock();
lock_guard<>
:析构时自动解锁,lock_guard<mutex> lock(my_mutex);
unique_lock<>
:可手动解锁,析构时检查是否解锁并自动解锁,unique_lock<mutex> lock(my_mutex);
;手动解锁:lock.unlock();
;可配合condition_variable
使用
timed_mutex比mutex多了两个接口:try_lock_for()
和try_lock_until()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
timed_mutex g_timed_mutex;
void worker_timed_mutex() { chrono::milliseconds timeout(100);
while(true) { if(g_timed_mutex.try_lock_for(timeout)) { cout << "获得锁" << endl; g_timed_mutex.unlock(); } else { cout << "未获得锁,处理其他事情" << endl; } } }
|
5.3 条件变量
要配合condition_variable
使用,必须使用unique_lock
,而不能使用lock_guard,且unique_lock支持手动unlock,避免在整个函数期间占用资源
流程:首先对mutex加锁,若没有获得mutex的访问权限,则解锁,同时wait()
阻塞直至等待的信号发生,再获取mutex的访问权限;最后调用notify_one
或notify_all
唤醒其他线程
两种条件变量
condition_variable
:配合unique_lock进行wait()
操作
condition_variable_any
:和任意具有lock、unlock语义的mutex搭配使用,效率比condition_variable差
两种调用方式
1 2 3 4 5 6 7 8 9 10
| condition_variable cv; mutex g_mutex;
void func() { unique_locK<mutex> lock(g_mutex); while( 等待条件 ) { cv.wait(lock); } }
|
- wait第二个参数使用lambda表达式,第二个参数相当于循环调用等待条件,返回false则wait函数会阻塞等待至被唤醒
1 2 3 4 5 6 7
| condition_variable cv; mutex g_mutex;
void func() { unique_locK<mutex> lock(g_mutex); cv.wait(lock, [](){return 等待条件;}); }
|
使用while循环等待的原因:线程可能因为超时或虚假唤醒,造成假醒,此时应继续阻塞等待
代码 - 同步队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
template <typename T> class SyncQueue { private: inline bool is_full() { return queue_.size() == max_size_; } inline bool is_empty() { return !queue_.size(); }
public: SyncQueue(int size = 0) : max_size_(size) {} void put(const T& x) { unique_lock<mutex> locker(mutex_); cv_notfull_.wait(locker, [this]() { cout << "缓冲区已满,需要等待" << endl; return !this->is_full(); }); queue_.emplace_back(x); cv_notempty_.notify_one(); }
void take(T& x) { unique_lock<mutex> locker(mutex_); cv_notempty_.wait(locker, [this](){ cout << "缓冲区空了,需要等待" << endl; return !this->is_empty(); }); x = queue_.front(); queue_.pop_front(); cv_notfull_.notify_one(); }
bool empty() { lock_guard<mutex> locker(mutex_); queue_.empty(); }
bool full() { lock_guard<mutex> locker(mutex_); return queue_.size() == max_size_; }
size_t get_size() { lock_guard<mutex> locker(mutex_); return queue_.size(); }
private: list<T> queue_; mutex mutex_; condition_variable cv_notempty_; condition_variable cv_notfull_; int max_size_; };
|
5.4 原子变量
使用atomic<T>
定义
原子变量定义的数据无需使用mutex限制线程间的互斥访问
原子变量示例 - 原子计数器
1 2 3 4 5 6 7 8 9 10 11
| class AtomicCounter { public: void increment() { ++value; }
void decrement() { --value; }
int get() { return value.load(); }
private: atomic<int> value; };
|
5.5 call_once / once_flag
call_once
可以保证函数在多线程环境下仅被调用一次。使用call_once
时需要一个once_flag
作为call_once
的参数。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
once_flag g_once_flag;
void do_once() { call_once(g_once_flag, [](){cout << "called once" << endl;}); }
void test_call_once() { thread t1(do_once); thread t2(do_once); thread t3(do_once);
t1.join(); t2.join(); t3.join(); }
输出: called once
|
5.6 异步操作类
包括future
、promise
、packaged_task
。future作为异步结果的传输通道,可以方便地获取线程函数的返回值,promise可以将数据与future绑定,方便线程赋值(如获取线程的返回值,无法直接通过join获取),packaged_task可用来包装可调用对象
,将函数与future绑定。
1. future
future
提供了获取异步结果的传输通道。可以通过future_status获取异步操作的状态。
future
不可拷贝,只能被移动;shared_future
可以拷贝,放到容器中时需要使用shared_future。
- deferred:异步操作未开始
- ready:异步操作已完成
- timeout:异步操作超时
获取future结果的方法:
get()
:等待异步操作结束并返回结果
wait()
:等待异步操作结束,没有返回值
wait_for()
:超时等待返回结果
future用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| void test_future_status() { future_status status; future<int> res = async(launch::async, []() { return 10; });
do { status = res.wait_for(chrono::seconds(1)); if (status == future_status::deferred) { cout << "deferred" << endl; } else if (status == future_status::ready) { cout << "ready" << endl; } else if (status == future_status::timeout) { cout << "timeout" << endl; } } while (status != future_status::ready);
cout << "res = " << res.get() << endl; }
|
2. promise
ref:C++11多线程-异步运行(1)之std::promise - 简书 (jianshu.com)
将数据与future绑定,便于获取线程中的某个值,在线程中为传进来的promise赋值,线程结束后可通过promise的future获取值。取值是通过promise内部提供的future间接获取的。
promise用法
1 2 3 4 5 6 7 8 9 10 11 12 13
| void test_promise() { promise<int> prom; thread t([](promise<int>& p) { p.set_value_at_thread_exit(9); }, ref(prom)); future<int> f = prom.get_future(); auto r = f.get(); cout << "res = " << r << endl;
t.join(); }
|
3. packaged_task
将函数与future绑定,类似于promise,promise保存的是共享状态的值,而packaged_task保存的是函数。
packaged_task用法
1 2 3 4 5 6 7 8
| void test_packaged_task() { packaged_task<int()> task([]() { return 7; }); thread t(ref(task)); future<int> f = task.get_future(); cout << "res = " << f.get() << endl;
t.join(); }
|
shared_future用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| int worker_test_shared_future(int x) { return x + 2; }
void test_shared_future() { packaged_task<int(int)> task(worker_test_shared_future); future<int> fut = task.get_future();
thread(ref(task), 2).detach();
int val = fut.get(); cout << "res = " << val << endl;
vector<shared_future<int>> v; auto f = async(launch::async, [](int a, int b){return a + b;}, 2, 3); v.push_back(move(f)); cout << "shared_future res = " << v[0].get() << endl; }
|
5.7 线程异步操作函数async
直接创建异步的task,并将任务返回的结果存储在future中。
获取结果可用future.get();
,仅等待任务完成使用future.wait();
。
async的函数原型:async(std::launch::async | std::launch::deferred, f, args...)
- 第一个参数:线程的创建策略
std::launch::async
:调用async时便创建线程
std::launch::deferred
:延迟加载方式创建线程,知道调用get()
或wait()
时才创建线程
async用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| void test_async() { future<int> f1 = async(launch::async, []() { return 1; }); cout << "f1 = " << f1.get() << endl;
future<int> f2 = async(launch::async, []() { return 2; }); f2.wait();
future<int> f3 = async(launch::async, []() { this_thread::sleep_for(chrono::seconds(3)); return 3; }); cout << "waiting...\n"; future_status status;
do { status = f3.wait_for(chrono::seconds(1)); if (status == future_status::deferred) { cout << "deferred" << endl; } else if (status == future_status::ready) { cout << "ready" << endl; } else if (status == future_status::timeout) { cout << "timeout" << endl; } } while (status != future_status::ready); cout << "f3 = " << f3.get() << endl; }
|
相关代码
- 同步队列
- 线程池 - c++11实现:ThreadPool.h - ThreadPool [GitHub]
- 原子变量示例 - 原子计数器
- future用法
- promise用法
- packaged_task用法
- shared_future用法