Java并发学习 类FutureTask分析与改进


大约两个月没有写博客了,原因是最近自己一直在看《The Art of Multiprocessor Programming》。这本书从理论到实践介绍了多核编程,给出的数据结构和算法都有相应的论文。如果你想好好学习多核编程,而不是某种语言的并发机制介绍的话,非常推荐这本书。同时这本书里介绍的算法,在Java的并发库里面都能找到影子。作为继续深入学习的一部分,个人打算逐个分析典型的并发类,并且基于个人理解给出一些改进或者变化。

Java的并发库里面,从版本1.5(Java 5)开始引入了接口Future和实现类FutureTask。与其他Synchronizer(同步类)不同,FutureTask并没有依赖AQS,所以是相对简单的一个并发类。FutureTask包含执行Task和同步结果两块主要功能。这里主要分析如何同步结果。

注意,本文并不会逐行代码分析FutureTask。为了学习如何设计并发类,本文以个人理解的原型类开始,逐渐修改成为接近实际FutureTask甚至超过FutureTask的代码。

同步器:同步结果

线程角色:

  1. 获取结果
  2. 设置结果

接口

public interface IFuture<T> {
    
    T get();

    T get(long time, @Nonnull TimeUnit unit) throws TimeoutException;

    void set(T value);
    
}

Future2

首先考虑get和set这一对方法。获取结果是调用get方法,设置结果是调用set方法。由于是不同线程调用,所以可能会有先后。如果get先来,set后来的话,get必须等待set设置的结果,所以get需要block。

其次,一般允许多个线程get。相对的,set只会有一个线程。对于多个get线程的话,类似锁,有spin方法,队列方案等。考虑到set不知何时被调用,使用队列方案。无锁的队列方案主要有Michael & Scott Lock Free Queue。其他隐式队列比如CLH Lock。

FutureTask使用的是Treiber Stack。虽然FutureTask里面的注释没有提到,但是

UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q)

很明显是Tribrie Stack的push操作。同时因为是Stack,所以后来的线程可能会被先被唤醒(事实上FutureTask代码中也确实如此)。不过作为同步器,满足在get中block时被set线程唤醒。换句话说,Future同步器没有约定唤醒的顺序。

以下是原型代码

import javax.annotation.Nonnull;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class Future3<T> implements IFuture<T> {

    private static final int STATE_NEW = 0;
    private static final int STATE_COMPLETED = 1;

    private final AtomicInteger state = new AtomicInteger(STATE_NEW);
    private volatile T value;
    private final AtomicReference<Node> top = new AtomicReference<>(null);

    @Override
    public T get() {
        if (state.get() == STATE_COMPLETED) {
            return value;
        }

        Node n = new Node(Thread.currentThread());
        Node t;
        do {
            t = top.get();
            n.next = t;
        } while (!top.compareAndSet(t, n));

        if (state.get() == STATE_COMPLETED) {
            return value;
        }
        LockSupport.park(this);
        return value;
    }

    @Override
    public T get(long time, @Nonnull TimeUnit unit) throws TimeoutException {
        if (state.get() == STATE_COMPLETED) {
            return value;
        }
        long deadline = System.nanoTime() + unit.toNanos(time);
        Node n = new Node(Thread.currentThread());
        Node t;
        do {
            t = top.get();
            n.next = t;
            if (System.nanoTime() > deadline) {
                throw new TimeoutException();
            }
        } while (!top.compareAndSet(t, n));
        if (state.get() == STATE_COMPLETED) {
            return value;
        }
        LockSupport.parkUntil(this, deadline);
        if (System.nanoTime() > deadline) {
            throw new TimeoutException();
        }
        return value;
    }

    @Override
    public void set(T value) {
        if (state.compareAndSet(STATE_NEW, STATE_COMPLETED)) {
            throw new IllegalStateException("expected state new, but was " + state.get());
        }
        this.value = value;
        Node n = top.get();
        while (n != null) {
            LockSupport.unpark(n.thread);
            n = n.next;
        }
    }

    private static class Node {
        final Thread thread;
        Node next = null;

        Node(Thread thread) {
            this.thread = thread;
        }
    }
}

