【C++11】异步执行之既有函数的包装:packaged_task类和async方法


上篇中讲到,C++11的标准库提供了promise用于在线程执行的具体方法中返回数据,接收端通过future阻塞获取。这么做的前提是你可以修改方法的参数,或者说你需要写一个包装函数。想要让既有函数异步的话,你可以使用packaged_task类或者async方法。

具体分析之前,以下代码是在线程中需要执行的方法。

MyString some_function() {
    return MyString{"foo"};
}

MyString是很早之前自己用来查看copy/move次数的类,不想用的话,可以替换为std::string。

packaged_task

packaged_task是一个封装了被调用的函数的task。注意,packaged_task本身并不提供异步执行的机制,所以你仍旧需要把packaged_task放到thread中去执行。

std::packaged_task<MyString()> task{some_function};
std::future<MyString> future = task.get_future();

std::thread task_thread{std::move(task)};
task_thread.join();

MyString string = future.get();
std::cout << string << std::endl;

从设计角度来说,packaged_task是桥接了被包装的函数(这里是some_function)和future,同时又能被thread执行的一个类,所以实现上,需要能执行(重载operator()),持有被包装函数的引用或指针。

为了进一步理解,考虑实现一个单一所有权的MyTask

template<class C>
class MyTask;

template<class R, class ...Args>
class MyTask<R(Args...)> {
    std::function<R(Args...)> function_;
    MyFuture<R> future_;
public:
    template<class F>
    MyTask(F &&f) : function_{std::forward<F>(f)} {}

    // no copy
    MyTask(const MyTask &) = delete;

    MyTask &operator=(const MyTask &) = delete;

    // move is ok
    MyTask(MyTask &&task) :
            function_{std::move(task.function_)},
            future_{std::move(task.future_)} {
    }

    MyTask &operator=(MyTask &&) = delete;

    MyFuture<R> get_future() {
        return future_;
    }

    void operator()(Args... &&args) {
        future_.set(function_(std::forward<Args>(args)...));
    }
};

注意,如果你要实现类似packaged_task的模版类的话,你需要一个只有一个参数的模版,然后再是一个R(Args…)的模版。不这么做的话你会得到一个编译错误。从模版类角度来说,第二个模版类是第一个的具体化。

packaged_task用内部自己的方式存储了被包装函数的指针,这里使用std::function代替。

future使用前一节的MyFuture,支持复制和转移。

MyTask<MyString()> task{some_function};
MyFuture<MyString> future = task.get_future();

std::thread task_thread{std::move(task)};
task_thread.join();

MyString string = future.get();
std::cout << string << std::endl;

执行上述代码,结果和packaged_task的类似。

作为参考,packaged_task的实际代码中,保存了function和promise,整体结构和MyTask类似。

async

虽然packaged_task能够包装需要异步执行的函数,但是仍旧需要你自己操作thread。为此C++11的标准库里提供了另外一个方法级别的异步执行工具:async。

std::future<MyString> future = std::async(some_function);
MyString string = future.get();
std::cout << string << std::endl;

可以看到,代码量比packaged_task要少,而且直接返回我们需要的future。

虽然async可以用很少的代码异步执行,但是需要考虑

  1. async是不是异步执行?
  2. 什么时候执行?
  3. 是否支持线程池?
  4. 调用async之后线程会怎么样?

在阅读了async的文档和async本身代码之后的回答

  1. 是的
  2. 可以参数指定,async为调用时开始执行,deferred是在调用future的get时执行
  3. 不支持
  4. 参数为async时创建线程并detach。deferred并不创建thread,只在第一次调用future的get时在调用线程中执行。严格来说,deferred不算异步调用

理解以上几点对使用async方法很重要。

从行为上来看,async并不属于packaged_task的封装版。而且从所有权上来看,被包含的function必须被async的返回值future所持有。

实际代码其实也是这样设计的。具体来说,由于future必须允许复制,future持有一个关联状态。这个关联状态拥有类似shared_ptr的行为,比如说之前的MyFutureInner。在保持MyFutureInner行为的同时,增加一个function字段,并且启动一个线程调用MyFutureInner的set方法就可以实现async。换句话说,需要从MyFutureInner派生一个子类。以下是实现

template<class R>
class MyFutureInner {
protected:
    R value_;
    bool value_set_ = false;
    std::mutex mutex_;
    std::condition_variable condition_;
    std::atomic_int count_;
public:
    MyFutureInner() : count_{1} {}

