Java并发学习 ConcurrentLinkedQueue以及被魔改的M&S算法


前面分析了FutureTask,一个同步器,下面分析一个经典的数据结构:队列。并发环境下的队列实现有很多,针对的问题和使用场景也有所不同。这里分析一个比较基础的lock free unbounded queue:ConcurrentLinkedQueue。

按照《the art of multiprocessor programming》里的定义,ConcurrentLinkedQueue属于total类型。即enqueue与dequeue不会阻塞。不阻塞就代表这个类不需要考虑同步器需要考虑的逻辑,专注于数据结构本身的实现上。

注意,本文并不会详细介绍M&S,也就是Michael & Scott算法,如果你有兴趣,请自行阅读论文 Michael, Maged M., and Michael L. Scott. Simple, fast, and practical non-blocking and blocking concurrent queue algorithms. No. TR-600. ROCHESTER UNIV NY DEPT OF COMPUTER SCIENCE, 1995. 或者《the art of multiprocessor programming》第10章。

ConcurrentLinkedQueue在注释中提到自己是M&S的变体,但是当你看了核心代码offer与poll(分别对应enqueue与dequeue)之后你不会认为这是个变体,因为改得面目全非了。M&S算法最有名的helping在offer没有直接体现,enqueue时的二次CAS被改成了一次,最让人不可思议的是tail可以在某种情况下指向head之前的节点,也就是说数据结构,甚至queue本身的不变条件都被修改了。所以如果你想分析ConcurrentLinkedQueue的话,建议把M&S算法放在一边,以一种新算法的角度来分析和阅读。

ConcurrentLinkedQueue,CLQ代码中分析整理得到以下规则:

  1. 有且仅有一个Node的next为null
  2. Node的值为null代表此节点无效,包括哨兵节点(sentinel)
  3. Node的next指向自己表示此节点无效

规则1很好理解,M&S算法也是这样的。一般就是尾部节点,M&S算法中有可能因为竞争失败导致取到的tail指向next不为null的节点,但是顺着一定可以找到一个next为null的节点。

规则2也不是很难理解,CLQ对节点的值进行CAS设置为null来保证不会有两个线程取同一个节点的值,本质上和M&S用CAS推进head节点一致。但是这么做导致

  1. 值不能为null
  2. head在实际queue的首节点之前,没有推进

第一个问题不大,用类似Optional<T>之类的作为值就行了,第二个感觉有点得不偿失,不推进head而是CAS节点值到底有什么好处?

原因在于CLQ与M&S算法理想中的队列不同,CLQ有迭代器,迭代器在迭代时理论上不应该访问到已经被dequeue的元素。同时迭代器的remove也需要某种方式来删除队列中的元素。现在的CLQ在迭代中remove时,只会置节点值为null,类似逻辑删除,而不是链表的前一个节点的next节点指向下一个节点(那样你可能需要Harris Linked List以及AtomicMarkableReference)。假如CLQ没有迭代器的话,可能不需要这么做,也可能可以传入null。

在回答第三个规则之前,考虑CLQ的head和tail

  1. head指向的节点可能不是最新,但是一定能顺着访问到最新节点
  2. tail指向的节点可能不是最新,而且可能是head之前的节点

head刚才已经提到了,这是规则2的结果。tail的话,考虑以下场景

  1. 线程A追加数据,获取tail,线程卡住
  2. 线程B不断追加数据
  3. 线程C不断获取数据

这时你可以看到A看到的tail在B和C看到的head之前,A恢复后当然可以顺着next一路访问到最新的tail,但是这可能需要比较长的时间,而且中间所有数据都不能被GC,所以CLQ用了一个优化方法,让无效数据的next指向自己,这样的话,线程A就知道tail已经变化,可以跳到最新的head,同时中间的节点可以被GC。因为这是一个优化方法,你也可以选择不用。

个人觉得这个优化是与M&S算法最大的不同,也是一开始看代码最难理解的地方。

除以上规则之外,CLQ还有一些减少CAS以及volatile变量写的优化。这些优化并不影响你阅读代码,所以这里暂时不展开,分析代码时有需要时会提到。

作为队列最核心的代码是enqueue与dequeue,对应CLQ里的offer和poll。在分析poll之前,先分析poll的简化版peek,有助于理解上面的三条规则是如何影响实际代码的。

个人写的简化版CLQ,节点类和初始化代码

public class ConcurrentLinkedQueue2<T> extends AbstractQueue<T> {
    private final AtomicReference<Node<T>> head;
    private final AtomicReference<Node<T>> tail;

    public LinkedQueue2() {
        Node<T> node = new Node<>(null);
        head = new AtomicReference<>(node);
        tail = new AtomicReference<>(node);
    }

    private static class Node<T> {
        final AtomicReference<T> value;
        final AtomicReference<Node<T>> next = new AtomicReference<>(null);

