上篇中讲到,C++11的标准库提供了promise用于在线程执行的具体方法中返回数据,接收端通过future阻塞获取。这么做的前提是你可以修改方法的参数,或者说你需要写一个包装函数。想要让既有函数异步的话,你可以使用packaged_task类或者async方法。
具体分析之前,以下代码是在线程中需要执行的方法。
MyString some_function() { return MyString{"foo"}; }
MyString是很早之前自己用来查看copy/move次数的类,不想用的话,可以替换为std::string。
packaged_task
packaged_task是一个封装了被调用的函数的task。注意,packaged_task本身并不提供异步执行的机制,所以你仍旧需要把packaged_task放到thread中去执行。
std::packaged_task<MyString()> task{some_function}; std::future<MyString> future = task.get_future(); std::thread task_thread{std::move(task)}; task_thread.join(); MyString string = future.get(); std::cout << string << std::endl;
从设计角度来说,packaged_task是桥接了被包装的函数(这里是some_function)和future,同时又能被thread执行的一个类,所以实现上,需要能执行(重载operator()),持有被包装函数的引用或指针。
为了进一步理解,考虑实现一个单一所有权的MyTask
template<class C> class MyTask; template<class R, class ...Args> class MyTask<R(Args...)> { std::function<R(Args...)> function_; MyFuture<R> future_; public: template<class F> MyTask(F &&f) : function_{std::forward<F>(f)} {} // no copy MyTask(const MyTask &) = delete; MyTask &operator=(const MyTask &) = delete; // move is ok MyTask(MyTask &&task) : function_{std::move(task.function_)}, future_{std::move(task.future_)} { } MyTask &operator=(MyTask &&) = delete; MyFuture<R> get_future() { return future_; } void operator()(Args... &&args) { future_.set(function_(std::forward<Args>(args)...)); } };
注意,如果你要实现类似packaged_task的模版类的话,你需要一个只有一个参数的模版,然后再是一个R(Args…)的模版。不这么做的话你会得到一个编译错误。从模版类角度来说,第二个模版类是第一个的具体化。
packaged_task用内部自己的方式存储了被包装函数的指针,这里使用std::function代替。
future使用前一节的MyFuture,支持复制和转移。
MyTask<MyString()> task{some_function}; MyFuture<MyString> future = task.get_future(); std::thread task_thread{std::move(task)}; task_thread.join(); MyString string = future.get(); std::cout << string << std::endl;
执行上述代码,结果和packaged_task的类似。
作为参考,packaged_task的实际代码中,保存了function和promise,整体结构和MyTask类似。
async
虽然packaged_task能够包装需要异步执行的函数,但是仍旧需要你自己操作thread。为此C++11的标准库里提供了另外一个方法级别的异步执行工具:async。
std::future<MyString> future = std::async(some_function); MyString string = future.get(); std::cout << string << std::endl;
可以看到,代码量比packaged_task要少,而且直接返回我们需要的future。
虽然async可以用很少的代码异步执行,但是需要考虑
- async是不是异步执行?
- 什么时候执行?
- 是否支持线程池?
- 调用async之后线程会怎么样?
在阅读了async的文档和async本身代码之后的回答
- 是的
- 可以参数指定,async为调用时开始执行,deferred是在调用future的get时执行
- 不支持
- 参数为async时创建线程并detach。deferred并不创建thread,只在第一次调用future的get时在调用线程中执行。严格来说,deferred不算异步调用
理解以上几点对使用async方法很重要。
从行为上来看,async并不属于packaged_task的封装版。而且从所有权上来看,被包含的function必须被async的返回值future所持有。
实际代码其实也是这样设计的。具体来说,由于future必须允许复制,future持有一个关联状态。这个关联状态拥有类似shared_ptr的行为,比如说之前的MyFutureInner。在保持MyFutureInner行为的同时,增加一个function字段,并且启动一个线程调用MyFutureInner的set方法就可以实现async。换句话说,需要从MyFutureInner派生一个子类。以下是实现
template<class R> class MyFutureInner { protected: R value_; bool value_set_ = false; std::mutex mutex_; std::condition_variable condition_; std::atomic_int count_; public: MyFutureInner() : count_{1} {} void increase_count() { count_.fetch_add(1, std::memory_order_relaxed); } int decrease_count() { return count_.fetch_sub(1, std::memory_order_acq_rel) - 1; } void set(R &&value) { std::unique_lock<std::mutex> lock{mutex_}; value_ = std::move(value); value_set_ = true; condition_.notify_all(); } virtual R get() { std::unique_lock<std::mutex> lock{mutex_}; if (!value_set_) { condition_.wait(lock); } return std::move(value_); } }; template<class R, class...Args> class MyFutureInnerWithFunction : public MyFutureInner<R> { std::function<R(Args...)> function_; public: MyFutureInnerWithFunction(std::function<R(Args...)> &&function) : MyFutureInner<R>{}, function_{std::move(function)} {} void execute() { this->set(function_()); } }; template<class R> class MyFuture { MyFutureInner<R> *inner_ptr_; public: MyFuture() : inner_ptr_{new MyFutureInner<R>{}} {} explicit MyFuture(MyFutureInner<R> *inner_ptr) : inner_ptr_{inner_ptr} {} MyFuture(const MyFuture &future) : inner_ptr_{future.inner_ptr_} { std::cout << "MyFuture(copy)\n"; inner_ptr_->increase_count(); } MyFuture &operator=(const MyFuture &) = delete; MyFuture(MyFuture &&future) { std::cout << "MyFuture(move)\n"; inner_ptr_ = future.inner_ptr_; future.inner_ptr_ = nullptr; } MyFuture &operator=(MyFuture &&) = delete; R get() { return inner_ptr_->get(); } ~MyFuture() { if (inner_ptr_ != nullptr && inner_ptr_->decrease_count() == 0) { delete inner_ptr_; } } };
注意这里的MyFutureInner和前篇有所不同,成员变量改成可以被子类访问的protected,get方法也加了virtual。
MyFutureInner的子类里增加了成员function,还有一个方法execute。
接下来是async方法的实现
template<class F, class... Args> auto my_async(F &&f, Args &&... args) -> MyFuture<decltype(f(args...))> { typedef decltype(f(args...)) R; std::function<R(Args...)> function{std::forward<F>(f), std::forward<Args>(args)...}; typedef MyFutureInnerWithFunction<R, Args...> FIWF; std::unique_ptr<FIWF> inner_ptr{new FIWF{std::move(function)}}; std::thread async_thread{&FIWF::execute, inner_ptr.get()}; async_thread.detach(); return MyFuture<R>{inner_ptr.release()}; }
这里方法的签名参考stackoverflow的一个问题。
方法内,用MyFutureInnerWithFunction的指针传入MyFuture。同时,方法内启动一个thread并且detach。这里如果不detach的话,thread在方法结束后会被意外销毁掉,这不是我们想要的,所以必须detach。
执行代码
MyFuture<MyString> future = my_async(some_function); MyString string = future.get(); std::cout << string << std::endl;
结果和async是一样的。
这里考虑一个问题,假设持有MyFuture的调用线程没有调用get直接结束的话会发生什么。由于只有MyFuture持有MyFutureInner的指针,MyFutureInner会被删除。异步线程访问时MyFutureInnerWithFunction会是一个无效的内存地址。以下是再现的代码
MyString some_function() { std::this_thread::sleep_for(std::chrono::milliseconds{500}); return MyString{"foo"}; } void test_future() { MyFuture<MyString> future = my_async(some_function); } int main() { test_future(); std::this_thread::sleep_for(std::chrono::milliseconds{1000}); return 0; }
这里根本的原因是MyFutureInnerWithFunction拥有者不只是MyFuture,还有异步线程。那样的话,就不能由MyFuture负责删除MyFutureInner,同样也不能由MyFutureInnerWithFunction里面函数负责删除,所以只能让MyFutureInner自己删除自己,也就是delete this。
修改之后的MyFutureInner和MyFutureInnerWithFunction
template<class R> class MyFutureInner { protected: std::atomic_int count_; R value_; bool value_set_ = false; std::mutex mutex_; std::condition_variable condition_; public: MyFutureInner() : count_{1} {} void increase_count() { count_.fetch_add(1, std::memory_order_relaxed); } void decrease_count() { if (count_.fetch_sub(1, std::memory_order_acq_rel) == 1) { on_zero_shared(); } } virtual void on_zero_shared() { delete this; } void set(R &&value) { std::unique_lock<std::mutex> lock{mutex_}; value_ = std::move(value); value_set_ = true; condition_.notify_all(); } virtual R get() { std::unique_lock<std::mutex> lock{mutex_}; if (!value_set_) { condition_.wait(lock); } return std::move(value_); } }; template<class R, class...Args> class MyFutureInnerWithFunction : public MyFutureInner<R> { std::function<R(Args...)> function_; typedef MyFutureInner<R> base; public: explicit MyFutureInnerWithFunction(std::function<R(Args...)> &&function) : MyFutureInner<R>{}, function_{std::move(function)} { this->increase_count(); } void execute() { this->set(function_()); this->decrease_count(); } };
另外一个解决方案,是在MyFutureInnerWithFunction里覆盖MyFutureInner的decrease_count(或者修改后的on_zero_shared),等待函数完成才能被销毁。
template<class R, class...Args> class MyFutureInnerWithFunction : public MyFutureInner<R> { std::function<R(Args...)> function_; typedef MyFutureInner<R> base; public: explicit MyFutureInnerWithFunction(std::function<R(Args...)> &&function) : MyFutureInner<R>{}, function_{std::move(function)} { } void on_zero_shared() { wait_set(); delete this; } void execute() { this->set(function_()); } private: void wait_set() { std::unique_lock<std::mutex> lock{base::mutex_}; if (!base::value_set_) { base::condition_.wait(lock); } } };
这也是async方法里面返回的future里的实现。也就是说,async返回的future即使你不调用get,你可以不能直接从包含有future的当前方法返回,你会被阻塞住。如果这不是你要的行为,你可以需要自己写一个类似前一种方法的MyFutureInner,也就是增减shared count。
最后,deferred async由于不涉及异步线程,实现比较简单。
template<class R, class...Args> class MyFutureInnerWithFunctionDeferred : public MyFutureInner<R> { bool executed = false; std::function<R(Args...)> function_; typedef MyFutureInner<R> base; public: explicit MyFutureInnerWithFunctionDeferred(std::function<R(Args...)> &&function) : MyFutureInner<R>{}, function_{std::move(function)} { } R get() { std::unique_lock<std::mutex> lock{base::mutex_}; if (!executed) { lock.unlock(); this->set(function_()); executed = true; } else if (!base::value_set_) { base::condition_.wait(lock); } return std::move(base::value_); } };
以及调用代码
template<class F, class... Args> auto my_async_deferred(F &&f, Args &&... args) -> MyFuture<decltype(f(args...))> { typedef decltype(f(args...)) R; std::function<R(Args...)> function{std::forward<F>(f), std::forward<Args>(args)...}; return MyFuture<R>(new MyFutureInnerWithFunctionDeferred<R, Args...>{std::move(function)}); } int main() { MyFuture<MyString> future = my_async_deferred(some_function); std::cout << future.get() << std::endl; return 0; }
小结
总得来说,C++11的标准库提供了好几种异步执行的方式,各有各的适用场景。比如说promise适合自己封装异步执行的函数,packaged_task用于封装既有的函数,但是线程调度要自己来做,async看起来最简单,但是你必须理解async返回的future的行为。
不过老实说,async还不是很理想,比如没有线程池,以及future的行为不可调整。但是作为理解如何构建高层次多线程处理很有帮助。
最后,希望本文对各位有帮助。
One response to “【C++11】异步执行之既有函数的包装:packaged_task类和async方法”
感谢分享!已推荐到《开发者头条》:https://toutiao.io/posts/tf8m2s 欢迎点赞支持!使用开发者头条 App 搜索 385148 即可订阅《并发与分布式系统研究》