上篇中讲到,C++11的标准库提供了promise用于在线程执行的具体方法中返回数据,接收端通过future阻塞获取。这么做的前提是你可以修改方法的参数,或者说你需要写一个包装函数。想要让既有函数异步的话,你可以使用packaged_task类或者async方法。
具体分析之前,以下代码是在线程中需要执行的方法。
MyString some_function() {
return MyString{"foo"};
}
MyString是很早之前自己用来查看copy/move次数的类,不想用的话,可以替换为std::string。
packaged_task
packaged_task是一个封装了被调用的函数的task。注意,packaged_task本身并不提供异步执行的机制,所以你仍旧需要把packaged_task放到thread中去执行。
std::packaged_task<MyString()> task{some_function};
std::future<MyString> future = task.get_future();
std::thread task_thread{std::move(task)};
task_thread.join();
MyString string = future.get();
std::cout << string << std::endl;
从设计角度来说,packaged_task是桥接了被包装的函数(这里是some_function)和future,同时又能被thread执行的一个类,所以实现上,需要能执行(重载operator()),持有被包装函数的引用或指针。
为了进一步理解,考虑实现一个单一所有权的MyTask
template<class C>
class MyTask;
template<class R, class ...Args>
class MyTask<R(Args...)> {
std::function<R(Args...)> function_;
MyFuture<R> future_;
public:
template<class F>
MyTask(F &&f) : function_{std::forward<F>(f)} {}
// no copy
MyTask(const MyTask &) = delete;
MyTask &operator=(const MyTask &) = delete;
// move is ok
MyTask(MyTask &&task) :
function_{std::move(task.function_)},
future_{std::move(task.future_)} {
}
MyTask &operator=(MyTask &&) = delete;
MyFuture<R> get_future() {
return future_;
}
void operator()(Args... &&args) {
future_.set(function_(std::forward<Args>(args)...));
}
};
注意,如果你要实现类似packaged_task的模版类的话,你需要一个只有一个参数的模版,然后再是一个R(Args…)的模版。不这么做的话你会得到一个编译错误。从模版类角度来说,第二个模版类是第一个的具体化。
packaged_task用内部自己的方式存储了被包装函数的指针,这里使用std::function代替。
future使用前一节的MyFuture,支持复制和转移。
MyTask<MyString()> task{some_function};
MyFuture<MyString> future = task.get_future();
std::thread task_thread{std::move(task)};
task_thread.join();
MyString string = future.get();
std::cout << string << std::endl;
执行上述代码,结果和packaged_task的类似。
作为参考,packaged_task的实际代码中,保存了function和promise,整体结构和MyTask类似。
async
虽然packaged_task能够包装需要异步执行的函数,但是仍旧需要你自己操作thread。为此C++11的标准库里提供了另外一个方法级别的异步执行工具:async。
std::future<MyString> future = std::async(some_function); MyString string = future.get(); std::cout << string << std::endl;
可以看到,代码量比packaged_task要少,而且直接返回我们需要的future。
虽然async可以用很少的代码异步执行,但是需要考虑
- async是不是异步执行?
- 什么时候执行?
- 是否支持线程池?
- 调用async之后线程会怎么样?
在阅读了async的文档和async本身代码之后的回答
- 是的
- 可以参数指定,async为调用时开始执行,deferred是在调用future的get时执行
- 不支持
- 参数为async时创建线程并detach。deferred并不创建thread,只在第一次调用future的get时在调用线程中执行。严格来说,deferred不算异步调用
理解以上几点对使用async方法很重要。
从行为上来看,async并不属于packaged_task的封装版。而且从所有权上来看,被包含的function必须被async的返回值future所持有。
实际代码其实也是这样设计的。具体来说,由于future必须允许复制,future持有一个关联状态。这个关联状态拥有类似shared_ptr的行为,比如说之前的MyFutureInner。在保持MyFutureInner行为的同时,增加一个function字段,并且启动一个线程调用MyFutureInner的set方法就可以实现async。换句话说,需要从MyFutureInner派生一个子类。以下是实现
template<class R>
class MyFutureInner {
protected:
R value_;
bool value_set_ = false;
std::mutex mutex_;
std::condition_variable condition_;
std::atomic_int count_;
public:
MyFutureInner() : count_{1} {}
void increase_count() {
count_.fetch_add(1, std::memory_order_relaxed);
}
int decrease_count() {
return count_.fetch_sub(1, std::memory_order_acq_rel) - 1;
}
void set(R &&value) {
std::unique_lock<std::mutex> lock{mutex_};
value_ = std::move(value);
value_set_ = true;
condition_.notify_all();
}
virtual R get() {
std::unique_lock<std::mutex> lock{mutex_};
if (!value_set_) {
condition_.wait(lock);
}
return std::move(value_);
}
};
template<class R, class...Args>
class MyFutureInnerWithFunction : public MyFutureInner<R> {
std::function<R(Args...)> function_;
public:
MyFutureInnerWithFunction(std::function<R(Args...)> &&function) : MyFutureInner<R>{}, function_{std::move(function)} {}
void execute() {
this->set(function_());
}
};
template<class R>
class MyFuture {
MyFutureInner<R> *inner_ptr_;
public:
MyFuture() : inner_ptr_{new MyFutureInner<R>{}} {}
explicit MyFuture(MyFutureInner<R> *inner_ptr) : inner_ptr_{inner_ptr} {}
MyFuture(const MyFuture &future) : inner_ptr_{future.inner_ptr_} {
std::cout << "MyFuture(copy)\n";
inner_ptr_->increase_count();
}
MyFuture &operator=(const MyFuture &) = delete;
MyFuture(MyFuture &&future) {
std::cout << "MyFuture(move)\n";
inner_ptr_ = future.inner_ptr_;
future.inner_ptr_ = nullptr;
}
MyFuture &operator=(MyFuture &&) = delete;
R get() {
return inner_ptr_->get();
}
~MyFuture() {
if (inner_ptr_ != nullptr && inner_ptr_->decrease_count() == 0) {
delete inner_ptr_;
}
}
};
注意这里的MyFutureInner和前篇有所不同,成员变量改成可以被子类访问的protected,get方法也加了virtual。
MyFutureInner的子类里增加了成员function,还有一个方法execute。
接下来是async方法的实现
template<class F, class... Args>
auto my_async(F &&f, Args &&... args) -> MyFuture<decltype(f(args...))> {
typedef decltype(f(args...)) R;
std::function<R(Args...)> function{std::forward<F>(f), std::forward<Args>(args)...};
typedef MyFutureInnerWithFunction<R, Args...> FIWF;
std::unique_ptr<FIWF> inner_ptr{new FIWF{std::move(function)}};
std::thread async_thread{&FIWF::execute, inner_ptr.get()};
async_thread.detach();
return MyFuture<R>{inner_ptr.release()};
}
这里方法的签名参考stackoverflow的一个问题。
方法内,用MyFutureInnerWithFunction的指针传入MyFuture。同时,方法内启动一个thread并且detach。这里如果不detach的话,thread在方法结束后会被意外销毁掉,这不是我们想要的,所以必须detach。
执行代码
MyFuture<MyString> future = my_async(some_function); MyString string = future.get(); std::cout << string << std::endl;
结果和async是一样的。
这里考虑一个问题,假设持有MyFuture的调用线程没有调用get直接结束的话会发生什么。由于只有MyFuture持有MyFutureInner的指针,MyFutureInner会被删除。异步线程访问时MyFutureInnerWithFunction会是一个无效的内存地址。以下是再现的代码
MyString some_function() {
std::this_thread::sleep_for(std::chrono::milliseconds{500});
return MyString{"foo"};
}
void test_future() {
MyFuture<MyString> future = my_async(some_function);
}
int main() {
test_future();
std::this_thread::sleep_for(std::chrono::milliseconds{1000});
return 0;
}
这里根本的原因是MyFutureInnerWithFunction拥有者不只是MyFuture,还有异步线程。那样的话,就不能由MyFuture负责删除MyFutureInner,同样也不能由MyFutureInnerWithFunction里面函数负责删除,所以只能让MyFutureInner自己删除自己,也就是delete this。
修改之后的MyFutureInner和MyFutureInnerWithFunction
template<class R>
class MyFutureInner {
protected:
std::atomic_int count_;
R value_;
bool value_set_ = false;
std::mutex mutex_;
std::condition_variable condition_;
public:
MyFutureInner() : count_{1} {}
void increase_count() {
count_.fetch_add(1, std::memory_order_relaxed);
}
void decrease_count() {
if (count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
on_zero_shared();
}
}
virtual void on_zero_shared() {
delete this;
}
void set(R &&value) {
std::unique_lock<std::mutex> lock{mutex_};
value_ = std::move(value);
value_set_ = true;
condition_.notify_all();
}
virtual R get() {
std::unique_lock<std::mutex> lock{mutex_};
if (!value_set_) {
condition_.wait(lock);
}
return std::move(value_);
}
};
template<class R, class...Args>
class MyFutureInnerWithFunction : public MyFutureInner<R> {
std::function<R(Args...)> function_;
typedef MyFutureInner<R> base;
public:
explicit MyFutureInnerWithFunction(std::function<R(Args...)> &&function)
: MyFutureInner<R>{}, function_{std::move(function)} {
this->increase_count();
}
void execute() {
this->set(function_());
this->decrease_count();
}
};
另外一个解决方案,是在MyFutureInnerWithFunction里覆盖MyFutureInner的decrease_count(或者修改后的on_zero_shared),等待函数完成才能被销毁。
template<class R, class...Args>
class MyFutureInnerWithFunction : public MyFutureInner<R> {
std::function<R(Args...)> function_;
typedef MyFutureInner<R> base;
public:
explicit MyFutureInnerWithFunction(std::function<R(Args...)> &&function)
: MyFutureInner<R>{}, function_{std::move(function)} {
}
void on_zero_shared() {
wait_set();
delete this;
}
void execute() {
this->set(function_());
}
private:
void wait_set() {
std::unique_lock<std::mutex> lock{base::mutex_};
if (!base::value_set_) {
base::condition_.wait(lock);
}
}
};
这也是async方法里面返回的future里的实现。也就是说,async返回的future即使你不调用get,你可以不能直接从包含有future的当前方法返回,你会被阻塞住。如果这不是你要的行为,你可以需要自己写一个类似前一种方法的MyFutureInner,也就是增减shared count。
最后,deferred async由于不涉及异步线程,实现比较简单。
template<class R, class...Args>
class MyFutureInnerWithFunctionDeferred : public MyFutureInner<R> {
bool executed = false;
std::function<R(Args...)> function_;
typedef MyFutureInner<R> base;
public:
explicit MyFutureInnerWithFunctionDeferred(std::function<R(Args...)> &&function)
: MyFutureInner<R>{}, function_{std::move(function)} {
}
R get() {
std::unique_lock<std::mutex> lock{base::mutex_};
if (!executed) {
lock.unlock();
this->set(function_());
executed = true;
} else if (!base::value_set_) {
base::condition_.wait(lock);
}
return std::move(base::value_);
}
};
以及调用代码
template<class F, class... Args>
auto my_async_deferred(F &&f, Args &&... args) -> MyFuture<decltype(f(args...))> {
typedef decltype(f(args...)) R;
std::function<R(Args...)> function{std::forward<F>(f), std::forward<Args>(args)...};
return MyFuture<R>(new MyFutureInnerWithFunctionDeferred<R, Args...>{std::move(function)});
}
int main() {
MyFuture<MyString> future = my_async_deferred(some_function);
std::cout << future.get() << std::endl;
return 0;
}
小结
总得来说,C++11的标准库提供了好几种异步执行的方式,各有各的适用场景。比如说promise适合自己封装异步执行的函数,packaged_task用于封装既有的函数,但是线程调度要自己来做,async看起来最简单,但是你必须理解async返回的future的行为。
不过老实说,async还不是很理想,比如没有线程池,以及future的行为不可调整。但是作为理解如何构建高层次多线程处理很有帮助。
最后,希望本文对各位有帮助。
One response to “【C++11】异步执行之既有函数的包装:packaged_task类和async方法”
感谢分享!已推荐到《开发者头条》:https://toutiao.io/posts/tf8m2s 欢迎点赞支持!使用开发者头条 App 搜索 385148 即可订阅《并发与分布式系统研究》