get方法首先检查当前状态,如果已完成,则直接返回结果。否则把自己加入stack中。加入stack后再次检查当前状态,之后会详细说明为什么需要二次检查。最后通过LockSupport的park方法block自己。如果从block中醒来的话,一定是有结果了,所以直接返回结果。

相对应的set方法,通过状态判断结果是否已经被设置,没有的话就设置结果。然后从stack的顶部节点开始遍历,unpark即唤醒节点对应的线程。

说明一下为什么get方法中需要判断两次。主要是存在这样一种情况。

get第一次检查之后和把自己加入stack之间,set线程完成了所有操作。即

get: check state
set: set value
set: notify waiter threads in stack
get: push self into stack 
get: block self

这时当前的get线程就永远得不到结果,一直block下去。类似消息丢失。直接的解决方案是在block之前再次检查一下状态,虽然自己不会被唤醒,但是也不会进入block状态。

加了二次检查之后,get线程的park条件是 not completed 1 -> stack -> not completed 2 -> park。set线程在执行时满足 completed -> set value -> unpark顺序。假如此时有park并且completed的get线程,那么肯定是在get方法的第二次not completed检查和park之间。即执行序列应为

(get) not completed 1 -> (get) stack -> (get) not completed 2 -> (set) completed -> (set) set value -> (set) unpark -> (get) park

由于同一个线程的unpark -> park不会导致park时block,所以这次的unpark必须是针对其他线程的,或者说出问题的get线程还没有加入stack。

再观察调用序列,发现get方法中加入stack在unpark之前执行,所以出问题的线程已经加入stack,出现矛盾,不存在所谓park并且completed的get线程(反证法)。

接下来分析一下带超时的get方法。

带超时的get基本和普通的get差不多,只是在push进stack和从park中自动醒来时需要检查是否超时,超时的话抛出异常。

这次需要注意,parkUntil由于会自动苏醒,所以有几率导致多余的唤醒。具体执行序列如下

  1. get线程park
  2. get线程自动苏醒,抛出timeout异常
  3. set线程遍历stack,unpark已经苏醒的get线程

FutureTask中在timeout时会移除当前节点,个人认为减少了发生几率,但是不能完全阻止发生。在移除的时候,当前线程也是有可能被遍历到的。个人之后会给出一个解决方案,现在继续优化同步器本身。

Future3

FutureTask中get线程在等待会响应中断。Java的中断处理是一种协作式的,需要线程代码在允许的情况下检查中断标志。比如原型get代码中push的部分。如果你想在更多的地方检查中断标志的话,原型代码会变得很乱,为了解决这个问题,你需要修改代码。

import javax.annotation.Nonnull;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class Future3<T> {

    private static final int STATE_NEW = 0;
    private static final int STATE_COMPLETED = 1;

    private final AtomicInteger state = new AtomicInteger(STATE_NEW);
    private volatile T value;
    private final AtomicReference<Node> top = new AtomicReference<>(null);


    public T get() throws InterruptedException {
        Node n = null;
        boolean queued = false;
        Node t;
        while (true) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (state.get() == STATE_COMPLETED) {
                return value;
            }
            if (n == null) {
                n = new Node();
            } else if (!queued) {
                t = top.get();
                n.next = t;
                queued = top.compareAndSet(t, n);
            } else {
                LockSupport.park(this);
            }
        }
    }


    public T get(long time, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + unit.toNanos(time);
        Node n = null;
        boolean queued = false;
        Node t;
        while (true) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (state.get() == STATE_COMPLETED) {
                return value;
            }
            if (n == null) {
                n = new Node();
            } else if (!queued) {
                t = top.get();
                n.next = t;
                queued = top.compareAndSet(t, n);
            } else if (System.nanoTime() > deadline) {
                throw new TimeoutException();
            } else {
                LockSupport.parkUntil(this, deadline);
            }
        }
    }

    public void set(T value) {
        if (state.compareAndSet(STATE_NEW, STATE_COMPLETED)) {
            throw new IllegalStateException("expected state new, but was " + state.get());
        }
        this.value = value;

        Node n = top.get();
        while (n != null) {
            LockSupport.unpark(n.thread);
            n = n.next;
        }
    }

    private static class Node {
        final Thread thread;
        Node next = null;

        Node() {
            thread = Thread.currentThread();
        }
    }
}

