Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(2)


接着上篇,本篇主要介绍如何添加限时tryLock方法。介绍完之后,ReentrantLock除了condition之外都会实现,而且基本和实际ReentrantLock代码接近。上篇也提到过,实际ReentrantLock依赖AQS,但是本文不会直接介绍AQS,只是AQS的一个不完全分析。

顺便提一下,“自己写ReentrantLock和ReentrantReadWriteLock”由于篇幅比较长,预计会分成3到4篇左右。

在进入限时tryLock方法的介绍之前,考虑一个问题:上篇中FairLock1和UnfairLock1是否可以复用Node?

为什么要问这个问题?因为C++代码比较关注对象的生命周期。使用C++实现锁的话必须关注何时可以回收Node的内存。如果你了解CLHLock的话,可以知道CLHLock是可以复用Node的,最多是N个(线程)+1个(哨兵)Node,理论上不需要回收。所以和CLHLock基本一致的FairLock1,同样可以用类似CLHLock的方式,即复用前序节点为自己的当前节点(CLHLock是复用前序节点的,至于为什么可以参阅CLHLock的介绍)。UnfairLock1中由于可能是非公平模式,不知道前序节点是谁,也就无法简单复用了。

 

继续考虑限时获取锁的时候,Node是否可以复用?老实说,个人不知道确切答案,但是即使是在公平模式下,限时tryLock想要复用Node比UnfairLock1也要难很多。比如线程B获取锁超时,但是之后线程C进入了队列,这时线程B再次尝试获取锁的时候会怎么样?首先B由于获取失败,无法复用前序节点。其次C有可能持有B的节点指针,也有可能跳过B链到了B的前序节点上。因此,实现带超时tryLock时一般会考虑每次都新建Node来减少复杂性。

可能因为示例代码是Java,所以每次new一个Node出来没什么感觉。但是假如你用C++实现带超时版本的tryLock的话,必须考虑谁来回收Node的内存。有兴趣的可以看一下相关带超时的锁的论文,比如TOLock。

第二个要考虑的问题是,允许限时会带来什么影响?

  1. 前序节点有可能已超时
  2. 后续节点有可能已超时
  3. (除了哨兵节点)head不可能是超时节点

在上图中,C的前序节点B已超时,为了让A能唤醒自己,C需要设置自己的previous指针到A以及A的next指针为自己。当然signal successor标志也是要设置的。对于A来说,后续节点B已超时,A需要唤醒自己的后续节点中没有超时的第一个节点,即C。第三点不难理解,head的推进是由已经获取锁的线程处理的,所以新的head指针肯定对应一个非超时的线程。

需要继续考虑第一和第二点。当A和C同时执行时,如何设计包括B在内的一个机制,保证C肯定会被唤醒(允许多次唤醒)?

UnfairTimedLock1

以下是基于AQS改造的限时锁,具体分析下如何保证唤醒线程C。

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;

@SuppressWarnings("Duplicates")
public class UnfairTimedLock1 implements Lock {

    private final Queue queue = new Queue();
    private final AtomicInteger reentrantTimes = new AtomicInteger(0);
    private Thread owner;

    public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException {
        if (owner == Thread.currentThread()) {
            // reentrant
            reentrantTimes.incrementAndGet();
            return true;
        }
        if (reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
            owner = Thread.currentThread();
            return true;
        }
        final long deadline = unit.toNanos(time) + System.nanoTime();
        Node node = new Node(Thread.currentThread());
        Node predecessor = queue.enqueue(node);
        long nanos;
        while (true) {
            if (predecessor == queue.head.get() &&
                    reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
                myTurn(predecessor, node);
                return true;
            }
            nanos = deadline - System.nanoTime();
            // timeout
            if (nanos <= 0L) {
                abort(predecessor, node);
                return false;
            }
            switch (predecessor.status.get()) {
                case Node.STATUS_ABORTED:
                    predecessor = queue.skipAbortedPredecessors(predecessor, node);
                    predecessor.successor.set(node);
                    break;
                case Node.STATUS_SIGNAL_SUCCESSOR:
                    LockSupport.parkNanos(this, nanos);
                    break;
                case Node.STATUS_NORMAL:
                    /*
                     * recheck is required after CAS
                     * 1. CAS failed
                     * 2. CAS successfully, but status changed to ABORTED before parking
                     * 3. predecessor unlock between first check and CAS(no unpark)
                     */
                    predecessor.status.compareAndSet(Node.STATUS_NORMAL, Node.STATUS_SIGNAL_SUCCESSOR);
                    break;
            }
            if (Thread.interrupted()) {
                abort(predecessor, node);
                throw new InterruptedException();
            }
        }
    }

