最近个人在研究如何实现Transaction Memory,这其实也是受《the art of multiprocessor programming》最后一章的影响。作为研究的一部分,个人需要一个类似ReadWriteLock语义的同步器,于是分析了一下现有ReentrantLock和ReentrantReadWriteLock。在分析之前我其实知道ReentrantLock和ReentrantReadWriteLock都是基于AbstractQueuedSynchronizer,所以本篇也可以作为AbstractQueuedSynchronizer的不完全分析。
首先从ReentrantLock开始分析。最简单的不公平模式,无等待(SPIN,PARK)的锁。
SimpleLock
import javax.annotation.Nonnull; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @SuppressWarnings("Duplicates") public class SimpleLock implements Lock { // reentrant count private final AtomicInteger count = new AtomicInteger(0); private Thread owner; public boolean tryLock() { if (owner == Thread.currentThread()) { count.incrementAndGet(); return true; } if (count.get() == 0 && count.compareAndSet(0, 1)) { owner = Thread.currentThread(); return true; } return false; } public void unlock() { if (owner != Thread.currentThread()) { throw new IllegalMonitorStateException("attempt to unlock without holding lock"); } int c = count.get(); if (c < 1) { throw new IllegalStateException("count < 1 when unlock"); } if (c == 1) { owner = null; } count.set(c - 1); } @Override public void lock() { throw new UnsupportedOperationException(); } @Override public void lockInterruptibly() throws InterruptedException { throw new UnsupportedOperationException(); } @Override public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException { throw new UnsupportedOperationException(); } @Override @Nonnull public Condition newCondition() { throw new UnsupportedOperationException(); } }
SimpleLock只支持tryLock和unlock两个操作。tryLock是非公平模式,即两个线程A和B同时调用tryLock时,有可能A拿到锁,也有可能B拿到锁,没有严格的顺序保证。
失败时返回结果为false。调用方需要检查返回值来决定如何操作,比如不断尝试获取的话就是SPIN策略。严格来说,上面的锁不大符合一般锁的使用方式,但是可以作为锁内部是如何运作的参考。
锁的实现一般都依赖CAS。这里有一个叫count的原子变量,值是获取了锁的线程重入的次数。0的话表示没有被任何线程获取,1表示已被某个线程获取,超过1表示被某个线程多次获取。实现重入的方法是owner变量。在tryLock中如果owner为调用线程,则表示重入。同时在第一次获取后需要设置owner为获取了锁的线程,以及在unlock时,在重入线程最后一次unlock时,清空owner。
一个问题,为什么这里owner可以不加volatile?首先owner只对获取了锁的线程有意义,tryLock中设置和unlock中的删除都是已经获取了锁的线程在操作,不存在多个线程写的场景。其次,tryLock中读取owner判断是否重入的代码,对已经获取了锁的线程是肯定没有问题的,对于没有获取锁的线程也是安全的。后者的原因是
- unlock中对count的设置隐含memory barrier,会设置其他所有CPU cache中owner的值无效
- 对于想要获取锁的线程来说,tryLock是第一个执行的方法或者是在其他线程完全unlock后才能执行的方法
第一条不难理解,事实上,锁带来的“可见性“都是由原子变量的memory barrier引发的“副作用”。第二条简单来说是count不为0时,没有任何其他线程可以获取锁。两条合起来,当一个线程获取了锁之后,其他的线程再调用tryLock时,自己的CPU cache中owner为
- 空,尝试获取
- 被设置为无效,尝试获取
- 获取了锁的线程
不管是哪种,owner不会是错误的值,即没有持有锁的线程。
ReentrantLock中没有直接的owner变量,具体是在AbstractOwnableSynchronizer中。如果比较难理解的话,建议查阅一下JMM和CPU Cache相关资料。
顺便说一句,这里owner用volatile也是可以的。只是volatile的写自带memory barrier效果,效率会有所损失。
FairLock1
接下来实现一个公平锁,也就是QueueLock。在竞争时使用park策略,即LockSupport#park。
QueueLock比较有名的是CLHLock和MCSLock等。CLHLock和MCSLock都是spin lock,区别是一个是前向链表,一个是后向链表。AbstractQueuedSynchronizer在注释中提到基于的是CLHLock,所以这里也基于CLHLock,即前向链表来实现。
QueueLock的特点就是,有线程想要获取锁时,它会被加入一个队列。队列中的线程会按照顺序获取锁。比如下面的队列(图上半部分)
队首是一个哨兵节点,后面有三个线程想要获取锁,所以是三个节点,最后一个节点N3在队尾。当N1可以获取锁时,N1变成队首节点(图下半部分)。
为什么需要哨兵节点?不能像普通队列一样先进先出么?这是一个很好的问题。事实上CLHLock和基于M&S算法的无锁队列,都有一个哨兵节点。我个人能回答的是因为需要操作head和tail两个指针,如果没有哨兵节点的话,同时CAS两个指针比较困难。有兴趣的话可以自己试试看无锁队列的实现。
使用spin策略的QueueLock比如CLHLock由于是后续节点检查前置节点的状态来决定时候可以推进的,所以在换到park策略时,第一个要考虑的问题是谁来unpark。答案很明显,前置节点。但是由于是CLHLock,不能直接获取后续节点,所以AQS中给节点加了next指针。
在无锁队列的实现中,一般不会有双向队列。原因和上面哨兵节点存在的理由是一样的,无法同时CAS和设置多个位置。具体来说,入队过程是这样的
- 创建当前节点
- 设置当前节点的previous指针为队列tail指针
- CAS队列tail指针为当前节点
- 如果失败的话,从步骤2重新开始
严格来说,在步骤3之前,无法确定当前节点是否入队了。只有在入队了之后,才能设置前置节点的next指针。正因为入队和设置next指针之间有时间差,next指针的可靠性要比previous指针要差一些,在查找后续节点时,需要准备两手方案,即
- 读取next指针的节点,如果不为null,则可以放心使用
- next指针为null时,从tail开始不断访问previous指针,直到找到自己的后续节点
下图的上半部分表示了方案1,下半部分表示了方案2。访问路径为红色。
核心概念介绍到此为止,具体代码如下
import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; @SuppressWarnings("Duplicates") public class FairLock1 implements Lock { private final Queue queue = new Queue(); private final AtomicInteger reentrantTimes = new AtomicInteger(0); private Thread owner; public void lock() { if (owner == Thread.currentThread()) { // reentrant reentrantTimes.incrementAndGet(); return; } // OPTIMIZATION POINT: if there's only one thread, node could be eliminated Node node = new Node(Thread.currentThread()); Node predecessor = queue.enqueue(node); // PROBLEM: thread may be signaled here if (predecessor == queue.head.get() && reentrantTimes.get() == 0) { myTurn(predecessor, node); return; } // signal only once LockSupport.park(this); // predecessor must be head and reentrant times is 0 myTurn(predecessor, node); } private void myTurn(@Nonnull Node predecessor, @Nonnull Node node) { owner = Thread.currentThread(); node.clearThread(); queue.head.set(node); reentrantTimes.set(1); // help GC node.predecessor.set(null); predecessor.successor.set(null); } public void unlock() { if (owner != Thread.currentThread()) { throw new IllegalStateException("not the thread holding lock"); } int rt = reentrantTimes.get(); if (rt < 1) { throw new IllegalStateException("reentrant times < 1 when try to unlock"); } if (rt > 1) { reentrantTimes.set(rt - 1); return; } // rt == 1 owner = null; reentrantTimes.set(0); // signal successor Node node = queue.head.get(); Node successor = queue.findSuccessor(node); if (successor != null) { LockSupport.unpark(successor.thread.get()); } } @Override public void lockInterruptibly() throws InterruptedException { throw new UnsupportedOperationException(); } @Override public boolean tryLock() { throw new UnsupportedOperationException(); } @Override public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException { throw new UnsupportedOperationException(); } @Override @Nonnull public Condition newCondition() { throw new UnsupportedOperationException(); } @SuppressWarnings("Duplicates") private static class Queue { final AtomicReference<Node> head; final AtomicReference<Node> tail; Queue() { Node sentinel = new Node(); head = new AtomicReference<>(sentinel); tail = new AtomicReference<>(sentinel); } /** * Enqueue node. * * @param node new node * @return predecessor */ @Nonnull Node enqueue(@Nonnull Node node) { Node t; while (true) { t = tail.get(); node.predecessor.lazySet(t); if (tail.compareAndSet(t, node)) { t.successor.set(node); return t; } } } @Nullable Node findSuccessor(@Nonnull Node node) { Node n = node.successor.get(); if (n != null) { return n; } // find node from tail Node c = tail.get(); while (c != node) { n = c; c = c.predecessor.get(); } return n; } } private static class Node { final AtomicReference<Thread> thread; final AtomicReference<Node> predecessor = new AtomicReference<>(); // optimization final AtomicReference<Node> successor = new AtomicReference<>(); Node() { this(null); } Node(@Nullable Thread thread) { this.thread = new AtomicReference<>(thread); } void clearThread() { thread.set(null); } } }
Queue#enqueue为入队代码,Queue#findSuccessor为查找后续节点的代码,原理之前已经介绍过。这里分析一下lock和unlock的代码。
方法lock中,首先检查是否是重入,如果不是的话进入队列。入队之后检查是否可以获取锁,没法获取时再park。最后从park中苏醒的时候可以认定可以获取锁了,所以直接调用myTurn。myTurn通过设置head指针为自己的节点让前置节点出队,这样在释放锁时自己肯定是在队首。这样可以省去通过ThreadLocal保存自己的节点。
注意入队之后不能立马park,否则没人能获取锁了,大家都入队后park掉了。其次myTurn中为了帮助GC,清除了自己的previous指针和前置节点的next指针。
方法unlock中,在修改了count之后,尝试唤醒后续节点。
提问:这里是否存在持有锁的线程在unlock时,新线程加入并park之后,持有锁的线程没有唤醒新线程的可能?
这里可以分别简化lock和unlock代码后,分析交叉运行会得到什么结果。
lock: enqueue try acquire park acquire unlock: release signal successor
假如持有锁的线程为A,新线程为B。
- B的enqueue在A的release之前执行
- B的try acquire在A的release之前执行,B会park,A的signal successor会发现B,执行正确
- B的try acquire在A的release之后执行,B获取锁成功,B不会park,但是A会signal B
- B的enqueue在A的release之后执行,B的try acquire会成功,B不会睡死,但是A会signal B
- B的enqueue在A的signal successor后执行,A的signal successor不会发现B,但是B的try acquire会成功,执行正确
结论B不存在睡死的情况。
注意这里的分析2,假如B在正常运行时被A unpark了的话,B会怎么样?这里涉及到LockSupport具体实现。简单来说,unpark给予线程一个许可,park消费一个许可。park时如果有许可时会直接消费掉并返回,所以除了park/unpark执行序列之外,unpark/park也是可以执行的(个人不确定这是否算正确)。
在本次A释放,B获取锁时,B不会表现出问题,但是在A再次获取了锁(还没有释放),B再次尝试获取锁时,由于B已经有了一个许可,所以park会直接返回,导致A和B同时获取了锁!这是FairLock1存在的一个问题。
如果你看得仔细的话,你可能注意到myTurn里有调用clearThread。也就是说,线程在获取到了锁之后,会将自己的节点的node中的thread清除。如果你看过LockSupport的unpark方法的话,你可能知道unpark在输入参数thread为null不会做任何事情。这里调用clearThread的目的也是减少重复或者说意外的unpark。实际中分析2和1.2的发生可能性会进一步降低。
再深入分析一下1.2。线程B在获取了锁之后,除了clearThread之外,还会推进head。有没有一种可能:
- A unlock
- B lock成功并推进head
- C enqueue了节点
- A尝试唤醒head(B)的后续节点(C)
结论是可能的,这也是FairLock1存在的另外一个问题。
除了FairLock1存在的问题之外,FairLock1的注释中也提到,对于没有竞争的情况,其实没有必要 入队>尝试获取 的过程,直接尝试获取也是OK的。当然,这么做的话,当前的锁不再是公平锁了。
UnfairLock1
假如我们要实现非公平锁,其实可以参照之前的SimpleLock,在检查是否重入之后,直接尝试是否可以获取锁。成功的话,直接返回。这里的问题是,FairLock1的一些规则是否还正确。
- 线程肯定对应一个节点,不再成立
- unlock时队首节点为自己,不再成立
特别是第二条,队首节点不是自己的话,就不能直接唤醒后续节点了。为此,需要引入一个机制,在真正需要唤醒的时候,让前置节点唤醒自己。
其次,为了解决FairLock1中多余许可和意外唤醒可能引起的问题,lock中从park中醒来之后需要重新检查是否可以获取锁,即死循环,而不是认定可以获取锁了。顺便说一句,AQS中unpark引起的多余许可问题也没有完全解决,主体代码也是一个死循环。
最后,AQS认为无竞争的锁不需要哨兵节点,所以head指针和tail指针是惰性初始化的。个人认为这是一个优化,不是必须的。但是为了更接近AQS,UnfairLock1中也使用了惰性初始化。
import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; @SuppressWarnings("Duplicates") public class UnfairLock1 implements Lock { private final Queue queue = new Queue(); private final AtomicInteger reentrantTimes = new AtomicInteger(0); private Thread owner; public void lock() { if (owner == Thread.currentThread()) { // reentrant reentrantTimes.incrementAndGet(); return; } if (reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) { owner = Thread.currentThread(); /* * owner is visible to current thread * in theory, it's ok for other thread not to see the latest value */ return; } Node node = new Node(Thread.currentThread()); Node predecessor = queue.enqueue(node); // PROBLEM: thread may be signaled here while (true) { if (predecessor == queue.head.get() && reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) { myTurn(predecessor, node); return; } if (predecessor.signalSuccessor.get()) { LockSupport.park(this); } else { predecessor.signalSuccessor.set(true); } } } private void myTurn(@Nonnull Node predecessor, @Nonnull Node node) { owner = Thread.currentThread(); node.clearThread(); queue.head.set(node); node.predecessor.set(null); predecessor.successor.set(null); } public void unlock() { if (owner != Thread.currentThread()) { throw new IllegalStateException("not the thread holding lock"); } int rt = reentrantTimes.get(); if (rt < 1) { throw new IllegalStateException("reentrant times < 1 when try to unlock"); } if (rt > 1) { reentrantTimes.set(rt - 1); return; } // rt == 1 owner = null; reentrantTimes.set(0); Node node = queue.head.get(); if (node != null && node.signalSuccessor.get() && node.signalSuccessor.compareAndSet(true, false)) { Node successor = queue.findSuccessor(node); if (successor != null) { LockSupport.unpark(successor.thread.get()); } } } @Override public void lockInterruptibly() throws InterruptedException { throw new UnsupportedOperationException(); } @Override public boolean tryLock() { throw new UnsupportedOperationException(); } @Override public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException { throw new UnsupportedOperationException(); } @Override @Nonnull public Condition newCondition() { throw new UnsupportedOperationException(); } @SuppressWarnings("Duplicates") private static class Queue { final AtomicReference<Node> head = new AtomicReference<>(); final AtomicReference<Node> tail = new AtomicReference<>(); /** * Enqueue node. * * @param node new node * @return predecessor */ @Nonnull Node enqueue(@Nonnull Node node) { Node t; while (true) { t = tail.get(); if (t == null) { // lazy initialization Node sentinel = new Node(); if (head.get() == null && head.compareAndSet(null, sentinel)) { tail.set(sentinel); } } else { node.predecessor.lazySet(t); if (tail.compareAndSet(t, node)) { t.successor.set(node); return t; } } } } @Nullable Node findSuccessor(@Nonnull Node node) { Node n = node.successor.get(); if (n != null) { return n; } // find node from tail Node c = tail.get(); // tail maybe null during lazy initialization while (c != null && c != node) { n = c; c = c.predecessor.get(); } return n; } } private static class Node { final AtomicReference<Thread> thread; final AtomicBoolean signalSuccessor = new AtomicBoolean(false); final AtomicReference<Node> predecessor = new AtomicReference<>(); // optimization final AtomicReference<Node> successor = new AtomicReference<>(); Node() { this(null); } Node(@Nullable Thread thread) { this.thread = new AtomicReference<>(thread); } void clearThread() { thread.set(null); } } }
head指针和tail指针惰性初始化带来以下影响
- 入队时可能需要初始化,先head再tail的顺序
- 查找后续节点时,tail可能为null(虽然从调用顺序上来说tail不会为null)
- 唤醒后续节点时,head可能为null
第一条理论上先初始化tail再head也是可行的,因为惰性初始化是CAS(head or tail, null, sentinel)的过程,不存在多次初始化,同时入队的情况下,保存了sentinel节点就不会有问题。AQS里应该是其他方法依赖这个顺序,比如hasQueuedPredecessors导致的。
节点上增加了signal successor标志,在head节点不为null并且signal successor被设置的情况下,唤醒后续节点。请问这里是否有新节点设置了signal successor但是没有被唤醒的情况?
同样分析lock与unlock的交叉执行。
lock: try acquire 1 enqueue try acquire 2 set signal successor try acquire 3 park -> try acquire -> park... unlock: release if signal successor find successor unpark
注意,lock中设置好标志之后,会再执行一次check(AQS也是一样),所有在park之前有三次try acquire。
持有锁的线程为A,新线程为B。
- B的try acquire 1在A的release前执行,结果为失败
- enqueue在release之前
- try acquire 2在release前,结果为失败
- set signal successor在release前
- try acquire 3在release前,try acquire 3执行失败,B入队,设置了A的标志,B park,A尝试唤醒B,B成功获取锁,最终结果正确
- try acquire 3在release后,try acquire 3执行成功,B入队,设置了A的标志,A尝试唤醒B,B成功获取锁,最终结果多余的unpark许可
- set signal successor在release和标志检查中,try acquire 3执行成功,B入队,设置了A的标志,A尝试唤醒B,B成功获取锁,最终结果多余的unpark许可
- set signal successor在标志检查后,try acquire 3执行成功,B入队,设置了A的标志,A不会唤醒B,B成功获取锁,最终结果正确
- set signal successor在release前
- try acquire 2在release后,try acquire 2执行成功,B入队,没有设置A的标志,A不会唤醒B,B成功获取锁,最终结果正确
- try acquire 2在release前,结果为失败
- enqueue在release之后,try acquire 2执行成功,B入队,没有设置A的标志,A不会唤醒B,B成功获取锁,最终结果正确
- enqueue在release之前
- B的try acquire 1在A的release后执行,结果为成功,B没有入队,A不会唤醒B,B成功获取锁,最终结果正确
结论是不会发生没有被唤醒的情况。当然多余的unpark许可问题依旧存在。
注意这里FairLock1的A错位唤醒了C的问题没有在这里分析,一方面上面的执行分支已经足够复杂,另一方面即使错位唤醒了,死循环下的重复检查可以保证不会影响正确性,还有B在unlock之后也会尝试唤醒C。
接下来需要考虑的一个问题是增加了signal successor标志了,是否可以避免unlock时被唤醒两次的问题?
结论是仍旧存在,事实上非公平模式下的AQS存在这个问题(注意,UnfairLock1的unlock和AQS的release代码不同)。
考虑线程A,B,C的执行序列
- 线程A以非公平模式获取了锁
- 线程B尝试获取锁,失败,进入队列
- 线程A unlock,设置count为0,此后,新的线程可以以非公平模式获取锁
- 线程C以非公平模式获取了锁,unlock,同样执行到尝试唤醒后续节点的地方
- 此时A和C都可以看到后续节点,即线程B的节点,此时队列如上图
在AQS的release中,只检查后续节点是否需要唤醒,没有类似UnfairLock这里检查CAS结果的代码,所以A和C会同时唤醒B,相对的UnfairLock1只会唤醒一次。
当然,这并不是大的bug,在使用park策略的锁里面,未被唤醒比重复唤醒引起的问题要严重。请记住这点。
第三个问题,既然unlock有可能重复唤醒,那么每个线程用ThreadLocal持有自己的节点(以非公平模式获取的线程持有节点为null)如何?因为unlock时只需关注自己的后续节点。
结论是不行。同样考虑上面的执行序列和队列,当A和C都以非公平模式获取了锁之后,持有节点为null,根本无法唤醒后续节点,即线程B的节点。事实上,没有线程C时,A也无法唤醒B。当A以非公平模式获取了锁之后,B进入队列等待,A在unlock时根本不知道后续节点是谁,所以无法唤醒B。
假如你又提出,当自己所持节点为null和,就查看head的话,建议考虑一下unlock时head节点是什么。
- null,无竞争时
- 刚unlock了的线程的节点,即线程以公平模式获取了锁
- 等价于哨兵节点,当线程以非公平模式获取了的话
作为参考,考虑一下unlock时尝试唤醒后续节点的有可能是哪些线程
- 非公平方式获取了锁又释放的线程
- 以公平方式获取了锁又释放的线程
- 上述的组合,注意
- 以公平方式获取了锁的线程个数:0~1
- 非公平方式获取了锁的线程个数:0~n
合起来考虑的话,不管是公平模式还是非公平模式下获取了锁的线程,都应该访问head,所以理论上ThreadLocal是不需要的。
小结
可以看到,单从spin lock改成了基于park的queue lock就有那么多复杂的细节问题。总体来说,UnfairLock1比FairLock1要复杂,需要分析的细节更多。
接下来继续要添加的是限时版本的tryLock方法,具体将在第二篇中说明和解释。