你可能注意到,这里的代码开始和FutureTask有点像了。是的,个人认为FutureTask的代码是为了能更快发现中断信号或者结果已设置的标志而那么写的。这里的代码,主要用if else if来确认前提情况(注意if else if不是switch case,只要有一个条件满足,之后的条件不会被执行,不要搞糊涂了)。

简单分析一下get方法。主要条件有

  1. 中断(抛出异常)
  2. 结果已设置(返回结果)
  3. 节点未创建
  4. 节点未加入stack

以上所有条件clear之后才能park。理论上来说,这里的get和Future2的get是一致的,在park之前会再次检查结果是否已设置(节点加入stack之后会重新开始循环)。

带超时的get比普通get多了一个是否超时的条件,未超时的话执行park。

Future4

同步器在执行时,等待的get线程在stack中等待被遍历。遍历完之后这些节点仍旧存在,为了帮助GC,以及降低异常等待(中断和超时)节点被访问到的可能性,FutureTask在遍历和异常(中断和超时)处理时,会设置节点的thread和next为null。整理一下这两个变量的值和变化的情况。

节点的thread

  1. 初始时为某个get线程,不为null
  2. 异常时设置为null
  3. set线程在唤醒之后会设置为null

由于第二条异常时设置为null,set线程尝试唤醒时需要检查thread是否为null,而且必须保存到local变量,否则中途thread变为null会导致唤醒失败。之后可以看到这块代码。

节点的next

  1. 第一个节点的next为null,这被利用为遍历终止条件
  2. 其他节点的next不为null
  3. 删除节点的方法是让自己的链到自己的节点的next改成自己的next,边界情况另外讨论
  4. 遍历之后设置节点的next为null,帮助GC

FutureTask中使用的节点只有next,或者说stack构成了单向链表。如果有prev构成双向列表的话,可能删除节点的代码可能会简单一些。

stack top节点

  1. 初始为null
  2. 有节点时非null
  3. 遍历开始时设置为null

第三条个人认为是为了GC。

import javax.annotation.Nonnull;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class Future4<T> {

    private static final int STATE_NEW = 0;
    private static final int STATE_COMPLETED = 1;

    private final AtomicInteger state = new AtomicInteger(STATE_NEW);
    private volatile T value;
    private final AtomicReference<Node> top = new AtomicReference<>(null);


    public T get() throws InterruptedException {
        Node n = null;
        boolean queued = false;
        Node t;
        while (true) {
            if (Thread.interrupted()) {
                removeNode(n);
                throw new InterruptedException();
            }
            if (state.get() == STATE_COMPLETED) {
                return value;
            }
            if (n == null) {
                n = new Node();
            } else if (!queued) {
                t = top.get();
                n.next = t;
                queued = top.compareAndSet(t, n);
            } else {
                LockSupport.park(this);
            }
        }
    }


    public T get(long time, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + unit.toNanos(time);
        Node n = null;
        boolean queued = false;
        Node t;
        while (true) {
            if (Thread.interrupted()) {
                removeNode(n);
                throw new InterruptedException();
            }
            if (state.get() == STATE_COMPLETED) {
                return value;
            }
            if (n == null) {
                n = new Node();
            } else if (!queued) {
                t = top.get();
                n.next = t;
                queued = top.compareAndSet(t, n);
            } else if (System.nanoTime() > deadline) {
                removeNode(n);
                throw new TimeoutException();
            } else {
                LockSupport.parkUntil(this, deadline);
            }
        }
    }

    private void removeNode(Node n) {
        if (n == null) {
            return;
        }
        n.thread = null;
        unlinkRemovedNodes();
    }

    private void unlinkRemovedNodes() {
        Node p; // predecessor
        Node m; // current node
        Node s; // successor

        restart:
        while (true) {
            for (p = null, m = top.get(); m != null; m = s) {
                s = m.next;
                if (m.thread != null) {
                    p = m;
                } else if (p != null) {
                    // m.thread == null, m was removed
                    if (p.next != s) {
                        p.next = s; // skip 1 node every time
                    }
                    if (p.thread == null) { // predecessor was removed
                        continue restart;
                    }
                } else if (!top.compareAndSet(m, s)) {
                    // m.thread == null && p == null, m is first node and removed
                    // if failed, top node was changed during traversal
                    continue restart;
                }
            }
            break;
        }
    }

    public void set(T value) {
        if (state.compareAndSet(STATE_NEW, STATE_COMPLETED)) {
            throw new IllegalStateException("expected state new, but was " + state.get());
        }
        this.value = value;

        Node n = top.getAndSet(null);
        Node s;
        Thread t;
        while (n != null) {
            t = n.thread;
            if (t != null) {
                LockSupport.unpark(t);
                n.thread = null;
            }
            s = n.next;
            n.next = null;
            n = s;
        }
    }

    private static class Node {
        volatile Thread thread;
        volatile Node next = null;

        Node() {
            thread = Thread.currentThread();
        }
    }
}

