Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(1)


最近个人在研究如何实现Transaction Memory,这其实也是受《the art of multiprocessor programming》最后一章的影响。作为研究的一部分,个人需要一个类似ReadWriteLock语义的同步器,于是分析了一下现有ReentrantLock和ReentrantReadWriteLock。在分析之前我其实知道ReentrantLock和ReentrantReadWriteLock都是基于AbstractQueuedSynchronizer,所以本篇也可以作为AbstractQueuedSynchronizer的不完全分析。

首先从ReentrantLock开始分析。最简单的不公平模式,无等待(SPIN,PARK)的锁。

SimpleLock

import javax.annotation.Nonnull;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@SuppressWarnings("Duplicates")
public class SimpleLock implements Lock {
    // reentrant count
    private final AtomicInteger count = new AtomicInteger(0);
    private Thread owner;

    public boolean tryLock() {
        if (owner == Thread.currentThread()) {
            count.incrementAndGet();
            return true;
        }
        if (count.get() == 0 && count.compareAndSet(0, 1)) {
            owner = Thread.currentThread();
            return true;
        }
        return false;
    }

    public void unlock() {
        if (owner != Thread.currentThread()) {
            throw new IllegalMonitorStateException("attempt to unlock without holding lock");
        }
        int c = count.get();
        if (c < 1) {
            throw new IllegalStateException("count < 1 when unlock");
        }
        if (c == 1) {
            owner = null;
        }
        count.set(c - 1);
    }

    @Override
    public void lock() {
        throw new UnsupportedOperationException();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException();
    }

    @Override
    @Nonnull
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}

SimpleLock只支持tryLock和unlock两个操作。tryLock是非公平模式,即两个线程A和B同时调用tryLock时,有可能A拿到锁,也有可能B拿到锁,没有严格的顺序保证。

失败时返回结果为false。调用方需要检查返回值来决定如何操作,比如不断尝试获取的话就是SPIN策略。严格来说,上面的锁不大符合一般锁的使用方式,但是可以作为锁内部是如何运作的参考。

锁的实现一般都依赖CAS。这里有一个叫count的原子变量,值是获取了锁的线程重入的次数。0的话表示没有被任何线程获取,1表示已被某个线程获取,超过1表示被某个线程多次获取。实现重入的方法是owner变量。在tryLock中如果owner为调用线程,则表示重入。同时在第一次获取后需要设置owner为获取了锁的线程,以及在unlock时,在重入线程最后一次unlock时,清空owner。

一个问题,为什么这里owner可以不加volatile?首先owner只对获取了锁的线程有意义,tryLock中设置和unlock中的删除都是已经获取了锁的线程在操作,不存在多个线程写的场景。其次,tryLock中读取owner判断是否重入的代码,对已经获取了锁的线程是肯定没有问题的,对于没有获取锁的线程也是安全的。后者的原因是

  1. unlock中对count的设置隐含memory barrier,会设置其他所有CPU cache中owner的值无效
  2. 对于想要获取锁的线程来说,tryLock是第一个执行的方法或者是在其他线程完全unlock后才能执行的方法

第一条不难理解,事实上,锁带来的“可见性“都是由原子变量的memory barrier引发的“副作用”。第二条简单来说是count不为0时,没有任何其他线程可以获取锁。两条合起来,当一个线程获取了锁之后,其他的线程再调用tryLock时,自己的CPU cache中owner为

  • 空,尝试获取
  • 被设置为无效,尝试获取
  • 获取了锁的线程

不管是哪种,owner不会是错误的值,即没有持有锁的线程。

ReentrantLock中没有直接的owner变量,具体是在AbstractOwnableSynchronizer中。如果比较难理解的话,建议查阅一下JMM和CPU Cache相关资料。

顺便说一句,这里owner用volatile也是可以的。只是volatile的写自带memory barrier效果,效率会有所损失。

FairLock1

接下来实现一个公平锁,也就是QueueLock。在竞争时使用park策略,即LockSupport#park。

QueueLock比较有名的是CLHLock和MCSLock等。CLHLock和MCSLock都是spin lock,区别是一个是前向链表,一个是后向链表。AbstractQueuedSynchronizer在注释中提到基于的是CLHLock,所以这里也基于CLHLock,即前向链表来实现。

QueueLock的特点就是,有线程想要获取锁时,它会被加入一个队列。队列中的线程会按照顺序获取锁。比如下面的队列(图上半部分)

 

 

队首是一个哨兵节点,后面有三个线程想要获取锁,所以是三个节点,最后一个节点N3在队尾。当N1可以获取锁时,N1变成队首节点(图下半部分)。