    private void abort(@Nonnull Node predecessor, @Nonnull Node node) {
        node.clearThread();

        Node p = queue.skipAbortedPredecessors(predecessor, node);
        Node ps = p.successor.get();

        // linearization point
        node.status.set(Node.STATUS_ABORTED);

        /*
         * at end
         *
         * A   -> |<B>
         * ANY -> |ABORTED
         *
         * no lost-wakeup problem
         */
        if (queue.tail.get() == node && queue.tail.compareAndSet(node, p)) {
            /*
             * failure is ok, which means
             * new node may enqueue between removing and setting successor
             */
            p.successor.compareAndSet(ps, null);
            return;
        }

        /*
         * at beginning
         *
         * A   -> |<B>       -> C
         * ANY -> |ABORTED   -> ANY
         *
         * lost-wakeup problem may happen
         *
         * scenarios
         * 1. B didn't set flag of A
         *
         * 2. B was signaled
         * sequence
         *    a. B set flag
         *    b. A signaled B
         *    c. B aborted
         */
        if (p == queue.head.get()) {
            signalNormalSuccessor(node);
            return;
        }

        /*
         * in middle
         *
         * A   -> |B   -> <C>     -> D
         * ANY -> |ANY -> ABORTED -> ANY
         *
         * lost-wakeup problem may happen
         *
         * conditions
         * 1. no one set flag of B
         * 1.1 D set flag of C before C aborts
         * 1.2 C didn't set the flag of B
         *
         * 2. B acquired lock and finished processing after p == head check
         *
         * first, try to set flag of B, then recheck if predecessor finished(unlocked or aborted)
         */
        if (p.ensureSignalSuccessorStatus() && p.thread.get() != null) {
            Node s = node.successor.get();
            if (s != null && s.status.get() != Node.STATUS_ABORTED) {
                p.successor.compareAndSet(ps, s);
            }
        } else {
            signalNormalSuccessor(node);
        }
    }

    private void myTurn(@Nonnull Node predecessor, @Nonnull Node node) {
        owner = Thread.currentThread();
        node.clearThread();
        queue.head.set(node);
        node.predecessor.set(null);
        predecessor.successor.set(null);
    }

    private void signalNormalSuccessor(@Nonnull Node node) {
        Node successor = queue.findNormalSuccessor(node);
        if (successor != null) {
            LockSupport.unpark(successor.thread.get());
        }
    }

    public void unlock() {
        if (owner != Thread.currentThread()) {
            throw new IllegalStateException("not the thread holding lock");
        }
        int rt = reentrantTimes.get();
        if (rt < 1) {
            throw new IllegalStateException("reentrant times < 1 when try to unlock");
        }
        if (rt > 1) {
            reentrantTimes.set(rt - 1);
            return;
        }
        // rt == 1
        owner = null;
        // linearization point
        reentrantTimes.set(0);

        Node node = queue.head.get();
        if (node != null &&
                node.status.get() == Node.STATUS_SIGNAL_SUCCESSOR &&
                node.status.compareAndSet(Node.STATUS_SIGNAL_SUCCESSOR, Node.STATUS_NORMAL)) {
            signalNormalSuccessor(node);
        }
    }