首先看set部分的代码,开始遍历前设置top为null。在尝试唤醒某个节点时,将thread保存到local变量中,然后再检查,唤醒和设置为null。同时在唤醒之后,为了帮助GC,设置节点next为null。

FutureTask原来的removeWaitNode方法,除了设置某个node的thread为null之外,还尝试整体清理被删除的节点。重点讲一下清理的代码unlinkRemovedNodes。

如果你知道Harris Linked List的话,你可能会觉得这块代码是有问题的。因为,Harris Linked List在删除时先逻辑删除,类似这里的设置thread为null,然后再物理删除。同时,物理删除时会CAS两个值,一个是逻辑删除的标记,一个是next指针,防止物理删除时,本身被删除,或者是当前节点已经指向另外一个节点。这里并没有使用AtomicMarkableReference而是直接设置next指针,那么是否会出现问题?个人觉得这里和Harris Linked List不同的是,新节点只能在顶部被添加,所以不会存在当前节点指向了一个完全新的节点,或者说在中间插入的节点。

考虑邻接节点同时删除的情况

Node A -> Node B -> Node C -> Node D

Thread 1尝试删除Node B,Thread 2尝试删除Node C。

thread1: nodeB.thread = null
thread1: nodeA.next = nodeC
thread2: nodeC.thread = null
thread2: nodeA.next = nodeD

正常

thread1: nodeB.thread = null 
thread2: nodeC.thread = null 
thread1: nodeA.next = nodeD
thread2: nodeA.next = nodeD

正常

thread2: nodeC.thread = null
thread2: nodeB.next = nodeD
thread1: nodeB.thread = null
thread1: nodeA.next = nodeD

正常

thread2: nodeC.thread = null
thread1: nodeB.thread = null
thread2: nodeB.next = nodeD, restart
thread1: nodeA.next = nodeD
thread2: nodeA.next = nodeD

restart

最后一个restart的case在Harris Linked List是通过CAS失败来判断的,而FutureTask这里,是通过设置后检查判断。反过来Harris Linked List是否也可以用这样的方式?先设置后判断。个人的感觉是邻接的删除的话可以,邻接的删除和插入的话可能会有问题,比如判断前一个节点被删除后如何处理?等等。

这里的边界条件是第一个节点,即stack的顶节点。如果要删除的节点是顶节点,除了设置thread为null,还需要修改top。如果此时top失败,说明,有新节点加入,或者遍历开始了。后者在restart时直接退出,前者需要从新的顶节点开始遍历,也就是说restart。

