【C++11】基于std::thread异步执行时的输入输出


本篇主要是记录自己在学习C++11下std::thread异步执行时的一些细节性的东西,为之后基于C++11写并发代码打基础。

C++11引入了std::thread。据说之前因为需要区分对待pthread和win下的线程库,代码中有大量的预编译的if else,非常丑陋。现在的话,统一用std::thread就行了。

基于std::thread最简单的异步执行代码。

#include <iostream>
#include <thread>

void thread_run() {
    std::cout << "thread run\n";
}

int main() {
    std::thread thread1{thread_run};
    thread1.join();
    return 0;
}

代码创建了一个线程thread1,thread1执行thread_run(立马执行)。main函数创建了线程之后,通过join等待thread1执行结束。

如果你写过基于pthread的代码的话,会发现pthread_t不再需要。因为按照C++的风格,肯定会封装在std::thread中。

(注意,本文不会介绍创建thread有哪些不同的方法,或者延迟执行的方法)

输入

接下来要考虑一个问题是,如何向thread_run传递参数。

在多线程程序里面,主要有

  1. 按值传递,或者讲复制
  2. 转移,所有权在新线程中
  3. 共享(可变或不可变)

第一个不用多说,基础类型,或者对复制不敏感的类型都可以这么做。第二种其实算是独享,好处是这样就不会有共享下的并发修改问题。第三种,很多并发程序潜在有这种要求:超过一个线程共享某个变量,虽然处理起来必须很小心。

具体三种写法如下

#include <iostream>
#include <thread>
#include <vector>

void thread1_run(int n) {
}

void thread2_run(std::vector<int>&& vector) {
}

void thread3_run(std::shared_ptr<std::string> vector_ptr) {
}

int main() {
    std::thread thread1{thread1_run, 1};

    std::vector<int> vector = {1, 2, 3};
    std::thread thread2{thread2_run, std::move(vector)};

    std::shared_ptr<std::string> vector_ptr{new std::string{"foo"}};
    std::thread thread3{thread3_run, vector_ptr};

    thread1.join();
    thread2.join();
    thread3.join();
    return 0;
}

(注意,本文不会介绍类似向线程传递引用怎么做,请自行参阅std::thread的文档)

请留意代码中传递的地方和接受的函数签名。

第三种使用shared_ptr的方式背后依赖于shared_ptr可以被复制,当然里面的内容不会被复制。

这里特别讲一下第二种转移的时候,参数实际上在到达实际执行的函数前会有大约两次或以上move的原因,顺便了解一下thread的构造函数在做什么。

写过pthread代码的人可能知道,pthread里线程执行的函数签名是这样的

void* some_function(void* arg) {
    return NULL;
}

std::thread在构造过程中,除了把函数名改成函数指针(为了符合pthread_create的函数签名)之外,还需要打包函数和参数,在内部一个模拟上述代码的统一函数中解包然后执行函数,具体来说

  • 创建一个decay_copy(forward(function),), decay_copy(forward(arg1))…的元组(tuple),这里是一次move或者copy
  • 在内部同一个执行函数中从元组中解包function和arguments,这里是第二次move
  • 然后调用用户定义的函数

有兴趣的人可以看一下libc++的实现。

输出

使用std::thread另外要注意的一个问题是如何返回值,和pthread不同,std::thread没法直接返回结果。

C++11提供了几种异步执行的手段:promise,packaged_task和async。这几种手段都支持返回future,一个可以获取线程执行结果的工具。以promise/future为例。

#include <iostream>
#include <thread>
#include <future>

void thread1_run(std::promise<std::string>&& promise) {
    promise.set_value(std::string{"foo"});
}

int main() {
    std::promise<std::string> promise;
    std::future<std::string> future = promise.get_future();
    std::thread thread1{thread1_run, std::move(promise)};
    thread1.join();
    std::string string = future.get();
    std::cout << string << std::endl;
    return 0;
}

promise作为工作线程(thread1)写入结果的工具,future作为非工作线程读取数据的工具,组合起来达到获取返回值的效果。

这里考虑一下,promise和future是怎么实现的?

在promise端写入的数据,要future端能读出来,说明promise和future共享某个数据结构,里面包含了结果。

同时,一方写入后,被阻塞的一方可以执行,很明显是一种条件变量的行为模式。