    @Override
    public void lock() {
        throw new UnsupportedOperationException();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock() {
        throw new UnsupportedOperationException();
    }

    @Override
    @Nonnull
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

    @SuppressWarnings("Duplicates")
    private static class Queue {
        final AtomicReference<Node> head = new AtomicReference<>();
        final AtomicReference<Node> tail = new AtomicReference<>();

        /**
         * Enqueue node.
         *
         * @param node new node
         * @return predecessor
         */
        @Nonnull
        Node enqueue(@Nonnull Node node) {
            Node t;
            while (true) {
                t = tail.get();
                if (t == null) {
                    // lazy initialization
                    Node sentinel = new Node();
                    if (head.get() == null && head.compareAndSet(null, sentinel)) {
                        tail.set(sentinel);
                    }
                } else {
                    node.predecessor.lazySet(t);
                    // linearization point
                    if (tail.compareAndSet(t, node)) {
                        t.successor.set(node);
                        return t;
                    }
                }
            }
        }

        @Nullable
        Node findNormalSuccessor(@Nonnull Node node) {
            Node n = node.successor.get();
            if (n != null && n.status.get() != Node.STATUS_ABORTED) {
                return n;
            }

            // find normal node from tail
            Node c = tail.get();
            n = null;
            // tail maybe null during lazy initialization
            while (c != null && c != node) {
                if (c.status.get() != Node.STATUS_ABORTED) {
                    n = c;
                }
                c = c.predecessor.get();
            }
            return n;
        }

        @Nonnull
        Node skipAbortedPredecessors(@Nonnull Node predecessor, @Nonnull Node node) {
            Node h = head.get();
            Node p = predecessor;
            while (p != h && p.status.get() != Node.STATUS_ABORTED) {
                p = p.predecessor.get();
                /*
                 * set predecessor every time to help successors of
                 * current node to find the normal predecessor more quickly
                 */
                node.predecessor.set(p);
            }
            return p;
        }
    }

    /**
     * Node.
     * <p>
     * Status change:
     * NORMAL -> ABORTED
     */
    private static class Node {
        static final int STATUS_NORMAL = 0;
        static final int STATUS_ABORTED = -1;
        static final int STATUS_SIGNAL_SUCCESSOR = 1;

        /**
         * thread will be null if
         * 1. abort
         * 2. enter mutual exclusion area
         */
        final AtomicReference<Thread> thread;
        final AtomicInteger status = new AtomicInteger(STATUS_NORMAL);
        final AtomicReference<Node> predecessor = new AtomicReference<>();
        // optimization
        final AtomicReference<Node> successor = new AtomicReference<>();

        Node() {
            this(null);
        }

        Node(@Nullable Thread thread) {
            this.thread = new AtomicReference<>(thread);
        }

        boolean ensureSignalSuccessorStatus() {
            int s = this.status.get();
            return s == STATUS_SIGNAL_SUCCESSOR ||
                    (s == STATUS_NORMAL && this.status.compareAndSet(s, STATUS_SIGNAL_SUCCESSOR));
        }

        void clearThread() {
            thread.set(null);
        }
    }
}

首先Node增加了status,替换掉了原先的signal successor标志。status的迁移图如下

总共有4种迁移。一个线程在创建了Node之后,先是NORMAL。有后续节点挂上了之后,被设置为SIGNAL。如果当前线程超时了的话,变成ABORTED(实际也就是直接设置为ABORTED)。除了后续节点所对应的线程通过CAS修改当前线程拥有的节点之外,其余都是当前线程自身修改。

为什么原先是直接设置的,现在要CAS?看了迁移图后相信很容易理解。为了防止后续节点将ABORTED状态修改为SIGNAL。

在此基础上继续分析保证唤醒的机制。

假设线程在设置自己的节点为ABORTED之后,开始解决唤醒问题。

先从最简单的开始,即上图的上部。B没有后续节点,那么肯定没有唤醒问题。这块对应abort方法中node == tail时候的代码。如果CAS tail为前置节点成功的话,说明自己真的是末尾节点,接下来CAS设置前置节点的next指针为空。这里并没有检查CAS的返回值。CAS失败的时候意味着在CAS tail指针之后有新节点入队了。从另一个角度来说,为了防止B的处理覆盖掉新入节点的设置,这里必须用CAS。

接下来分析B为第一个候选节点的情况,即上图的中间部分。要知道B为了让A能够唤醒自己,会设置A的status为SIGNAL。那么B abort时,A的status是SIGNAL还是NORMAL?回答是两者都有可能。tryLock的代码中,在第一次检查nanos是否小于0的时候,有可能超时时间太短,直接被判定为超时的话,A的status为NORMAL。另一种情况是设置A的status为SIGNAL之后,线程被中断。当然还有线程park中被中断,等待超时等情况,此时A的status为SIGNAL。

你可能会问,是否可以一律设置A的状态为SIGNAL,让分析更简单一些?回答是,这样做可能不太好。因为当B明显超时时间过短无法获取锁并放弃了的话,为什么要让A负责唤醒B?虽说多次唤醒比忘记唤醒引起的问题小,但是作为一个同步器还是希望尽量精确唤醒。

继续分析C的情况。

