前面分析了FutureTask,一个同步器,下面分析一个经典的数据结构:队列。并发环境下的队列实现有很多,针对的问题和使用场景也有所不同。这里分析一个比较基础的lock free unbounded queue:ConcurrentLinkedQueue。
按照《the art of multiprocessor programming》里的定义,ConcurrentLinkedQueue属于total类型。即enqueue与dequeue不会阻塞。不阻塞就代表这个类不需要考虑同步器需要考虑的逻辑,专注于数据结构本身的实现上。
注意,本文并不会详细介绍M&S,也就是Michael & Scott算法,如果你有兴趣,请自行阅读论文 Michael, Maged M., and Michael L. Scott. Simple, fast, and practical non-blocking and blocking concurrent queue algorithms. No. TR-600. ROCHESTER UNIV NY DEPT OF COMPUTER SCIENCE, 1995. 或者《the art of multiprocessor programming》第10章。
ConcurrentLinkedQueue在注释中提到自己是M&S的变体,但是当你看了核心代码offer与poll(分别对应enqueue与dequeue)之后你不会认为这是个变体,因为改得面目全非了。M&S算法最有名的helping在offer没有直接体现,enqueue时的二次CAS被改成了一次,最让人不可思议的是tail可以在某种情况下指向head之前的节点,也就是说数据结构,甚至queue本身的不变条件都被修改了。所以如果你想分析ConcurrentLinkedQueue的话,建议把M&S算法放在一边,以一种新算法的角度来分析和阅读。
ConcurrentLinkedQueue,CLQ代码中分析整理得到以下规则:
- 有且仅有一个Node的next为null
- Node的值为null代表此节点无效,包括哨兵节点(sentinel)
- Node的next指向自己表示此节点无效
规则1很好理解,M&S算法也是这样的。一般就是尾部节点,M&S算法中有可能因为竞争失败导致取到的tail指向next不为null的节点,但是顺着一定可以找到一个next为null的节点。
规则2也不是很难理解,CLQ对节点的值进行CAS设置为null来保证不会有两个线程取同一个节点的值,本质上和M&S用CAS推进head节点一致。但是这么做导致
- 值不能为null
- head在实际queue的首节点之前,没有推进
第一个问题不大,用类似Optional<T>之类的作为值就行了,第二个感觉有点得不偿失,不推进head而是CAS节点值到底有什么好处?
原因在于CLQ与M&S算法理想中的队列不同,CLQ有迭代器,迭代器在迭代时理论上不应该访问到已经被dequeue的元素。同时迭代器的remove也需要某种方式来删除队列中的元素。现在的CLQ在迭代中remove时,只会置节点值为null,类似逻辑删除,而不是链表的前一个节点的next节点指向下一个节点(那样你可能需要Harris Linked List以及AtomicMarkableReference)。假如CLQ没有迭代器的话,可能不需要这么做,也可能可以传入null。
在回答第三个规则之前,考虑CLQ的head和tail
- head指向的节点可能不是最新,但是一定能顺着访问到最新节点
- tail指向的节点可能不是最新,而且可能是head之前的节点
head刚才已经提到了,这是规则2的结果。tail的话,考虑以下场景
- 线程A追加数据,获取tail,线程卡住
- 线程B不断追加数据
- 线程C不断获取数据
这时你可以看到A看到的tail在B和C看到的head之前,A恢复后当然可以顺着next一路访问到最新的tail,但是这可能需要比较长的时间,而且中间所有数据都不能被GC,所以CLQ用了一个优化方法,让无效数据的next指向自己,这样的话,线程A就知道tail已经变化,可以跳到最新的head,同时中间的节点可以被GC。因为这是一个优化方法,你也可以选择不用。
个人觉得这个优化是与M&S算法最大的不同,也是一开始看代码最难理解的地方。
除以上规则之外,CLQ还有一些减少CAS以及volatile变量写的优化。这些优化并不影响你阅读代码,所以这里暂时不展开,分析代码时有需要时会提到。
作为队列最核心的代码是enqueue与dequeue,对应CLQ里的offer和poll。在分析poll之前,先分析poll的简化版peek,有助于理解上面的三条规则是如何影响实际代码的。
个人写的简化版CLQ,节点类和初始化代码
public class ConcurrentLinkedQueue2<T> extends AbstractQueue<T> { private final AtomicReference<Node<T>> head; private final AtomicReference<Node<T>> tail; public LinkedQueue2() { Node<T> node = new Node<>(null); head = new AtomicReference<>(node); tail = new AtomicReference<>(node); } private static class Node<T> { final AtomicReference<T> value; final AtomicReference<Node<T>> next = new AtomicReference<>(null); Node(T v) { value = new AtomicReference<>(v); } } }
peek方法以及辅助方法updateHead
@Override public T peek() { T value; restart: while (true) { for (Node<T> h = head.get(), n = h, s; ; ) { value = n.value.get(); if (value != null) { updateHead(h, n); return value; } s = n.next.get(); if (s == null) { // last node updateHead(h, n); return null; } if (s == n) { // linked to self continue restart; } n = s; } } } private void updateHead(Node<T> head, Node<T> node) { if (head != node && this.head.compareAndSet(head, node)) { head.next.set(head); } }
peek方法会取得队列的第一个元素,但是不会从队列中去除这个元素。从代码中可以看到peek从head开始,判断value是否为null,根据规则2,如果不为null,表示是有效节点,返回对应值。这里返回前updateHead会在当前节点不是head是推进head,这会在head指向的节点是无效节点(value 为null),循环到下一个节点时发生。
注意,CLQ updateHead实际的代码中,自己指向自己时用的并不是next.set而是lazySet。之前也提到规则3是一个优化,所以这里用lazySet,即使某些线程读到旧值也没有大的问题。至于lazySet到底和普通volatile write(AtomicXXX的set内部是volatile write)有什么区别,只能说在部分机器上可以提高效率。volatile write考虑到之后可能会有读操作,Java编译器层是用StoreLoad屏障以及禁止乱序,CPU层用member barrier来完成操作。对于只需要写入,但是不需要立马同步的情况,即lazySet在这里使用了unsafe的putOrderedObject,如其名禁止乱序,Java编译器层使用StoreStore屏幕,CPU层可能没有操作。“没有操作”有没有问题?这里涉及到CPU间的缓存一致性。没有使用内存屏障的写入由于CPU缓存之间的异步硬件设计可能会一时不一致,但是最终可能会一致(这块老实说和CPU架构有关,比较难断言)。考虑到member barrier比较重,所以lazySet是CPU缓存层面上的一个优化。
回到peek的第二步,如果当前节点的next指向null,说明当前节点是最后一个节点,同时value为null的话,队列里只有一开始的哨兵节点,或者真的没数据,和之前一样,顺便更新一下head,然后返回null。
第三种情况,自己指向自己,即规则3。这种往往是因为,线程A在peek,同时线程B poll了队首元素,设置节点自己指向自己(接下来可以看到poll的代码)。线程A这时只能重新读取head,即restart。
最后一种情况是当前节点值为null,next不为null也不指向自己,一般就是没有推进的head节点,代码简单地步进到next指向的节点即可。
接下来是比peek稍微复杂一点的poll代码。
@Override public T poll() { T value; restart: while (true) { for (Node<T> h = head.get(), n = h, s; ; ) { value = n.value.get(); if (value != null && n.value.compareAndSet(value, null)) { if (n != h) { s = n.next.get(); if (s == null) { updateHead(h, n); } else { updateHead(h, s); } } return value; } s = n.next.get(); if (s == null) { // last node updateHead(h, n); return null; } if (s == n) { // linked to self continue restart; } n = s; } } }
代码整体和peek很像,除了第一个有值的情况。根据规则2,poll会尝试CAS节点的值为null来逻辑删除,或者在这里是推进队列。接下来的一个判断可能有一点诡异,判断当前节点是不是head。CLQ为了减少CAS,第一次poll时不会CAS,第二次或者各种边界情况(包括peek)时,间接更新。所以说这里的判断也是一个优化,并不影响主体流程。之后在offer代码中也可以看到类似优化。判断内部的代码其实很好理解,一个是边界情况当前节点是最后一个节点,另外是一种常规情况,推进到下一个节点。源代码写法有点难懂,这里展开了相信会好理解一些。
剩下代码和peek一致。
最后是offer代码
@Override public boolean offer(T value) { if (value == null) { throw new IllegalArgumentException("value cannot be null"); } final Node<T> node = new Node<>(value); Node<T> t = tail.get(); Node<T> t2; Node<T> n = t; // current node Node<T> s; // successor while (true) { s = n.next.get(); if (s == null) { // last node if (n.next.compareAndSet(null, node)) { if (n != t) { tail.compareAndSet(t, n); } return true; } // re-read next and retry continue; } if (s == n) { // linked to self t2 = tail.get(); if (t2 != t) { // tail changed t = t2; n = t2; } else { n = head.get(); } } else { if (n != t) { t2 = tail.get(); if (t2 != t) { // tail changed t = t2; n = t2; continue; } } n = s; } } }
第一步判断value,因为规则2导致值不能为null。第二部创建节点。注意,这一步创建时,CLQ Node在构造函数里使用了unsafe的putObject,而不是直接写volatile。这里同样是为了减少volatile write引起的memory barrier,用一个普通变量的写来代替。因为是offer线程创建的节点,运行offer线程的CPU以外的CPU不可能有value的拷贝,所以这样做是安全的。
接下来尝试找到最后的节点,即next为null的节点,并尝试设置为当前节点。如果成功,在当前节点不是tail的时候,推进tail。这里也是一个优化,为了减少CAS。M&S算法原本在成功设置后会CAS tail为新节点,offer只有在看到的tail节点不是最新时才这么做。这样的话,两个线程同时追加,M&S算法最坏可能有6次CAS
- 线程A CAS tail.next为新节点
- 线程B CAS tail.next失败
- 线程B CAS推进tail(helping)
- 线程A CAS tail为新节点(失败,但是被忽略)
- 线程B CAS 新的tail.next为新节点
- 线程B CAS推进tail
而offer代码中
- 线程A CAS tail.next为新节点
- 线程B CAS tail.next失败
- (线程A不会推进tail)
- 线程B 读取tail.next得到新的tail,CAS tail.next为新节点
- 线程B CAS推进tail
总共只有4次,去掉了一次helping和一次CAS推进。仔细想想,确实作为一个并发队列不一定要和串行队列一样保证tail肯定指向最后一个元素,去掉这个要求的话,可以减少CAS操作。当然没了M&S算法标志的helping,offer很难看出来是基于M&S算法。
第二种是自己指向自己,根据规则3,当前节点已失效,需要重新遍历。这里offer算法会尝试重新读取tail或者head。理论上来说,一旦有新元素被追加了,那么失效的tail肯定会重新指向最新的tail,这时选择tail更合算。另外一种情况是节点追加后tail未推进,但是poll线程把所有元素都设置为null了,即队列中没有元素了,因为tail没有变化,通过当前的元素的next也无法访问到真正的尾部元素,只能选择head,重新从head开始遍历。
第三种情况是tail未推进,或者竞争失败,这时可以选择当前节点的next,也可以重新读取tail。CLQ这里在当前节点不是tail的时候选择重新读取,否则直接使用next。源代码有些难懂,这里把分支展开便于理解。
以上就是CLQ核心部分的算法。整体来说,因为加了好几个优化,导致实际代码和M&S算法相去甚远。作为参考,给出一个我写的简化版M&S算法实现。
public class LinkedQueue1<T> extends AbstractQueue<T> { private final AtomicReference<Node<T>> head; private final AtomicReference<Node<T>> tail; public LinkedQueue1() { Node<T> sentinel = new Node<>(); head = new AtomicReference<>(sentinel); tail = new AtomicReference<>(sentinel); } @Override @Nonnull public Iterator<T> iterator() { throw new UnsupportedOperationException(); } @Override public int size() { throw new UnsupportedOperationException(); } @Override public boolean offer(T value) { final Node<T> n = new Node<>(value); Node<T> t; // tail Node<T> s; // successor while (true) { t = tail.get(); s = t.next.get(); if (s != null) { tail.compareAndSet(t, s); // help System.out.println("help"); } else if (t.next.compareAndSet(null, n)) { tail.compareAndSet(t, n); return true; } } } @Override public T poll() { Node<T> h; // head Node<T> s; // successor while (true) { h = head.get(); s = h.next.get(); if (s == null) { throw new IllegalStateException("queue is empty"); } if (head.compareAndSet(h, s)) { return s.value; } } } @Override public T peek() { final Node<T> s = head.get().next.get(); return s != null ? s.value : null; } private static class Node<T> { final T value; final AtomicReference<Node<T>> next = new AtomicReference<>(null); Node() { this(null); } Node(T value) { this.value = value; } } }
如果你有兴趣,你可以对比CLQ看一看哪些地方被修改了。
顺便说一句,网上部分介绍CLQ的文章都只是就代码进行分析,而不是从M&S算法开始讲解CLQ的改进。这样读者就难以理解为什么要这么做以及原来是怎么做的。如果你想深入了解的话,建议还是看一下M&S算法,并发编程领域的知识个人觉得还是需要循序渐进地学比较好。希望我的分析对你有用。