        Node(T v) {
            value = new AtomicReference<>(v);
        }
    }
}

peek方法以及辅助方法updateHead

@Override
public T peek() {
    T value;

    restart:
    while (true) {
        for (Node<T> h = head.get(), n = h, s; ; ) {
            value = n.value.get();

            if (value != null) {
                updateHead(h, n);
                return value;
            }

            s = n.next.get();
            if (s == null) { // last node
                updateHead(h, n);
                return null;
            }

            if (s == n) { // linked to self
                continue restart;
            }

            n = s;
        }
    }
}

private void updateHead(Node<T> head, Node<T> node) {
    if (head != node && this.head.compareAndSet(head, node)) {
        head.next.set(head);
    }
}

peek方法会取得队列的第一个元素,但是不会从队列中去除这个元素。从代码中可以看到peek从head开始,判断value是否为null,根据规则2,如果不为null,表示是有效节点,返回对应值。这里返回前updateHead会在当前节点不是head是推进head,这会在head指向的节点是无效节点(value 为null),循环到下一个节点时发生。

注意,CLQ updateHead实际的代码中,自己指向自己时用的并不是next.set而是lazySet。之前也提到规则3是一个优化,所以这里用lazySet,即使某些线程读到旧值也没有大的问题。至于lazySet到底和普通volatile write(AtomicXXX的set内部是volatile write)有什么区别,只能说在部分机器上可以提高效率。volatile write考虑到之后可能会有读操作,Java编译器层是用StoreLoad屏障以及禁止乱序,CPU层用member barrier来完成操作。对于只需要写入,但是不需要立马同步的情况,即lazySet在这里使用了unsafe的putOrderedObject,如其名禁止乱序,Java编译器层使用StoreStore屏幕,CPU层可能没有操作。“没有操作”有没有问题?这里涉及到CPU间的缓存一致性。没有使用内存屏障的写入由于CPU缓存之间的异步硬件设计可能会一时不一致,但是最终可能会一致(这块老实说和CPU架构有关,比较难断言)。考虑到member barrier比较重,所以lazySet是CPU缓存层面上的一个优化。

回到peek的第二步,如果当前节点的next指向null,说明当前节点是最后一个节点,同时value为null的话,队列里只有一开始的哨兵节点,或者真的没数据,和之前一样,顺便更新一下head,然后返回null。

第三种情况,自己指向自己,即规则3。这种往往是因为,线程A在peek,同时线程B poll了队首元素,设置节点自己指向自己(接下来可以看到poll的代码)。线程A这时只能重新读取head,即restart。

最后一种情况是当前节点值为null,next不为null也不指向自己,一般就是没有推进的head节点,代码简单地步进到next指向的节点即可。

接下来是比peek稍微复杂一点的poll代码。

@Override
public T poll() {
    T value;

    restart:
    while (true) {
        for (Node<T> h = head.get(), n = h, s; ; ) {
            value = n.value.get();

            if (value != null && n.value.compareAndSet(value, null)) {
                if (n != h) {
                    s = n.next.get();
                    if (s == null) {
                        updateHead(h, n);
                    } else {
                        updateHead(h, s);
                    }
                }
                return value;
            }

            s = n.next.get();
            if (s == null) { // last node
                updateHead(h, n);
                return null;
            }
            if (s == n) { // linked to self
                continue restart;
            }

            n = s;
        }
    }
}

代码整体和peek很像,除了第一个有值的情况。根据规则2,poll会尝试CAS节点的值为null来逻辑删除,或者在这里是推进队列。接下来的一个判断可能有一点诡异,判断当前节点是不是head。CLQ为了减少CAS,第一次poll时不会CAS,第二次或者各种边界情况(包括peek)时,间接更新。所以说这里的判断也是一个优化,并不影响主体流程。之后在offer代码中也可以看到类似优化。判断内部的代码其实很好理解,一个是边界情况当前节点是最后一个节点,另外是一种常规情况,推进到下一个节点。源代码写法有点难懂,这里展开了相信会好理解一些。

剩下代码和peek一致。

最后是offer代码

@Override
public boolean offer(T value) {
    if (value == null) {
        throw new IllegalArgumentException("value cannot be null");
    }

    final Node<T> node = new Node<>(value);

    Node<T> t = tail.get();
    Node<T> t2;
    Node<T> n = t; // current node
    Node<T> s; // successor
    while (true) {
        s = n.next.get();
        if (s == null) { // last node
            if (n.next.compareAndSet(null, node)) {
                if (n != t) {
                    tail.compareAndSet(t, n);
                }
                return true;
            }
            // re-read next and retry
            continue;
        }

        if (s == n) { // linked to self
            t2 = tail.get();
            if (t2 != t) { // tail changed
                t = t2;
                n = t2;
            } else {
                n = head.get();
            }
        } else {
            if (n != t) {
                t2 = tail.get();
                if (t2 != t) { // tail changed
                    t = t2;
                    n = t2;
                    continue;
                }
            }
            n = s;
        }
    }
}