  1. C在B放弃之后,尝试让B唤醒自己,CAS不成功加上再次读取B的status的时候发现B已经放弃了,此时C会越过B尝试设置A的status,并在A的status被确实设置为SIGNAL之后park
  2. C在B放弃之前,尝试让B唤醒自己
    1. 再次读取B的status时为ABORTED,接下来与1一样
    2. 再次读取B的status时为SIGNAL,即B还没放弃,C会park

也就说B,C同时运行时,C有可能在不知道B放弃的情况下park

另一方面对A来说

  1. B已放弃
    1. B未设置A的状态为SIGNAL,A不会唤醒B,唤醒丢失
    2. B设置了A的状态为SIGNAL,A跳过B,唤醒C
    3. 虽然B未设置,但是C设置了A的状态为SIGNAL,A在查找后续节点时跳过了B,唤醒C
  2. B未放弃
    1. B未设置A的状态为SIGNAL,A不会唤醒B,唤醒丢失
    2. B设置了A的状态为SIGNAL,A唤醒B,唤醒丢失

额外的,假设A在unlock时,C以公平模式获取锁成功(非公平模式的话不存在C节点)并且推进了head节点(即,C跳过了B以及head变成了C)的话,A的错位唤醒有可能会唤醒C之后的节点。

可以看到,当B是第一个候选节点的时候,C有可能设置不了A的SIGNAL,A也有在好几种可能唤醒不了C。所以AQS的策略是一律让B唤醒C。UnfairTimedLock1中基本也是参考AQS,只是上述分析全部是我自己考虑出来的。

具体代码在abort方法中判断p(predecessor)是否为head,如果是的是,就无论自己的状态一律唤醒。

顺便说一句,在使用spin策略的限时锁中,全权由C来跳过放弃了的B,省去了上述的分析。

最后是B不是第一候选节点并且不在tail的情况,即上图下面的部分。直觉上来说,由于不是第一候选节点,A要唤醒的是D,C也会跳过B链接到D上,不太可能出现唤醒丢失的问题。这里最关键的是确认D的status一定被设置为SIGNAL。那么D的status有可能是NORMAL么?

看了之前中间部分的分析之后,可以说是有可能的,条件如下

  1. C在B放弃前park了(线程C分析2.2)
  2. B超时时间过短,直接放弃

此时要求B确保D的status为SIGNAL,所以会有abort中ensureSignalSuccessorStatus这个调用。

继续提问,是否只要设置D的status为SIGNAL就可以了?

答案是:除了设置D的status之外,还要确保D没有unlock。假设ensureSignalSuccessorStatus成功,D可能了发生什么呢?