清理节点的代码,和其他代码一样,是if else if的前提条件型执行。原代码可能看起来有点摸不着头脑,这次我给了一些注释。

到这里为止,代码基本和FutureTask一样了。FutureTask还包含额外的一些任务状态,但是并不影响上面的核心逻辑。接下来,主要分析一下FutureTask中可能出现的多余的unpark问题。

unpark与ThreadScheduler

首先以下调用序列可以正常恢复thread1

thread1: LockSupport.park(blocker);
thread2: LockSupport.unpark(thread1);

可能一部分人不知道

thread2: LockSupport.unpark(thread1);
thread1: LockSupport.park(blocker);

thread1在进入park之后立马就会返回。unpark的描述中提到

Makes available the permit for the given thread, if it was not already available.  If the thread was blocked on
{@code park} then it will unblock.  Otherwise, its next call to {@code park} is guaranteed not to block.

所以先unpark再park也是可以的,如果你的程序由于某些原因造成了这种调用顺序。

但是,由于这个特性会造成一类问题。

thread1: LockSupport.park(blocker);
thread2: thread1.interrupt();
thread3: LockSupport.unpark(thread1);
thread1: LockSupport.park(blocker);

thread1由于中断苏醒,之后另一个线程unpark thread1。这时给thread1一个许可,下一次park将直接返回而不是block!

FutureTask中是否存在这种问题,有可能,但是几率比较小。

get: park
other: interrupt
set: traverse and unpark

get线程park,某个线程中断了get线程,此时get线程会尝试从stack中移除自己。假设在移除时,正好结果被设置,stack开始被遍历,get线程被unpark。几率小的原因是,移除(逻辑)只需要一步,设置thread为null,这样被遍历时不会被unpark。

另外一种带超时的get

get: parkUntil
get: wake up
set: traverse and unpark

同样在timeout之后,get线程会尝试删除自己,所以发生几率很低。

另外,这个问题发生之后,FutureTask类不会有影响,因为get方法内部是if else if的前提条件式逻辑,允许重新开始,最多也只是多循环一次。但是有可能在其他只能park一次的逻辑中出问题。

为了解决这个问题,我设计了一种相对严格的park/interrupt调度方式,以下是实际代码

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class ThreadScheduler {

    private static final int THREAD_STATE_NORMAL = 0;
    private static final int THREAD_STATE_PARK = 1;
    private static final int THREAD_STATE_PARK_TIMED = 2;
    private static final int THREAD_STATE_WAKE_UP = 3;

    private final AtomicInteger state = new AtomicInteger(THREAD_STATE_NORMAL);

    public boolean park(Object blocker) {
        if (!state.compareAndSet(THREAD_STATE_NORMAL, THREAD_STATE_PARK)) {
            return false;
        }
        LockSupport.park(blocker);
        if (!state.compareAndSet(THREAD_STATE_PARK, THREAD_STATE_WAKE_UP)) {
            Thread.interrupted();
        }
        return true;
    }

    public boolean parkUntil(Object blocker, long deadline) {
        if (!state.compareAndSet(THREAD_STATE_NORMAL, THREAD_STATE_PARK_TIMED)) {
            return false;
        }
        LockSupport.parkUntil(blocker, deadline);
        if (!state.compareAndSet(THREAD_STATE_PARK_TIMED, THREAD_STATE_WAKE_UP)) {
            Thread.interrupted();
        }
        return true;
    }

    public void wakeUp(Thread thread) {
        int s = state.get();
        if (s == THREAD_STATE_NORMAL && state.compareAndSet(THREAD_STATE_NORMAL, THREAD_STATE_WAKE_UP)) {
            return;
        }
        if (s == THREAD_STATE_PARK && state.compareAndSet(THREAD_STATE_PARK, THREAD_STATE_WAKE_UP)) {
            thread.interrupt();
            return;
        }
        if (s == THREAD_STATE_PARK_TIMED && state.compareAndSet(THREAD_STATE_PARK_TIMED, THREAD_STATE_WAKE_UP)) {
            thread.interrupt();
        }
    }

    @Override
    public String toString() {
        return "ThreadScheduler{" +
                "state=" + state.get() +
                '}';
    }
}