为什么需要哨兵节点?不能像普通队列一样先进先出么?这是一个很好的问题。事实上CLHLock和基于M&S算法的无锁队列,都有一个哨兵节点。我个人能回答的是因为需要操作head和tail两个指针,如果没有哨兵节点的话,同时CAS两个指针比较困难。有兴趣的话可以自己试试看无锁队列的实现。

使用spin策略的QueueLock比如CLHLock由于是后续节点检查前置节点的状态来决定时候可以推进的,所以在换到park策略时,第一个要考虑的问题是谁来unpark。答案很明显,前置节点。但是由于是CLHLock,不能直接获取后续节点,所以AQS中给节点加了next指针。

在无锁队列的实现中,一般不会有双向队列。原因和上面哨兵节点存在的理由是一样的,无法同时CAS和设置多个位置。具体来说,入队过程是这样的

  1. 创建当前节点
  2. 设置当前节点的previous指针为队列tail指针
  3. CAS队列tail指针为当前节点
  4. 如果失败的话,从步骤2重新开始

严格来说,在步骤3之前,无法确定当前节点是否入队了。只有在入队了之后,才能设置前置节点的next指针。正因为入队和设置next指针之间有时间差,next指针的可靠性要比previous指针要差一些,在查找后续节点时,需要准备两手方案,即

  1. 读取next指针的节点,如果不为null,则可以放心使用
  2. next指针为null时,从tail开始不断访问previous指针,直到找到自己的后续节点

下图的上半部分表示了方案1,下半部分表示了方案2。访问路径为红色。

 

核心概念介绍到此为止,具体代码如下

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;

@SuppressWarnings("Duplicates")
public class FairLock1 implements Lock {

    private final Queue queue = new Queue();
    private final AtomicInteger reentrantTimes = new AtomicInteger(0);
    private Thread owner;

    public void lock() {
        if (owner == Thread.currentThread()) {
            // reentrant
            reentrantTimes.incrementAndGet();
            return;
        }
        // OPTIMIZATION POINT: if there's only one thread, node could be eliminated
        Node node = new Node(Thread.currentThread());
        Node predecessor = queue.enqueue(node);
        // PROBLEM: thread may be signaled here
        if (predecessor == queue.head.get() && reentrantTimes.get() == 0) {
            myTurn(predecessor, node);
            return;
        }
        // signal only once
        LockSupport.park(this);
        // predecessor must be head and reentrant times is 0
        myTurn(predecessor, node);
    }

    private void myTurn(@Nonnull Node predecessor, @Nonnull Node node) {
        owner = Thread.currentThread();
        node.clearThread();
        queue.head.set(node);
        reentrantTimes.set(1);
        // help GC
        node.predecessor.set(null);
        predecessor.successor.set(null);
    }

    public void unlock() {
        if (owner != Thread.currentThread()) {
            throw new IllegalStateException("not the thread holding lock");
        }
        int rt = reentrantTimes.get();
        if (rt < 1) {
            throw new IllegalStateException("reentrant times < 1 when try to unlock");
        }
        if (rt > 1) {
            reentrantTimes.set(rt - 1);
            return;
        }
        // rt == 1
        owner = null;
        reentrantTimes.set(0);

        // signal successor
        Node node = queue.head.get();
        Node successor = queue.findSuccessor(node);
        if (successor != null) {
            LockSupport.unpark(successor.thread.get());
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException();
    }

    @Override
    @Nonnull
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

    @SuppressWarnings("Duplicates")
    private static class Queue {
        final AtomicReference<Node> head;
        final AtomicReference<Node> tail;

        Queue() {
            Node sentinel = new Node();
            head = new AtomicReference<>(sentinel);
            tail = new AtomicReference<>(sentinel);
        }

        /**
         * Enqueue node.
         *
         * @param node new node
         * @return predecessor
         */
        @Nonnull
        Node enqueue(@Nonnull Node node) {
            Node t;
            while (true) {
                t = tail.get();
                node.predecessor.lazySet(t);
                if (tail.compareAndSet(t, node)) {
                    t.successor.set(node);
                    return t;
                }
            }
        }

        @Nullable
        Node findSuccessor(@Nonnull Node node) {
            Node n = node.successor.get();
            if (n != null) {
                return n;
            }

            // find node from tail
            Node c = tail.get();
            while (c != node) {
                n = c;
                c = c.predecessor.get();
            }
            return n;
        }
    }

    private static class Node {
        final AtomicReference<Thread> thread;
        final AtomicReference<Node> predecessor = new AtomicReference<>();
        // optimization
        final AtomicReference<Node> successor = new AtomicReference<>();

        Node() {
            this(null);
        }

        Node(@Nullable Thread thread) {
            this.thread = new AtomicReference<>(thread);
        }

        void clearThread() {
            thread.set(null);
        }
    }
}

Queue#enqueue为入队代码,Queue#findSuccessor为查找后续节点的代码,原理之前已经介绍过。这里分析一下lock和unlock的代码。

方法lock中,首先检查是否是重入,如果不是的话进入队列。入队之后检查是否可以获取锁,没法获取时再park。最后从park中苏醒的时候可以认定可以获取锁了,所以直接调用myTurn。myTurn通过设置head指针为自己的节点让前置节点出队,这样在释放锁时自己肯定是在队首。这样可以省去通过ThreadLocal保存自己的节点。

注意入队之后不能立马park,否则没人能获取锁了,大家都入队后park掉了。其次myTurn中为了帮助GC,清除了自己的previous指针和前置节点的next指针。

方法unlock中,在修改了count之后,尝试唤醒后续节点。

提问:这里是否存在持有锁的线程在unlock时,新线程加入并park之后,持有锁的线程没有唤醒新线程的可能?

这里可以分别简化lock和unlock代码后,分析交叉运行会得到什么结果。

lock:
  enqueue
  try acquire
  park
  acquire

unlock:
  release
  signal successor

假如持有锁的线程为A,新线程为B。