  1. D在ensureSignalSuccessorStatus调用之前获取了锁又释放了锁,此时D不会唤醒C,唤醒丢失
  2. D在ensureSignalSuccessorStatus调用之后释放了锁(获取可以在之前或之后),此时D会唤醒C
  3. D在ensureSignalSuccessorStatus调用之前放弃,ensureSignalSuccessorStatus会失败,C需要A来唤醒
  4. D在ensureSignalSuccessorStatus调用之后放弃,由于D是第一候选节点,按照上面中间部分的说明,D会无条件唤醒后续节点,即C

情况1比较麻烦,检查D是否以及完成的条件只能在myTurn方法中找,实际可用的只有一个,thread == null。当然你也可以选择增加status。情况3由于ensureSignalSuccessorStatus会失败,不用增加判断。

以上分析对应abort最后的代码,如果ensureSignalSuccessorStatus成功和thread != null时才可以认为没有唤醒丢失的问题,其余情况即1和3,需要无条件唤醒C。情况3中有可能A唤醒了C,B也唤醒了C,但是考虑到A可能去唤醒D了,所以此处还是需要B确定C被唤醒。

条件中成功时此时B可以选择帮助D设置next指针为C,CAS失败此处没有影响。

abort中最复杂的分析至此结束。实际AQS的代码把图中中间和下部的情况和在一起考虑。

老实说个人觉得abort时的唤醒分析确实比较复杂,从编码上一律唤醒可能是最简单的,但是AQS分析了一部分并且用排除的方式解决了一部分不需要唤醒的情况,理解这块代码可能是设计UnfairTimedLock1最关键的地方。

abort中还有一些代码,比如最开始的设置thread为null。个人理解这是放弃了的线程减少被额外唤醒用的,比如作为第一个候选节点的时候。上面的分析中,基本都是依赖status来判断节点是否放弃,没有看到用thread == null来判断是否abort的地方。当然如果我的分析错了,欢迎指出。

abort中在设置status为ABORTED之前,还有一段跳过放弃了的节点的代码。个人理解在设置status之前是为了防止后续节点跳过自己简化分析。理论上放在设置status之后也是可以的,甚至不跳过也不会造成太大问题。

说到跳过放弃了的节点,Queue#skipAbortedPredecessors中不是找到了第一个未放弃的节点之后设置自己的previous指针,而是遍历时每次都设置。个人理解是想做多线程下的helping,即A在skip时,A后面的节点在skip时,可以通过A更快找到未放弃的节点。只是考虑到skipAbortedPredecessors时(包括abort方法)节点还没设置status为ABORTED,理论上,A的后续节点遍历到了A就停下了。即使A skip之后,立马abort,后续节点也不需要这种helping。所以个人这块持保留意见(如果AQS的cancelAcquire方法原先是在设置status之后skip的话另当别论)。至少在现在的ReentrantLock中应该是不需要的。

UnfairTimedLock2

在写好了限时版tryLock之后,其他lock方法依样画葫芦得到的最终版。

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;

@SuppressWarnings("Duplicates")
public class UnfairTimedLock2 implements Lock {

    private final Queue queue = new Queue();
    private final AtomicInteger reentrantTimes = new AtomicInteger(0);
    private Thread owner;

    public void lock() {
        if (tryLock()) {
            return;
        }
        Node node = new Node(Thread.currentThread());
        queue.enqueue(node);
        while (true) {
            if (queue.isNextCandidate(node) &&
                    reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
                myTurn(node);
                return;
            }
            if (isReadyToPark(node)) {
                LockSupport.park(this);
            }
        }
    }

    public void lockInterruptibly() throws InterruptedException {
        if (tryLock()) {
            return;
        }
        Node node = new Node(Thread.currentThread());
        queue.enqueue(node);
        while (true) {
            if (queue.isNextCandidate(node) &&
                    reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
                myTurn(node);
                return;
            }
            if (isReadyToPark(node)) {
                LockSupport.park(this);
            }
            if (Thread.interrupted()) {
                abort(node);
                throw new InterruptedException();
            }
        }
    }

