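I haven't blogged for about two months because I have been reading The Art of Multiprocessor Programming. The book covers multicore programming from theory to practice, and the data structures and algorithms it presents are all backed by published papers. If you want to study multicore programming properly, rather than read an introduction to one language's concurrency mechanisms, I highly recommend it. The algorithms it introduces can also be recognized throughout Java's concurrency library. As part of going deeper, I plan to analyze typical concurrent classes one by one and offer some improvements or variations based on my own understanding.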
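Java's concurrency library introduced the Future interface and its implementation FutureTask in version 1.5 (Java 5). Unlike other synchronizers, the FutureTask implementation analyzed here does not rely on AQS (the revision notes in its source say that earlier versions did), which makes it a relatively simple concurrent class. FutureTask has two main responsibilities: running the task and synchronizing on the result. This post focuses on how the result is synchronized.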
- getting the result
- setting the result
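    import javax.annotation.Nonnull;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public interface IFuture<T> {
        T get();
        T get(long time, @Nonnull TimeUnit unit) throws TimeoutException;
        void set(T value);
    }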
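Second, multiple threads are generally allowed to call get, whereas only one thread calls set. For multiple waiting get threads the options resemble those for locks: spinning, queue-based schemes, and so on. Because there is no telling when set will be called, a queue-based scheme is the better fit. The main lock-free queue is the Michael & Scott lock-free queue; implicit queues such as the CLH lock are another option.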
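To make the spin option concrete before moving on to queues, here is a minimal sketch of a spin-based future. The class name SpinFuture and its demo method are my own hypothetical names, not anything from java.util.concurrent, and the sketch assumes Java 9+ for Thread.onSpinWait().

```java
// Hypothetical spin-based future, for illustration only.
class SpinFuture<T> {
    private volatile boolean completed = false;
    private T value; // made visible by the volatile write to `completed`

    T get() {
        // Busy-wait until set() publishes the result.
        while (!completed) {
            Thread.onSpinWait(); // hint to the CPU that this is a spin loop
        }
        return value;
    }

    void set(T v) {
        value = v;        // write the result first
        completed = true; // then publish it
    }

    // Small demo: another thread sets the value while the caller spins in get().
    static String demo() {
        SpinFuture<String> f = new SpinFuture<>();
        Thread setter = new Thread(() -> f.set("done"));
        setter.start();
        return f.get();
    }
}
```

Spinning burns a CPU core for as long as set has not been called, which is why a scheme that parks the waiting threads looks more attractive when the wait can be arbitrarily long.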
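FutureTask uses a Treiber stack. The call site carries no comment saying so, but

    UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q)

is clearly a Treiber stack push. Because it is a stack, threads that arrive later may be woken earlier (and in the FutureTask code that is indeed the case). As a synchronizer, though, all it has to guarantee is that a thread blocked in get is woken by the set thread. In other words, the Future synchronizer makes no promise about wake-up order.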
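For readers who have not met the Treiber stack before, the push can be sketched with AtomicReference instead of Unsafe. TreiberStackSketch, its Node type and the drain helper are hypothetical names of mine; this is only an illustration of the technique, not the FutureTask code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal Treiber stack: lock-free push plus a "take everything" drain.
class TreiberStackSketch<E> {
    private static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> top = new AtomicReference<>();

    void push(E item) {
        Node<E> n = new Node<>(item);
        Node<E> t;
        do {
            t = top.get();
            n.next = t;                     // link to the current top first
        } while (!top.compareAndSet(t, n)); // retry if another thread moved top
    }

    // Detach the whole stack at once and return the items, newest first.
    List<E> drain() {
        List<E> out = new ArrayList<>();
        for (Node<E> n = top.getAndSet(null); n != null; n = n.next) {
            out.add(n.item);
        }
        return out;
    }
}
```

Pushing A, B and C and then draining yields C, B, A, which is why a later waiter can be woken before an earlier one.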
    import javax.annotation.Nonnull;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;

    public class Future3<T> implements IFuture<T> {

        private static final int STATE_NEW = 0;
        private static final int STATE_COMPLETED = 1;

        private final AtomicInteger state = new AtomicInteger(STATE_NEW);
        private volatile T value;
        private final AtomicReference<Node> top = new AtomicReference<>(null);

        @Override
        public T get() {
            if (state.get() == STATE_COMPLETED) {
                return value;
            }
            // push the current thread onto the waiter stack
            Node n = new Node(Thread.currentThread());
            Node t;
            do {
                t = top.get();
                n.next = t;
            } while (!top.compareAndSet(t, n));
            // second check: set may have run while we were pushing
            if (state.get() == STATE_COMPLETED) {
                return value;
            }
            LockSupport.park(this);
            return value;
        }

        @Override
        public T get(long time, @Nonnull TimeUnit unit) throws TimeoutException {
            if (state.get() == STATE_COMPLETED) {
                return value;
            }
            long deadline = System.nanoTime() + unit.toNanos(time);
            Node n = new Node(Thread.currentThread());
            Node t;
            do {
                t = top.get();
                n.next = t;
                if (System.nanoTime() > deadline) {
                    throw new TimeoutException();
                }
            } while (!top.compareAndSet(t, n));
            if (state.get() == STATE_COMPLETED) {
                return value;
            }
            // deadline is based on System.nanoTime(), so wait for the remaining
            // nanoseconds; LockSupport.parkUntil expects epoch milliseconds
            LockSupport.parkNanos(this, deadline - System.nanoTime());
            if (System.nanoTime() > deadline) {
                throw new TimeoutException();
            }
            return value;
        }

        @Override
        public void set(T value) {
            // only the first set may move the state from NEW to COMPLETED
            if (!state.compareAndSet(STATE_NEW, STATE_COMPLETED)) {
                throw new IllegalStateException("expected state new, but was " + state.get());
            }
            // NOTE: a get that observes COMPLETED before this write can still read
            // the old value; FutureTask uses an intermediate COMPLETING state
            this.value = value;
            // wake every waiter currently on the stack
            Node n = top.get();
            while (n != null) {
                LockSupport.unpark(n.thread);
                n = n.next;
            }
        }

        private static class Node {
            final Thread thread;
            Node next = null;

            Node(Thread thread) {
                this.thread = thread;
            }
        }
    }
    get: check state
    set: set value
    set: notify waiter threads in stack
    get: push self into stack
    get: block self
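With the second check added, a get thread parks only after going through not completed 1 -> stack -> not completed 2 -> park. The set thread runs in the order completed -> set value -> unpark. Suppose a get thread nevertheless ends up parked while the state is completed. Then set's completed step must fall between get's second not-completed check and its park. That is, the execution sequence would have to be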
(get) not completed 1 -> (get) stack -> (get) not completed 2 -> (set) completed -> (set) set value -> (set) unpark -> (get) park
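Because an unpark followed by a park on the same thread does not block in park, this unpark must have targeted some other thread; put differently, the problematic get thread had not yet been pushed onto the stack. That contradicts the sequence above, where the push happens before set runs, so no get thread can be left parked this way.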
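- the get thread parks
- the get thread wakes up on its own and throws the timeout exception
- the set thread traverses the stack and unparks the get thread that has already woken up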
    import javax.annotation.Nonnull;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;

    public class Future3<T> {

        private static final int STATE_NEW = 0;
        private static final int STATE_COMPLETED = 1;

        private final AtomicInteger state = new AtomicInteger(STATE_NEW);
        private volatile T value;
        private final AtomicReference<Node> top = new AtomicReference<>(null);

        public T get() throws InterruptedException {
            Node n = null;
            boolean queued = false;
            Node t;
            while (true) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                if (state.get() == STATE_COMPLETED) {
                    return value;
                }
                if (n == null) {
                    n = new Node();
                } else if (!queued) {
                    // one push attempt per iteration; retried on the next loop
                    t = top.get();
                    n.next = t;
                    queued = top.compareAndSet(t, n);
                } else {
                    LockSupport.park(this);
                }
            }
        }

        public T get(long time, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException {
            final long deadline = System.nanoTime() + unit.toNanos(time);
            Node n = null;
            boolean queued = false;
            Node t;
            while (true) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                if (state.get() == STATE_COMPLETED) {
                    return value;
                }
                if (n == null) {
                    n = new Node();
                } else if (!queued) {
                    t = top.get();
                    n.next = t;
                    queued = top.compareAndSet(t, n);
                } else if (System.nanoTime() > deadline) {
                    throw new TimeoutException();
                } else {
                    // deadline is based on System.nanoTime(), so wait for the remaining
                    // nanoseconds; LockSupport.parkUntil expects epoch milliseconds
                    LockSupport.parkNanos(this, deadline - System.nanoTime());
                }
            }
        }

        public void set(T value) {
            // only the first set may move the state from NEW to COMPLETED
            if (!state.compareAndSet(STATE_NEW, STATE_COMPLETED)) {
                throw new IllegalStateException("expected state new, but was " + state.get());
            }
            // NOTE: a get that observes COMPLETED before this write can still read
            // the old value; FutureTask uses an intermediate COMPLETING state
            this.value = value;
            Node n = top.get();
            while (n != null) {
                LockSupport.unpark(n.thread);
                n = n.next;
            }
        }

        private static class Node {
            final Thread thread;
            Node next = null;

            Node() {
                thread = Thread.currentThread();
            }
        }
    }
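You may have noticed that the code is starting to look like FutureTask. Indeed; I believe FutureTask is written this way so that it notices an interrupt, or a result that has already been set, as early as possible. The code here uses an if / else-if chain to check preconditions (note that an if / else-if chain is not a switch-case: as soon as one condition matches, the remaining conditions are not evaluated, so don't mix the two up).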
- interrupted (throw the exception)
- result already set (return the result)
- node not yet created
- node not yet pushed onto the stack
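- a node's thread is initially the get thread that created it, never null
- a node's thread is set to null when that get thread fails with an exception
- a node's thread is set to null by the set thread after it has woken the waiter
- the next of the first node pushed is null, which serves as the termination condition for traversal
- the next of every other node is non-null
- a node is removed by making the node that links to it point at the removed node's own next; edge cases are discussed separately
- after traversal each node's next is set to null to help GC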
The stack's top node
- is initially null
- is non-null while the stack has nodes
- is set to null when the traversal starts
    import javax.annotation.Nonnull;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;

    public class Future4<T> {

        private static final int STATE_NEW = 0;
        private static final int STATE_COMPLETED = 1;

        private final AtomicInteger state = new AtomicInteger(STATE_NEW);
        private volatile T value;
        private final AtomicReference<Node> top = new AtomicReference<>(null);

        public T get() throws InterruptedException {
            Node n = null;
            boolean queued = false;
            Node t;
            while (true) {
                if (Thread.interrupted()) {
                    removeNode(n);
                    throw new InterruptedException();
                }
                if (state.get() == STATE_COMPLETED) {
                    return value;
                }
                if (n == null) {
                    n = new Node();
                } else if (!queued) {
                    t = top.get();
                    n.next = t;
                    queued = top.compareAndSet(t, n);
                } else {
                    LockSupport.park(this);
                }
            }
        }

        public T get(long time, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException {
            final long deadline = System.nanoTime() + unit.toNanos(time);
            Node n = null;
            boolean queued = false;
            Node t;
            while (true) {
                if (Thread.interrupted()) {
                    removeNode(n);
                    throw new InterruptedException();
                }
                if (state.get() == STATE_COMPLETED) {
                    return value;
                }
                if (n == null) {
                    n = new Node();
                } else if (!queued) {
                    t = top.get();
                    n.next = t;
                    queued = top.compareAndSet(t, n);
                } else if (System.nanoTime() > deadline) {
                    removeNode(n);
                    throw new TimeoutException();
                } else {
                    // deadline is based on System.nanoTime(), so wait for the remaining
                    // nanoseconds; LockSupport.parkUntil expects epoch milliseconds
                    LockSupport.parkNanos(this, deadline - System.nanoTime());
                }
            }
        }

        private void removeNode(Node n) {
            if (n == null) {
                return;
            }
            n.thread = null; // logical removal
            unlinkRemovedNodes();
        }

        private void unlinkRemovedNodes() {
            Node p; // predecessor
            Node m; // current node
            Node s; // successor
            restart:
            while (true) {
                for (p = null, m = top.get(); m != null; m = s) {
                    s = m.next;
                    if (m.thread != null) {
                        p = m;
                    } else if (p != null) {
                        // m.thread == null, m was removed
                        if (p.next != s) {
                            p.next = s; // skip 1 node every time
                        }
                        if (p.thread == null) {
                            // predecessor was removed
                            continue restart;
                        }
                    } else if (!top.compareAndSet(m, s)) {
                        // m.thread == null && p == null, m is first node and removed
                        // if failed, top node was changed during traversal
                        continue restart;
                    }
                }
                break;
            }
        }

        public void set(T value) {
            // only the first set may move the state from NEW to COMPLETED
            if (!state.compareAndSet(STATE_NEW, STATE_COMPLETED)) {
                throw new IllegalStateException("expected state new, but was " + state.get());
            }
            // NOTE: a get that observes COMPLETED before this write can still read
            // the old value; FutureTask uses an intermediate COMPLETING state
            this.value = value;
            // detach the whole stack, then wake and clear each node
            Node n = top.getAndSet(null);
            Node s;
            Thread t;
            while (n != null) {
                t = n.thread;
                if (t != null) {
                    LockSupport.unpark(t);
                    n.thread = null;
                }
                s = n.next;
                n.next = null; // help GC
                n = s;
            }
        }

        private static class Node {
            volatile Thread thread;
            volatile Node next = null;

            Node() {
                thread = Thread.currentThread();
            }
        }
    }
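If you know Harris's linked list, you may suspect this code is broken. Harris's linked list deletes a node logically first, much like setting thread to null here, and only then deletes it physically. The physical deletion CASes two values together, the logical-deletion mark and the next pointer, to guard against the node itself being deleted during the unlink, or against the current node already pointing at a different node. The code here does not use AtomicMarkableReference and simply writes the next pointer, so can something go wrong? My view is that, unlike Harris's linked list, new nodes can only be added at the top, so the current node can never come to point at a brand-new node, that is, a node inserted in the middle.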
Node A -> Node B -> Node C -> Node D
Thread 1 tries to remove Node B while Thread 2 tries to remove Node C.
    thread1: nodeB.thread = null
    thread1: nodeA.next = nodeC
    thread2: nodeC.thread = null
    thread2: nodeA.next = nodeD

    thread1: nodeB.thread = null
    thread2: nodeC.thread = null
    thread1: nodeA.next = nodeD
    thread2: nodeA.next = nodeD

    thread2: nodeC.thread = null
    thread2: nodeB.next = nodeD
    thread1: nodeB.thread = null
    thread1: nodeA.next = nodeD

    thread2: nodeC.thread = null
    thread1: nodeB.thread = null
    thread2: nodeB.next = nodeD, restart
    thread1: nodeA.next = nodeD
    thread2: nodeA.next = nodeD
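The restart in the last case is something Harris's linked list detects through a failed CAS, whereas FutureTask detects it by checking after the write. Conversely, could Harris's linked list use the same write-then-check approach? My feeling is that it would work for adjacent deletions, but an adjacent deletion combined with an insertion could be a problem: for example, what should happen once the predecessor is found to have been deleted?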
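For comparison, the CAS-based detection can be sketched with AtomicMarkableReference. This is only an illustration of the mark-plus-pointer CAS idea, not a full Harris list (there is no search or insert), and MarkedNodeSketch, logicallyDelete and unlink are hypothetical names of mine.

```java
import java.util.concurrent.atomic.AtomicMarkableReference;

// Hypothetical sketch: each node's next pointer carries a "deleted" mark.
class MarkedNodeSketch {
    static final class Node {
        final String name;
        final AtomicMarkableReference<Node> next;

        Node(String name, Node next) {
            this.name = name;
            this.next = new AtomicMarkableReference<>(next, false);
        }
    }

    // Logical deletion: set the mark on n's own next reference.
    static boolean logicallyDelete(Node n) {
        Node succ = n.next.getReference();
        return n.next.compareAndSet(succ, succ, false, true);
    }

    // Physical deletion: swing pred.next from n to n's successor.
    // The CAS fails if pred itself is marked or no longer points at n.
    static boolean unlink(Node pred, Node n) {
        Node succ = n.next.getReference();
        return pred.next.compareAndSet(n, succ, false, false);
    }
}
```

With A -> B -> C -> D and both B and C logically deleted, unlink(B, C) fails because B's next is already marked. That failure is the signal that corresponds to the "predecessor was removed" check in unlinkRemovedNodes; unlinking B from A and then C from A succeeds.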
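The node cleanup code, like the rest of the code, runs as an if / else-if chain of preconditions. The original code can be baffling at first, so this time I have added some comments.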
    thread1: LockSupport.park(blocker);
    thread2: LockSupport.unpark(thread1);

    thread2: LockSupport.unpark(thread1);
    thread1: LockSupport.park(blocker);
Makes available the permit for the given thread, if it was not already available. If the thread was blocked on {@code park} then it will unblock. Otherwise, its next call to {@code park} is guaranteed not to block.
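The guarantee in that Javadoc can be observed from a single thread. The sketch below unparks the current thread and then calls a timed park; PermitDemo and its method name are hypothetical, and the 5 second timeout is just an arbitrary upper bound so that the demo cannot hang.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo: an unpark issued before park makes that park return at once.
class PermitDemo {
    // Returns how long the park call took, in nanoseconds.
    static long parkAfterUnparkNanos() {
        LockSupport.unpark(Thread.currentThread());         // permit is now available
        long start = System.nanoTime();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)); // consumes the permit, does not block
        return System.nanoTime() - start;
    }
}
```

The elapsed time stays far below the 5 second timeout, because the park finds the permit left by the earlier unpark.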
    thread1: LockSupport.park(blocker);
    thread2: thread1.interrupt();
    thread3: LockSupport.unpark(thread1);
    thread1: LockSupport.park(blocker);
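thread1 wakes up because of the interrupt, and afterwards another thread unparks thread1. That hands thread1 a permit, so its next park returns immediately instead of blocking!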
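The leftover permit can be reproduced deterministically by playing all three roles on one thread. This is a simplified simulation under that assumption, not the multi-threaded scenario itself, and StalePermitDemo is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical single-threaded simulation of park -> interrupt -> late unpark -> park.
class StalePermitDemo {
    // Returns how long the final park took, in nanoseconds.
    static long secondParkNanos() {
        Thread self = Thread.currentThread();

        // 1. "Wake up by interrupt": park returns immediately when the interrupt flag is set.
        self.interrupt();
        LockSupport.park();
        Thread.interrupted(); // clear the flag, as a get() throwing InterruptedException would

        // 2. A late unpark arrives after the thread has already woken up.
        LockSupport.unpark(self);

        // 3. A later, unrelated park finds the stale permit and does not block.
        long start = System.nanoTime();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        return System.nanoTime() - start;
    }
}
```

The last park returns long before its 5 second timeout, even though nothing is waking it for its own sake.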
    get: park
    other: interrupt
    set: traverse and unpark

    get: parkUntil (timed park)
    get: wake up
    set: traverse and unpark
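Also, when this happens FutureTask itself is unaffected, because get is written as an if / else-if chain of preconditions that can simply start over; at worst it loops one extra time. It can cause trouble, however, in other logic that is only able to park once.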
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.LockSupport;

    public class ThreadScheduler {

        private static final int THREAD_STATE_NORMAL = 0;
        private static final int THREAD_STATE_PARK = 1;
        private static final int THREAD_STATE_PARK_TIMED = 2;
        private static final int THREAD_STATE_WAKE_UP = 3;

        private final AtomicInteger state = new AtomicInteger(THREAD_STATE_NORMAL);

        public boolean park(Object blocker) {
            // already woken up (or already used): do not park at all
            if (!state.compareAndSet(THREAD_STATE_NORMAL, THREAD_STATE_PARK)) {
                return false;
            }
            LockSupport.park(blocker);
            if (!state.compareAndSet(THREAD_STATE_PARK, THREAD_STATE_WAKE_UP)) {
                // wakeUp already moved the state and interrupted us: clear the flag
                Thread.interrupted();
            }
            return true;
        }

        // deadline is an absolute time on the System.nanoTime() clock
        public boolean parkUntil(Object blocker, long deadline) {
            if (!state.compareAndSet(THREAD_STATE_NORMAL, THREAD_STATE_PARK_TIMED)) {
                return false;
            }
            // LockSupport.parkUntil expects epoch milliseconds, so wait for the
            // remaining nanoseconds instead
            LockSupport.parkNanos(blocker, deadline - System.nanoTime());
            if (!state.compareAndSet(THREAD_STATE_PARK_TIMED, THREAD_STATE_WAKE_UP)) {
                Thread.interrupted();
            }
            return true;
        }

        public void wakeUp(Thread thread) {
            int s = state.get();
            // not parked yet: mark as woken so that the later park is skipped
            if (s == THREAD_STATE_NORMAL && state.compareAndSet(THREAD_STATE_NORMAL, THREAD_STATE_WAKE_UP)) {
                return;
            }
            // parked: wake with interrupt instead of unpark, so no permit is left behind
            if (s == THREAD_STATE_PARK && state.compareAndSet(THREAD_STATE_PARK, THREAD_STATE_WAKE_UP)) {
                thread.interrupt();
                return;
            }
            if (s == THREAD_STATE_PARK_TIMED && state.compareAndSet(THREAD_STATE_PARK_TIMED, THREAD_STATE_WAKE_UP)) {
                thread.interrupt();
            }
        }

        @Override
        public String toString() {
            return "ThreadScheduler{" + "state=" + state.get() + '}';
        }
    }
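ThreadScheduler is not bound to a specific thread, and an instance can be used only once. For the unpark-before-park problem caused by call ordering, the state is moved to WAKE_UP ahead of time, so neither the unpark nor the park takes place. For the surplus-permit problem caused by park/parkUntil -> wake up -> unpark, the scheduler wakes the parked thread with interrupt instead, and on returning from park the thread tries to set the state to WAKE_UP itself. If that fails, someone else interrupted it through the scheduler, and it clears the interrupt flag with Thread.interrupted. If it was a genuine interrupt and setting WAKE_UP succeeded, the interrupt flag is left in place. Note that there is one edge case here: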
    get: park NORMAL -> PARK
    other: interrupt
    set: interrupt PARK -> WAKE_UP
    get: clear interrupted flag
    import javax.annotation.Nonnull;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class SimpleFuture<T> {

        private volatile boolean completed = false;
        private volatile T value;
        private volatile Thread thread = null;
        private final ThreadScheduler scheduler = new ThreadScheduler();

        public T get() {
            if (completed) {
                return value;
            }
            thread = Thread.currentThread();
            if (completed) {
                return value;
            }
            scheduler.park(this);
            return value;
        }

        public T get(long time, @Nonnull TimeUnit unit) throws TimeoutException {
            if (completed) {
                return value;
            }
            final long deadline = System.nanoTime() + unit.toNanos(time);
            thread = Thread.currentThread();
            if (completed) {
                return value;
            }
            if (scheduler.parkUntil(this, deadline)) {
                // 1. completed = true, interrupt/unpark
                // 2. completed = false, now > deadline(timeout)
                if (completed) {
                    return value;
                }
                throw new TimeoutException();
            } else {
                return value;
            }
        }

        public void set(T value) {
            this.value = value;
            completed = true;
            Thread t = thread;
            if (t != null) {
                scheduler.wakeUp(t);
            }
        }
    }
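The timed get is a bit more involved. When parkUntil returns false, the value is already there. When it parks and returns normally, the cause may be a timeout or an interrupt from the thread scheduler, so completed has to be checked once more.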
Below is the if / else-if precondition version, which supports interrupts.
    import javax.annotation.Nonnull;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class SimpleFuture2<T> {

        private volatile boolean completed = false;
        private volatile T value;
        private volatile Thread thread = null;
        final ThreadScheduler scheduler = new ThreadScheduler();

        public T get() throws InterruptedException {
            while (true) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                if (completed) {
                    return value;
                }
                if (thread == null) {
                    thread = Thread.currentThread();
                } else {
                    scheduler.park(this);
                }
            }
        }

        public T get(long time, @Nonnull TimeUnit unit) throws TimeoutException, InterruptedException {
            final long deadline = System.nanoTime() + unit.toNanos(time);
            while (true) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                if (completed) {
                    return value;
                }
                if (thread == null) {
                    thread = Thread.currentThread();
                } else if (System.nanoTime() > deadline) {
                    throw new TimeoutException();
                } else {
                    scheduler.parkUntil(this, deadline);
                }
            }
        }

        public void set(T value) {
            this.value = value;
            completed = true;
            Thread t = thread;
            if (t != null) {
                scheduler.wakeUp(t);
            }
        }
    }