本篇主要是记录自己在学习C++11下std::thread异步执行时的一些细节性的东西,为之后基于C++11写并发代码打基础。
C++11引入了std::thread。据说之前因为需要区分对待pthread和win下的线程库,代码中有大量的预编译的if else,非常丑陋。现在的话,统一用std::thread就行了。
基于std::thread最简单的异步执行代码。
#include <iostream> #include <thread> void thread_run() { std::cout << "thread run\n"; } int main() { std::thread thread1{thread_run}; thread1.join(); return 0; }
代码创建了一个线程thread1,thread1执行thread_run(立马执行)。main函数创建了线程之后,通过join等待thread1执行结束。
如果你写过基于pthread的代码的话,会发现pthread_t不再需要。因为按照C++的风格,肯定会封装在std::thread中。
(注意,本文不会介绍创建thread有哪些不同的方法,或者延迟执行的方法)
输入
接下来要考虑一个问题是,如何向thread_run传递参数。
在多线程程序里面,主要有
- 按值传递,或者讲复制
- 转移,所有权在新线程中
- 共享(可变或不可变)
第一个不用多说,基础类型,或者对复制不敏感的类型都可以这么做。第二种其实算是独享,好处是这样就不会有共享下的并发修改问题。第三种,很多并发程序潜在有这种要求:超过一个线程共享某个变量,虽然处理起来必须很小心。
具体三种写法如下
#include <iostream> #include <thread> #include <vector> void thread1_run(int n) { } void thread2_run(std::vector<int>&& vector) { } void thread3_run(std::shared_ptr<std::string> vector_ptr) { } int main() { std::thread thread1{thread1_run, 1}; std::vector<int> vector = {1, 2, 3}; std::thread thread2{thread2_run, std::move(vector)}; std::shared_ptr<std::string> vector_ptr{new std::string{"foo"}}; std::thread thread3{thread3_run, vector_ptr}; thread1.join(); thread2.join(); thread3.join(); return 0; }
(注意,本文不会介绍类似向线程传递引用怎么做,请自行参阅std::thread的文档)
请留意代码中传递的地方和接受的函数签名。
第三种使用shared_ptr的方式背后依赖于shared_ptr可以被复制,当然里面的内容不会被复制。
这里特别讲一下第二种转移的时候,参数实际上在到达实际执行的函数前会有大约两次或以上move的原因,顺便了解一下thread的构造函数在做什么。
写过pthread代码的人可能知道,pthread里线程执行的函数签名是这样的
void* some_function(void* arg) { return NULL; }
std::thread在构造过程中,除了把函数名改成函数指针(为了符合pthread_create的函数签名)之外,还需要打包函数和参数,在内部一个模拟上述代码的统一函数中解包然后执行函数,具体来说
- 创建一个decay_copy(forward(function),), decay_copy(forward(arg1))…的元组(tuple),这里是一次move或者copy
- 在内部同一个执行函数中从元组中解包function和arguments,这里是第二次move
- 然后调用用户定义的函数
有兴趣的人可以看一下libc++的实现。
输出
使用std::thread另外要注意的一个问题是如何返回值,和pthread不同,std::thread没法直接返回结果。
C++11提供了几种异步执行的手段:promise,packaged_task和async。这几种手段都支持返回future,一个可以获取线程执行结果的工具。以promise/future为例。
#include <iostream> #include <thread> #include <future> void thread1_run(std::promise<std::string>&& promise) { promise.set_value(std::string{"foo"}); } int main() { std::promise<std::string> promise; std::future<std::string> future = promise.get_future(); std::thread thread1{thread1_run, std::move(promise)}; thread1.join(); std::string string = future.get(); std::cout << string << std::endl; return 0; }
promise作为工作线程(thread1)写入结果的工具,future作为非工作线程读取数据的工具,组合起来达到获取返回值的效果。
这里考虑一下,promise和future是怎么实现的?
在promise端写入的数据,要future端能读出来,说明promise和future共享某个数据结构,里面包含了结果。
同时,一方写入后,被阻塞的一方可以执行,很明显是一种条件变量的行为模式。
为了进一步理解,试着自己写一个模拟实现。
#include <iostream> #include <thread> #include <atomic> #include <mutex> template<class T> class MyFutureInner { std::atomic_int count_; std::mutex mutex_; std::condition_variable condition_; T value_; bool value_set_ = false; public: MyFutureInner() : count_{1} {} int use_count() const { return count_.load(std::memory_order_relaxed); } // get and increase int increase_reference() { return count_.fetch_add(1, std::memory_order_relaxed); } // decrease and get int decrease_reference() { return count_.fetch_add(-1, std::memory_order_acq_rel) - 1; } void set(T &&value) { std::unique_lock<std::mutex> lock{mutex_}; value_ = std::move(value); // move assignment value_set_ = true; condition_.notify_all(); } T move() { std::unique_lock<std::mutex> lock{mutex_}; if (!value_set_) { condition_.wait(lock); } return std::move(value_); // move } T copy() { std::unique_lock<std::mutex> lock{mutex_}; if (!value_set_) { condition_.wait(lock); } return value_; // copy } }; template<class T> class MyFuture { MyFutureInner<T> *inner_ptr_; public: MyFuture() : inner_ptr_{new MyFutureInner<T>} {} // copy MyFuture(const MyFuture &future) : inner_ptr_{future.inner_ptr_} { inner_ptr_->increase_reference(); } MyFuture &operator=(const MyFuture &) = delete; MyFuture(MyFuture &&future) { inner_ptr_ = future.inner_ptr_; future.inner_ptr_ = nullptr; } MyFuture &operator=(MyFuture &&) = delete; int use_count() const { return inner_ptr_->use_count(); } void set(T &&value) { inner_ptr_->set(std::move(value)); } T get(bool copy = false) { return copy ? inner_ptr_->copy() : inner_ptr_->move(); } ~MyFuture() { if (inner_ptr_ != nullptr && inner_ptr_->decrease_reference() == 0) { delete inner_ptr_; } } }; void thread2_run(MyFuture<std::string> future) { future.set(std::string{"foo"}); } int main() { MyFuture<std::string> future; std::thread thread2{thread2_run, future}; std::string result = future.get(); std::cout << result << std::endl; thread2.join(); return 0; }
这里为了简化代码,合并了promise和future的功能为一个类MyFuture。
MyFuture内部持有MyFutureInner的指针,当所有持有MyFutureInner指针的MyFuture被销毁之后,MyFutureInner才会被销毁。行为上类似shared_ptr,当然这里不能直接用shared_ptr。
MyFutureInner实现了shared_ptr的核心行为,增加了set和move/copy几个针对结果的操作。同时内部通过mutex加条件变量的方式,支持阻塞和通知。
除了future之外,MyFuture其实还实现了shared_future的功能。shared_future可以通过调用future的share方法得到。shared_future内部是返回结果的克隆而不是move。在多个线程等待同一个结果时使用。为了避免复制的话,可以使用shared_ptr,具体写法如下
void thread3_run(MyFuture<std::shared_ptr<std::string>> future) { future.set(std::shared_ptr<std::string>{new std::string{"foo"}}); } int main() { MyFuture<std::shared_ptr<std::string>> future; std::thread thread3{thread3_run, future}; std::shared_ptr<std::string> string_ptr1 = future.get(true); thread3.join(); std::shared_ptr<std::string> string_ptr2 = future.get(true); std::cout << *string_ptr1 << std::endl; std::cout << *string_ptr2 << std::endl; return 0; }
如果你有机会阅读libc++的代码的话,可以发现promise/future内部用的也是类似上述代码的机制。当然这里没有处理结果是void(即只需要通知完成)的情况,有机会再考虑。
小结
std::thread是C++11多线程编程比较基础的类,个人觉得学习如何使用std::thread输入输出对之后其他多线程的编程技术会有帮助。希望我的文章对你有用。