    public boolean tryLock() {
        if (owner == Thread.currentThread()) {
            // reentrant
            reentrantTimes.incrementAndGet();
            return true;
        }
        if (reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
            owner = Thread.currentThread();
            return true;
        }
        return false;
    }

    public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException {
        if (tryLock()) {
            return true;
        }
        final long deadline = unit.toNanos(time) + System.nanoTime();
        Node node = new Node(Thread.currentThread());
        queue.enqueue(node);
        long nanos;
        while (true) {
            if (queue.isNextCandidate(node) &&
                    reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
                myTurn(node);
                return true;
            }
            nanos = deadline - System.nanoTime();
            // timeout
            if (nanos <= 0L) {
                abort(node);
                return false;
            }
            if (isReadyToPark(node)) {
                LockSupport.parkNanos(this, nanos);
            }
            if (Thread.interrupted()) {
                abort(node);
                throw new InterruptedException();
            }
        }
    }

    private boolean isReadyToPark(@Nonnull Node node) {
        Node p = node.predecessor.get();
        int s = p.status.get();
        if (s == Node.STATUS_SIGNAL_SUCCESSOR) {
            return true;
        }
        if (s == Node.STATUS_ABORTED) {
            p = queue.skipAbortedPredecessors(node);
            p.successor.set(node);
        } else if (s == Node.STATUS_NORMAL) {
            p.status.compareAndSet(Node.STATUS_NORMAL, Node.STATUS_SIGNAL_SUCCESSOR);
        }
        return false;
    }

    private void abort(@Nonnull Node node) {
        node.clearThread();

        Node p = queue.skipAbortedPredecessors(node);
        Node ps = p.successor.get();

        // linearization point
        node.status.set(Node.STATUS_ABORTED);

        if (queue.tail.get() == node && queue.tail.compareAndSet(node, p)) {
            p.successor.compareAndSet(ps, null);
            return;
        }

        if (p != queue.head.get() && p.ensureSignalSuccessorStatus() && p.thread.get() != null) {
            Node s = node.successor.get();
            if (s != null && s.status.get() != Node.STATUS_ABORTED) {
                p.successor.compareAndSet(ps, s);
            }
        } else {
            signalNormalSuccessor(node);
        }
    }

    private void myTurn(@Nonnull Node node) {
        owner = Thread.currentThread();
        node.clearThread();
        queue.head.set(node);
        reentrantTimes.set(1);
        
        Node predecessor = node.predecessor.get();
        node.predecessor.set(null);
        predecessor.successor.set(null);
    }

    private void signalNormalSuccessor(@Nonnull Node node) {
        if (node.status.get() == Node.STATUS_SIGNAL_SUCCESSOR) {
            node.status.compareAndSet(Node.STATUS_SIGNAL_SUCCESSOR, Node.STATUS_NORMAL);
        }

        Node successor = queue.findNormalSuccessor(node);
        if (successor != null) {
            LockSupport.unpark(successor.thread.get());
        }
    }

    public void unlock() {
        if (owner != Thread.currentThread()) {
            throw new IllegalStateException("not the thread holding lock");
        }
        int rt = reentrantTimes.get();
        if (rt < 1) {
            throw new IllegalStateException("reentrant times < 1 when try to unlock");
        }
        if (rt > 1) {
            reentrantTimes.set(rt - 1);
            return;
        }
        // rt == 1
        owner = null;
        // linearization point
        reentrantTimes.set(0);

        Node node = queue.head.get();
        if (node != null && node.status.get() == Node.STATUS_SIGNAL_SUCCESSOR) {
            signalNormalSuccessor(node);
        }
    }

    @Override
    @Nonnull
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

    @SuppressWarnings("Duplicates")
    private static class Queue {
        final AtomicReference<Node> head = new AtomicReference<>();
        final AtomicReference<Node> tail = new AtomicReference<>();