第一步判断value,因为规则2导致值不能为null。第二部创建节点。注意,这一步创建时,CLQ Node在构造函数里使用了unsafe的putObject,而不是直接写volatile。这里同样是为了减少volatile write引起的memory barrier,用一个普通变量的写来代替。因为是offer线程创建的节点,运行offer线程的CPU以外的CPU不可能有value的拷贝,所以这样做是安全的。

接下来尝试找到最后的节点,即next为null的节点,并尝试设置为当前节点。如果成功,在当前节点不是tail的时候,推进tail。这里也是一个优化,为了减少CAS。M&S算法原本在成功设置后会CAS tail为新节点,offer只有在看到的tail节点不是最新时才这么做。这样的话,两个线程同时追加,M&S算法最坏可能有6次CAS

  1. 线程A CAS tail.next为新节点
  2. 线程B CAS tail.next失败
  3. 线程B CAS推进tail(helping)
  4. 线程A CAS tail为新节点(失败,但是被忽略)
  5. 线程B CAS 新的tail.next为新节点
  6. 线程B CAS推进tail

而offer代码中

  1. 线程A CAS tail.next为新节点
  2. 线程B CAS tail.next失败
  3. (线程A不会推进tail)
  4. 线程B 读取tail.next得到新的tail,CAS tail.next为新节点
  5. 线程B CAS推进tail

总共只有4次,去掉了一次helping和一次CAS推进。仔细想想,确实作为一个并发队列不一定要和串行队列一样保证tail肯定指向最后一个元素,去掉这个要求的话,可以减少CAS操作。当然没了M&S算法标志的helping,offer很难看出来是基于M&S算法。

第二种是自己指向自己,根据规则3,当前节点已失效,需要重新遍历。这里offer算法会尝试重新读取tail或者head。理论上来说,一旦有新元素被追加了,那么失效的tail肯定会重新指向最新的tail,这时选择tail更合算。另外一种情况是节点追加后tail未推进,但是poll线程把所有元素都设置为null了,即队列中没有元素了,因为tail没有变化,通过当前的元素的next也无法访问到真正的尾部元素,只能选择head,重新从head开始遍历。

第三种情况是tail未推进,或者竞争失败,这时可以选择当前节点的next,也可以重新读取tail。CLQ这里在当前节点不是tail的时候选择重新读取,否则直接使用next。源代码有些难懂,这里把分支展开便于理解。

以上就是CLQ核心部分的算法。整体来说,因为加了好几个优化,导致实际代码和M&S算法相去甚远。作为参考,给出一个我写的简化版M&S算法实现。

public class LinkedQueue1<T> extends AbstractQueue<T> {

    private final AtomicReference<Node<T>> head;
    private final AtomicReference<Node<T>> tail;

    public LinkedQueue1() {
        Node<T> sentinel = new Node<>();
        head = new AtomicReference<>(sentinel);
        tail = new AtomicReference<>(sentinel);
    }

    @Override
    @Nonnull
    public Iterator<T> iterator() {
        throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean offer(T value) {
        final Node<T> n = new Node<>(value);
        Node<T> t; // tail
        Node<T> s; // successor
        while (true) {
            t = tail.get();
            s = t.next.get();
            if (s != null) {
                tail.compareAndSet(t, s); // help
                System.out.println("help");
            } else if (t.next.compareAndSet(null, n)) {
                tail.compareAndSet(t, n);
                return true;
            }
        }
    }

    @Override
    public T poll() {
        Node<T> h; // head
        Node<T> s; // successor
        while (true) {
            h = head.get();
            s = h.next.get();
            if (s == null) {
                throw new IllegalStateException("queue is empty");
            }
            if (head.compareAndSet(h, s)) {
                return s.value;
            }
        }
    }

    @Override
    public T peek() {
        final Node<T> s = head.get().next.get();
        return s != null ? s.value : null;
    }

    private static class Node<T> {
        final T value;
        final AtomicReference<Node<T>> next = new AtomicReference<>(null);

        Node() {
            this(null);
        }

        Node(T value) {
            this.value = value;
        }
    }
}

如果你有兴趣,你可以对比CLQ看一看哪些地方被修改了。

顺便说一句,网上部分介绍CLQ的文章都只是就代码进行分析,而不是从M&S算法开始讲解CLQ的改进。这样读者就难以理解为什么要这么做以及原来是怎么做的。如果你想深入了解的话,建议还是看一下M&S算法,并发编程领域的知识个人觉得还是需要循序渐进地学比较好。希望我的分析对你有用。