ThreadScheduler不与具体线程绑定,而且只能使用一次。对于调用顺序引起的先unpark再park问题,通过state提前改为WAKE_UP避免unpark和park操作。其次,park/parkUntil -> 苏醒 -> unpark引起的多余permit问题,使用interrupt中断park,并且在从park苏醒时尝试设置自己为WAKE_UP,如果失败就说明别人interrupt了自己,通过Thread.interrupted去除中断标志。如果是正常的interrupt的话并且设置WAKE_UP成功的话不会去除中断标志。注意这里有一个边界条件:

get: park NORMAL -> PARK
other: interrupt
set: interrupt PARK -> WAKE_UP
get: clear interrupted flag

即有线程中断了get线程,同时set也尝试唤醒自己的话,中断标志会被删除,get线程也不会知道有两次中断。在FutureTask这里不会有问题,因为从结果上来说,第二次中断预示有值,返回而不是抛出异常没有什么不妥。

SimpleFuture

最后给一个简化的FutureTask,单get线程单set线程。同样分别给原型代码和优化后的代码。

import javax.annotation.Nonnull;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SimpleFuture<T> {

    private volatile boolean completed = false;
    private volatile T value;
    private volatile Thread thread = null;
    private final ThreadScheduler scheduler = new ThreadScheduler();

    public T get() {
        if (completed) {
            return value;
        }
        thread = Thread.currentThread();
        if (completed) {
            return value;
        }
        scheduler.park(this);
        return value;
    }

    public T get(long time, @Nonnull TimeUnit unit) throws TimeoutException {
        if (completed) {
            return value;
        }
        final long deadline = System.nanoTime() + unit.toNanos(time);
        thread = Thread.currentThread();
        if (completed) {
            return value;
        }
        if (scheduler.parkUntil(this, deadline)) {
            // 1. completed = true, interrupt/unpark
            // 2. completed = false, now > deadline(timeout)
            if (completed) {
                return value;
            }
            throw new TimeoutException();
        } else {
            return value;
        }
    }

    public void set(T value) {
        this.value = value;
        completed = true;
        Thread t = thread;
        if (t != null) {
            scheduler.wakeUp(t);
        }
    }

}

因为只有单个get和set线程,所以基本只需要volatile变量。一个技巧是,如果只有一个线程修改变量的话,那么就可以设置为volatile变量。同样,第二次状态检查是必须的。

带超时的get可能会复杂一些,parkUntil失败时表示已经有值,正常执行并返回时有可能是超时,也有可能是thread scheduler的interrupt,所以还需要对completed做一次检查。

以下是if else if前提条件版,支持interrupt。

import javax.annotation.Nonnull;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SimpleFuture2<T> {

    private volatile boolean completed = false;
    private volatile T value;
    private volatile Thread thread = null;
    final ThreadScheduler scheduler = new ThreadScheduler();

    public T get() throws InterruptedException {
        while (true) {
            if(Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (completed) {
                return value;
            }
            if (thread == null) {
                thread = Thread.currentThread();
            } else {
                scheduler.park(this);
            }
        }
    }

    public T get(long time, @Nonnull TimeUnit unit) throws TimeoutException, InterruptedException {
        final long deadline = System.nanoTime() + unit.toNanos(time);
        while (true) {
            if(Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (completed) {
                return value;
            }
            if (thread == null) {
                thread = Thread.currentThread();
            } else if (System.nanoTime() > deadline) {
                throw new TimeoutException();
            } else {
                scheduler.parkUntil(this, deadline);
            }
        }
    }

    public void set(T value) {
        this.value = value;
        completed = true;
        Thread t = thread;
        if (t != null) {
            scheduler.wakeUp(t);
        }
    }

}

至此,FutureTask的分析完成。欢迎感兴趣的人一起交流和分享,谢谢。

,