        /**
         * Enqueue node.
         *
         * @param node new node
         * @return predecessor
         */
        @Nonnull
        Node enqueue(@Nonnull Node node) {
            Node t;
            while (true) {
                t = tail.get();
                if (t == null) {
                    // lazy initialization
                    Node sentinel = new Node();
                    if (head.get() == null && head.compareAndSet(null, sentinel)) {
                        tail.set(sentinel);
                    }
                } else {
                    node.predecessor.lazySet(t);
                    // linearization point
                    if (tail.compareAndSet(t, node)) {
                        t.successor.set(node);
                        return t;
                    }
                }
            }
        }

        @Nullable
        Node findNormalSuccessor(@Nonnull Node node) {
            Node n = node.successor.get();
            if (n != null && n.status.get() != Node.STATUS_ABORTED) {
                return n;
            }

            // find normal node from tail
            Node c = tail.get();
            n = null;
            // tail maybe null during lazy initialization
            while (c != null && c != node) {
                if (c.status.get() != Node.STATUS_ABORTED) {
                    n = c;
                }
                c = c.predecessor.get();
            }
            return n;
        }

        boolean isNextCandidate(@Nonnull Node node) {
            return node.predecessor.get() == head.get();
        }

        @Nonnull
        Node skipAbortedPredecessors(@Nonnull Node node) {
            Node h = head.get();
            Node p = node.predecessor.get();
            while (p != h && p.status.get() != Node.STATUS_ABORTED) {
                p = p.predecessor.get();
                /*
                 * set predecessor every time to help successors of
                 * current node to find the normal predecessor more quickly
                 */
                node.predecessor.set(p);
            }
            return p;
        }
    }

    /**
     * Node.
     * <p>
     * Status change:
     * NORMAL -> ABORTED
     */
    private static class Node {
        static final int STATUS_NORMAL = 0;
        static final int STATUS_ABORTED = -1;
        static final int STATUS_SIGNAL_SUCCESSOR = 1;

        /**
         * thread will be null if
         * 1. abort
         * 2. enter mutual exclusion area
         */
        final AtomicReference<Thread> thread;
        final AtomicInteger status = new AtomicInteger(STATUS_NORMAL);
        final AtomicReference<Node> predecessor = new AtomicReference<>();
        // optimization
        final AtomicReference<Node> successor = new AtomicReference<>();

        Node() {
            this(null);
        }

        Node(@Nullable Thread thread) {
            this.thread = new AtomicReference<>(thread);
        }

        boolean ensureSignalSuccessorStatus() {
            int s = this.status.get();
            return s == STATUS_SIGNAL_SUCCESSOR ||
                    (s == STATUS_NORMAL && this.status.compareAndSet(s, STATUS_SIGNAL_SUCCESSOR));
        }

        void clearThread() {
            thread.set(null);
        }
    }
}

如果你看过AQS的源代码的话,你可能会注意这里的tryLock和AQS的tryAcquire,isReadyToPark和shouldParkAfterFailedAcquire,abort和cancelAcquire比较接近。有兴趣的话可以对比一下。

小结

限时锁个人认为是比较难实现的一种锁,但是在解决死锁问题等地方会有用。如果你理解了限时锁的分析的话,相信你也能写出类似ReentrantLock一样的锁。

如果你想更全面地了解锁的实现的话,建议阅读《the art of multiprocessor programming》中关于锁的部分。虽然大部分都是spin lock,具体实际基于park代码比较远,但是对训练多线程代码分析和设计很有帮助。

接下来,个人要实现的是ReadWriteLock,具体在第三篇中讲解。


One response to “Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(2)”

  1. 感谢分享!已推荐到《开发者头条》:https://toutiao.io/posts/rdwbpb 欢迎点赞支持!
    使用开发者头条 App 搜索 385148 即可订阅《并发与分布式系统研究》