    void increase_count() {
        count_.fetch_add(1, std::memory_order_relaxed);
    }

    int decrease_count() {
        return count_.fetch_sub(1, std::memory_order_acq_rel) - 1;
    }

    void set(R &&value) {
        std::unique_lock<std::mutex> lock{mutex_};
        value_ = std::move(value);
        value_set_ = true;
        condition_.notify_all();
    }

    virtual R get() {
        std::unique_lock<std::mutex> lock{mutex_};
        if (!value_set_) {
            condition_.wait(lock);
        }
        return std::move(value_);
    }
};

template<class R, class...Args>
class MyFutureInnerWithFunction : public MyFutureInner<R> {
    std::function<R(Args...)> function_;
public:
    MyFutureInnerWithFunction(std::function<R(Args...)> &&function) : MyFutureInner<R>{}, function_{std::move(function)} {}

    void execute() {
        this->set(function_());
    }
};

template<class R>
class MyFuture {
    MyFutureInner<R> *inner_ptr_;
public:
    MyFuture() : inner_ptr_{new MyFutureInner<R>{}} {}

    explicit MyFuture(MyFutureInner<R> *inner_ptr) : inner_ptr_{inner_ptr} {}

    MyFuture(const MyFuture &future) : inner_ptr_{future.inner_ptr_} {
        std::cout << "MyFuture(copy)\n";
        inner_ptr_->increase_count();
    }

    MyFuture &operator=(const MyFuture &) = delete;

    MyFuture(MyFuture &&future) {
        std::cout << "MyFuture(move)\n";
        inner_ptr_ = future.inner_ptr_;
        future.inner_ptr_ = nullptr;
    }

    MyFuture &operator=(MyFuture &&) = delete;

    R get() {
        return inner_ptr_->get();
    }

    ~MyFuture() {
        if (inner_ptr_ != nullptr && inner_ptr_->decrease_count() == 0) {
            delete inner_ptr_;
        }
    }
};

注意这里的MyFutureInner和前篇有所不同,成员变量改成可以被子类访问的protected,get方法也加了virtual。

MyFutureInner的子类里增加了成员function,还有一个方法execute。

接下来是async方法的实现

template<class F, class... Args>
auto my_async(F &&f, Args &&... args) -> MyFuture<decltype(f(args...))> {
    typedef decltype(f(args...)) R;
    std::function<R(Args...)> function{std::forward<F>(f), std::forward<Args>(args)...};
    typedef MyFutureInnerWithFunction<R, Args...> FIWF;
    std::unique_ptr<FIWF> inner_ptr{new FIWF{std::move(function)}};
    std::thread async_thread{&FIWF::execute, inner_ptr.get()};
    async_thread.detach();
    return MyFuture<R>{inner_ptr.release()};
}

这里方法的签名参考stackoverflow的一个问题

方法内,用MyFutureInnerWithFunction的指针传入MyFuture。同时,方法内启动一个thread并且detach。这里如果不detach的话,thread在方法结束后会被意外销毁掉,这不是我们想要的,所以必须detach。

执行代码

MyFuture<MyString> future = my_async(some_function);
MyString string = future.get();
std::cout << string << std::endl;

结果和async是一样的。

这里考虑一个问题,假设持有MyFuture的调用线程没有调用get直接结束的话会发生什么。由于只有MyFuture持有MyFutureInner的指针,MyFutureInner会被删除。异步线程访问时MyFutureInnerWithFunction会是一个无效的内存地址。以下是再现的代码

MyString some_function() {
    std::this_thread::sleep_for(std::chrono::milliseconds{500});
    return MyString{"foo"};
}

void test_future() {
    MyFuture<MyString> future = my_async(some_function);
}

int main() {
    test_future();
    std::this_thread::sleep_for(std::chrono::milliseconds{1000});
    return 0;
}

这里根本的原因是MyFutureInnerWithFunction拥有者不只是MyFuture,还有异步线程。那样的话,就不能由MyFuture负责删除MyFutureInner,同样也不能由MyFutureInnerWithFunction里面函数负责删除,所以只能让MyFutureInner自己删除自己,也就是delete this。

修改之后的MyFutureInner和MyFutureInnerWithFunction