  1. B的enqueue在A的release之前执行
    1. B的try acquire在A的release之前执行,B会park,A的signal successor会发现B,执行正确
    2. B的try acquire在A的release之后执行,B获取锁成功,B不会park,但是A会signal B
  2. B的enqueue在A的release之后执行,B的try acquire会成功,B不会睡死,但是A会signal B
  3. B的enqueue在A的signal successor后执行,A的signal successor不会发现B,但是B的try acquire会成功,执行正确

结论B不存在睡死的情况。

注意这里的分析2,假如B在正常运行时被A unpark了的话,B会怎么样?这里涉及到LockSupport具体实现。简单来说,unpark给予线程一个许可,park消费一个许可。park时如果有许可时会直接消费掉并返回,所以除了park/unpark执行序列之外,unpark/park也是可以执行的(个人不确定这是否算正确)。

在本次A释放,B获取锁时,B不会表现出问题,但是在A再次获取了锁(还没有释放),B再次尝试获取锁时,由于B已经有了一个许可,所以park会直接返回,导致A和B同时获取了锁!这是FairLock1存在的一个问题。

如果你看得仔细的话,你可能注意到myTurn里有调用clearThread。也就是说,线程在获取到了锁之后,会将自己的节点的node中的thread清除。如果你看过LockSupport的unpark方法的话,你可能知道unpark在输入参数thread为null不会做任何事情。这里调用clearThread的目的也是减少重复或者说意外的unpark。实际中分析2和1.2的发生可能性会进一步降低。

再深入分析一下1.2。线程B在获取了锁之后,除了clearThread之外,还会推进head。有没有一种可能:

  1. A unlock
  2. B lock成功并推进head
  3. C enqueue了节点
  4. A尝试唤醒head(B)的后续节点(C)

结论是可能的,这也是FairLock1存在的另外一个问题。

除了FairLock1存在的问题之外,FairLock1的注释中也提到,对于没有竞争的情况,其实没有必要 入队>尝试获取 的过程,直接尝试获取也是OK的。当然,这么做的话,当前的锁不再是公平锁了。

UnfairLock1

假如我们要实现非公平锁,其实可以参照之前的SimpleLock,在检查是否重入之后,直接尝试是否可以获取锁。成功的话,直接返回。这里的问题是,FairLock1的一些规则是否还正确。

  1. 线程肯定对应一个节点,不再成立
  2. unlock时队首节点为自己,不再成立

特别是第二条,队首节点不是自己的话,就不能直接唤醒后续节点了。为此,需要引入一个机制,在真正需要唤醒的时候,让前置节点唤醒自己。

其次,为了解决FairLock1中多余许可和意外唤醒可能引起的问题,lock中从park中醒来之后需要重新检查是否可以获取锁,即死循环,而不是认定可以获取锁了。顺便说一句,AQS中unpark引起的多余许可问题也没有完全解决,主体代码也是一个死循环。

最后,AQS认为无竞争的锁不需要哨兵节点,所以head指针和tail指针是惰性初始化的。个人认为这是一个优化,不是必须的。但是为了更接近AQS,UnfairLock1中也使用了惰性初始化。

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;

@SuppressWarnings("Duplicates")
public class UnfairLock1 implements Lock {