为了进一步理解,试着自己写一个模拟实现。

#include <iostream>
#include <thread>
#include <atomic>
#include <mutex>

template<class T>
class MyFutureInner {
    std::atomic_int count_;
    std::mutex mutex_;
    std::condition_variable condition_;
    T value_;
    bool value_set_ = false;
public:
    MyFutureInner() : count_{1} {}

    int use_count() const {
        return count_.load(std::memory_order_relaxed);
    }

    // get and increase
    int increase_reference() {
        return count_.fetch_add(1, std::memory_order_relaxed);
    }

    // decrease and get
    int decrease_reference() {
        return count_.fetch_add(-1, std::memory_order_acq_rel) - 1;
    }

    void set(T &&value) {
        std::unique_lock<std::mutex> lock{mutex_};
        value_ = std::move(value); // move assignment
        value_set_ = true;
        condition_.notify_all();
    }

    T move() {
        std::unique_lock<std::mutex> lock{mutex_};
        if (!value_set_) {
            condition_.wait(lock);
        }
        return std::move(value_); // move
    }

    T copy() {
        std::unique_lock<std::mutex> lock{mutex_};
        if (!value_set_) {
            condition_.wait(lock);
        }
        return value_; // copy
    }
};

template<class T>
class MyFuture {
    MyFutureInner<T> *inner_ptr_;
public:
    MyFuture() : inner_ptr_{new MyFutureInner<T>} {}

    // copy
    MyFuture(const MyFuture &future) : inner_ptr_{future.inner_ptr_} {
        inner_ptr_->increase_reference();
    }

    MyFuture &operator=(const MyFuture &) = delete;

    MyFuture(MyFuture &&future) {
        inner_ptr_ = future.inner_ptr_;
        future.inner_ptr_ = nullptr;
    }

    MyFuture &operator=(MyFuture &&) = delete;

    int use_count() const {
        return inner_ptr_->use_count();
    }

    void set(T &&value) {
        inner_ptr_->set(std::move(value));
    }

    T get(bool copy = false) {
        return copy ? inner_ptr_->copy() : inner_ptr_->move();
    }

    ~MyFuture() {
        if (inner_ptr_ != nullptr && inner_ptr_->decrease_reference() == 0) {
            delete inner_ptr_;
        }
    }
};

void thread2_run(MyFuture<std::string> future) {
    future.set(std::string{"foo"});
}

int main() {
    MyFuture<std::string> future;
    std::thread thread2{thread2_run, future};
    std::string result = future.get();
    std::cout << result << std::endl;
    thread2.join();
    return 0;
}

这里为了简化代码,合并了promise和future的功能为一个类MyFuture。

MyFuture内部持有MyFutureInner的指针,当所有持有MyFutureInner指针的MyFuture被销毁之后,MyFutureInner才会被销毁。行为上类似shared_ptr,当然这里不能直接用shared_ptr。

MyFutureInner实现了shared_ptr的核心行为,增加了set和move/copy几个针对结果的操作。同时内部通过mutex加条件变量的方式,支持阻塞和通知。

除了future之外,MyFuture其实还实现了shared_future的功能。shared_future可以通过调用future的share方法得到。shared_future内部是返回结果的克隆而不是move。在多个线程等待同一个结果时使用。为了避免复制的话,可以使用shared_ptr,具体写法如下

void thread3_run(MyFuture<std::shared_ptr<std::string>> future) {
    future.set(std::shared_ptr<std::string>{new std::string{"foo"}});
}

int main() {
    MyFuture<std::shared_ptr<std::string>> future;
    std::thread thread3{thread3_run, future};
    std::shared_ptr<std::string> string_ptr1 = future.get(true);
    thread3.join();
    std::shared_ptr<std::string> string_ptr2 = future.get(true);
    std::cout << *string_ptr1 << std::endl;
    std::cout << *string_ptr2 << std::endl;
    return 0;
}

如果你有机会阅读libc++的代码的话,可以发现promise/future内部用的也是类似上述代码的机制。当然这里没有处理结果是void(即只需要通知完成)的情况,有机会再考虑。

小结

std::thread是C++11多线程编程比较基础的类,个人觉得学习如何使用std::thread输入输出对之后其他多线程的编程技术会有帮助。希望我的文章对你有用。