template<class R>
class MyFutureInner {
protected:
    std::atomic_int count_;
    R value_;
    bool value_set_ = false;
    std::mutex mutex_;
    std::condition_variable condition_;
public:
    MyFutureInner() : count_{1} {}

    void increase_count() {
        count_.fetch_add(1, std::memory_order_relaxed);
    }

    void decrease_count() {
        if (count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            on_zero_shared();
        }
    }

    virtual void on_zero_shared() {
        delete this;
    }

    void set(R &&value) {
        std::unique_lock<std::mutex> lock{mutex_};
        value_ = std::move(value);
        value_set_ = true;
        condition_.notify_all();
    }

    virtual R get() {
        std::unique_lock<std::mutex> lock{mutex_};
        if (!value_set_) {
            condition_.wait(lock);
        }
        return std::move(value_);
    }
};

template<class R, class...Args>
class MyFutureInnerWithFunction : public MyFutureInner<R> {
    std::function<R(Args...)> function_;

    typedef MyFutureInner<R> base;
public:
    explicit MyFutureInnerWithFunction(std::function<R(Args...)> &&function)
            : MyFutureInner<R>{}, function_{std::move(function)} {
       this->increase_count();
    }

    void execute() {
        this->set(function_());
       this->decrease_count();
    }
};

另外一个解决方案,是在MyFutureInnerWithFunction里覆盖MyFutureInner的decrease_count(或者修改后的on_zero_shared),等待函数完成才能被销毁。

template<class R, class...Args>
class MyFutureInnerWithFunction : public MyFutureInner<R> {
    std::function<R(Args...)> function_;

    typedef MyFutureInner<R> base;
public:
    explicit MyFutureInnerWithFunction(std::function<R(Args...)> &&function)
            : MyFutureInner<R>{}, function_{std::move(function)} {
    }

    void on_zero_shared() {
        wait_set();
        delete this;
    }

    void execute() {
        this->set(function_());
    }

private:
    void wait_set() {
        std::unique_lock<std::mutex> lock{base::mutex_};
        if (!base::value_set_) {
            base::condition_.wait(lock);
        }
    }
};

这也是async方法里面返回的future里的实现。也就是说,async返回的future即使你不调用get,你可以不能直接从包含有future的当前方法返回,你会被阻塞住。如果这不是你要的行为,你可以需要自己写一个类似前一种方法的MyFutureInner,也就是增减shared count。

最后,deferred async由于不涉及异步线程,实现比较简单。

template<class R, class...Args>
class MyFutureInnerWithFunctionDeferred : public MyFutureInner<R> {
    bool executed = false;
    std::function<R(Args...)> function_;

    typedef MyFutureInner<R> base;
public:
    explicit MyFutureInnerWithFunctionDeferred(std::function<R(Args...)> &&function)
            : MyFutureInner<R>{}, function_{std::move(function)} {
    }

    R get() {
        std::unique_lock<std::mutex> lock{base::mutex_};
        if (!executed) {
            lock.unlock();
            this->set(function_());
            executed = true;
        } else if (!base::value_set_) {
            base::condition_.wait(lock);
        }
        return std::move(base::value_);
    }
};

以及调用代码

template<class F, class... Args>
auto my_async_deferred(F &&f, Args &&... args) -> MyFuture<decltype(f(args...))> {
    typedef decltype(f(args...)) R;
    std::function<R(Args...)> function{std::forward<F>(f), std::forward<Args>(args)...};
    return MyFuture<R>(new MyFutureInnerWithFunctionDeferred<R, Args...>{std::move(function)});
}

int main() {
    MyFuture<MyString> future = my_async_deferred(some_function);
    std::cout << future.get() << std::endl;
    return 0;
}

小结

总得来说,C++11的标准库提供了好几种异步执行的方式,各有各的适用场景。比如说promise适合自己封装异步执行的函数,packaged_task用于封装既有的函数,但是线程调度要自己来做,async看起来最简单,但是你必须理解async返回的future的行为。

不过老实说,async还不是很理想,比如没有线程池,以及future的行为不可调整。但是作为理解如何构建高层次多线程处理很有帮助。

最后,希望本文对各位有帮助。


One response to “【C++11】异步执行之既有函数的包装:packaged_task类和async方法”

  1. 感谢分享!已推荐到《开发者头条》:https://toutiao.io/posts/tf8m2s 欢迎点赞支持!使用开发者头条 App 搜索 385148 即可订阅《并发与分布式系统研究》