    private final Queue queue = new Queue();
    private final AtomicInteger reentrantTimes = new AtomicInteger(0);
    private Thread owner;

    public void lock() {
        if (owner == Thread.currentThread()) {
            // reentrant
            reentrantTimes.incrementAndGet();
            return;
        }
        if (reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
            owner = Thread.currentThread();
            /*
             * owner is visible to current thread
             * in theory, it's ok for other thread not to see the latest value
             */
            return;
        }
        Node node = new Node(Thread.currentThread());
        Node predecessor = queue.enqueue(node);
        // PROBLEM: thread may be signaled here
        while (true) {
            if (predecessor == queue.head.get() &&
                    reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
                myTurn(predecessor, node);
                return;
            }
            if (predecessor.signalSuccessor.get()) {
                LockSupport.park(this);
            } else {
                predecessor.signalSuccessor.set(true);
            }
        }
    }

    private void myTurn(@Nonnull Node predecessor, @Nonnull Node node) {
        owner = Thread.currentThread();
        node.clearThread();
        queue.head.set(node);
        node.predecessor.set(null);
        predecessor.successor.set(null);
    }

    public void unlock() {
        if (owner != Thread.currentThread()) {
            throw new IllegalStateException("not the thread holding lock");
        }
        int rt = reentrantTimes.get();
        if (rt < 1) {
            throw new IllegalStateException("reentrant times < 1 when try to unlock");
        }
        if (rt > 1) {
            reentrantTimes.set(rt - 1);
            return;
        }
        // rt == 1
        owner = null;
        reentrantTimes.set(0);

        Node node = queue.head.get();
        if (node != null && node.signalSuccessor.get() &&
                node.signalSuccessor.compareAndSet(true, false)) {
            Node successor = queue.findSuccessor(node);
            if (successor != null) {
                LockSupport.unpark(successor.thread.get());
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException();
    }

    @Override
    @Nonnull
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

    @SuppressWarnings("Duplicates")
    private static class Queue {
        final AtomicReference<Node> head = new AtomicReference<>();
        final AtomicReference<Node> tail = new AtomicReference<>();

        /**
         * Enqueue node.
         *
         * @param node new node
         * @return predecessor
         */
        @Nonnull
        Node enqueue(@Nonnull Node node) {
            Node t;
            while (true) {
                t = tail.get();
                if (t == null) {
                    // lazy initialization
                    Node sentinel = new Node();
                    if (head.get() == null && head.compareAndSet(null, sentinel)) {
                        tail.set(sentinel);
                    }
                } else {
                    node.predecessor.lazySet(t);
                    if (tail.compareAndSet(t, node)) {
                        t.successor.set(node);
                        return t;
                    }
                }
            }
        }

        @Nullable
        Node findSuccessor(@Nonnull Node node) {
            Node n = node.successor.get();
            if (n != null) {
                return n;
            }

            // find node from tail
            Node c = tail.get();
            // tail maybe null during lazy initialization
            while (c != null && c != node) {
                n = c;
                c = c.predecessor.get();
            }
            return n;
        }
    }

    private static class Node {
        final AtomicReference<Thread> thread;
        final AtomicBoolean signalSuccessor = new AtomicBoolean(false);
        final AtomicReference<Node> predecessor = new AtomicReference<>();
        // optimization
        final AtomicReference<Node> successor = new AtomicReference<>();

        Node() {
            this(null);
        }

        Node(@Nullable Thread thread) {
            this.thread = new AtomicReference<>(thread);
        }

        void clearThread() {
            thread.set(null);
        }
    }
}

head指针和tail指针惰性初始化带来以下影响

  1. 入队时可能需要初始化,先head再tail的顺序
  2. 查找后续节点时,tail可能为null(虽然从调用顺序上来说tail不会为null)
  3. 唤醒后续节点时,head可能为null

第一条理论上先初始化tail再head也是可行的,因为惰性初始化是CAS(head or tail, null, sentinel)的过程,不存在多次初始化,同时入队的情况下,保存了sentinel节点就不会有问题。AQS里应该是其他方法依赖这个顺序,比如hasQueuedPredecessors导致的。

节点上增加了signal successor标志,在head节点不为null并且signal successor被设置的情况下,唤醒后续节点。请问这里是否有新节点设置了signal successor但是没有被唤醒的情况?

同样分析lock与unlock的交叉执行。

lock:
  try acquire 1
  enqueue
  try acquire 2
  set signal successor
  try acquire 3
  park -> try acquire -> park...

unlock:
  release
  if signal successor
    find successor
    unpark

注意,lock中设置好标志之后,会再执行一次check(AQS也是一样),所有在park之前有三次try acquire。

持有锁的线程为A,新线程为B。

  1. B的try acquire 1在A的release前执行,结果为失败
    1. enqueue在release之前
      1. try acquire 2在release前,结果为失败
        1. set signal successor在release前
          1. try acquire 3在release前,try acquire 3执行失败,B入队,设置了A的标志,B park,A尝试唤醒B,B成功获取锁,最终结果正确
          2. try acquire 3在release后,try acquire 3执行成功,B入队,设置了A的标志,A尝试唤醒B,B成功获取锁,最终结果多余的unpark许可
        2. set signal successor在release和标志检查中,try acquire 3执行成功,B入队,设置了A的标志,A尝试唤醒B,B成功获取锁,最终结果多余的unpark许可
        3. set signal successor在标志检查后,try acquire 3执行成功,B入队,设置了A的标志,A不会唤醒B,B成功获取锁,最终结果正确
      2. try acquire 2在release后,try acquire 2执行成功,B入队,没有设置A的标志,A不会唤醒B,B成功获取锁,最终结果正确
    2. enqueue在release之后,try acquire 2执行成功,B入队,没有设置A的标志,A不会唤醒B,B成功获取锁,最终结果正确
  2. B的try acquire 1在A的release后执行,结果为成功,B没有入队,A不会唤醒B,B成功获取锁,最终结果正确

结论是不会发生没有被唤醒的情况。当然多余的unpark许可问题依旧存在。

注意这里FairLock1的A错位唤醒了C的问题没有在这里分析,一方面上面的执行分支已经足够复杂,另一方面即使错位唤醒了,死循环下的重复检查可以保证不会影响正确性,还有B在unlock之后也会尝试唤醒C。

接下来需要考虑的一个问题是增加了signal successor标志了,是否可以避免unlock时被唤醒两次的问题?

结论是仍旧存在,事实上非公平模式下的AQS存在这个问题(注意,UnfairLock1的unlock和AQS的release代码不同)。

考虑线程A,B,C的执行序列

  1. 线程A以非公平模式获取了锁
  2. 线程B尝试获取锁,失败,进入队列
  3. 线程A unlock,设置count为0,此后,新的线程可以以非公平模式获取锁
  4. 线程C以非公平模式获取了锁,unlock,同样执行到尝试唤醒后续节点的地方
  5. 此时A和C都可以看到后续节点,即线程B的节点,此时队列如上图

在AQS的release中,只检查后续节点是否需要唤醒,没有类似UnfairLock这里检查CAS结果的代码,所以A和C会同时唤醒B,相对的UnfairLock1只会唤醒一次。

当然,这并不是大的bug,在使用park策略的锁里面,未被唤醒比重复唤醒引起的问题要严重。请记住这点。

第三个问题,既然unlock有可能重复唤醒,那么每个线程用ThreadLocal持有自己的节点(以非公平模式获取的线程持有节点为null)如何?因为unlock时只需关注自己的后续节点。

结论是不行。同样考虑上面的执行序列和队列,当A和C都以非公平模式获取了锁之后,持有节点为null,根本无法唤醒后续节点,即线程B的节点。事实上,没有线程C时,A也无法唤醒B。当A以非公平模式获取了锁之后,B进入队列等待,A在unlock时根本不知道后续节点是谁,所以无法唤醒B。

假如你又提出,当自己所持节点为null和,就查看head的话,建议考虑一下unlock时head节点是什么。

  1. null,无竞争时
  2. 刚unlock了的线程的节点,即线程以公平模式获取了锁
  3. 等价于哨兵节点,当线程以非公平模式获取了的话

作为参考,考虑一下unlock时尝试唤醒后续节点的有可能是哪些线程

  1. 非公平方式获取了锁又释放的线程
  2. 以公平方式获取了锁又释放的线程
  3. 上述的组合,注意
    1. 以公平方式获取了锁的线程个数:0~1
    2. 非公平方式获取了锁的线程个数:0~n

合起来考虑的话,不管是公平模式还是非公平模式下获取了锁的线程,都应该访问head,所以理论上ThreadLocal是不需要的。

小结

可以看到,单从spin lock改成了基于park的queue lock就有那么多复杂的细节问题。总体来说,UnfairLock1比FairLock1要复杂,需要分析的细节更多。

接下来继续要添加的是限时版本的tryLock方法,具体将在第二篇中说明和解释。