接着上篇,本篇主要介绍如何添加限时tryLock方法。介绍完之后,ReentrantLock除了condition之外都会实现,而且基本和实际ReentrantLock代码接近。上篇也提到过,实际ReentrantLock依赖AQS,但是本文不会直接介绍AQS,只是AQS的一个不完全分析。
顺便提一下,“自己写ReentrantLock和ReentrantReadWriteLock”由于篇幅比较长,预计会分成3到4篇左右。
在进入限时tryLock方法的介绍之前,考虑一个问题:上篇中FairLock1和UnfairLock1是否可以复用Node?
为什么要问这个问题?因为C++代码比较关注对象的生命周期。使用C++实现锁的话必须关注何时可以回收Node的内存。如果你了解CLHLock的话,可以知道CLHLock是可以复用Node的,最多是N个(线程)+1个(哨兵)Node,理论上不需要回收。所以和CLHLock基本一致的FairLock1,同样可以用类似CLHLock的方式,即复用前序节点为自己的当前节点(CLHLock是复用前序节点的,至于为什么可以参阅CLHLock的介绍)。UnfairLock1中由于可能是非公平模式,不知道前序节点是谁,也就无法简单复用了。
继续考虑限时获取锁的时候,Node是否可以复用?老实说,个人不知道确切答案,但是即使是在公平模式下,限时tryLock想要复用Node比UnfairLock1也要难很多。比如线程B获取锁超时,但是之后线程C进入了队列,这时线程B再次尝试获取锁的时候会怎么样?首先B由于获取失败,无法复用前序节点。其次C有可能持有B的节点指针,也有可能跳过B链到了B的前序节点上。因此,实现带超时tryLock时一般会考虑每次都新建Node来减少复杂性。
可能因为示例代码是Java,所以每次new一个Node出来没什么感觉。但是假如你用C++实现带超时版本的tryLock的话,必须考虑谁来回收Node的内存。有兴趣的可以看一下相关带超时的锁的论文,比如TOLock。
第二个要考虑的问题是,允许限时会带来什么影响?
- 前序节点有可能已超时
- 后续节点有可能已超时
- (除了哨兵节点)head不可能是超时节点
在上图中,C的前序节点B已超时,为了让A能唤醒自己,C需要设置自己的previous指针到A以及A的next指针为自己。当然signal successor标志也是要设置的。对于A来说,后续节点B已超时,A需要唤醒自己的后续节点中没有超时的第一个节点,即C。第三点不难理解,head的推进是由已经获取锁的线程处理的,所以新的head指针肯定对应一个非超时的线程。
需要继续考虑第一和第二点。当A和C同时执行时,如何设计包括B在内的一个机制,保证C肯定会被唤醒(允许多次唤醒)?
UnfairTimedLock1
以下是基于AQS改造的限时锁,具体分析下如何保证唤醒线程C。
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
@SuppressWarnings("Duplicates")
public class UnfairTimedLock1 implements Lock {
private final Queue queue = new Queue();
private final AtomicInteger reentrantTimes = new AtomicInteger(0);
private Thread owner;
public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException {
if (owner == Thread.currentThread()) {
// reentrant
reentrantTimes.incrementAndGet();
return true;
}
if (reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
owner = Thread.currentThread();
return true;
}
final long deadline = unit.toNanos(time) + System.nanoTime();
Node node = new Node(Thread.currentThread());
Node predecessor = queue.enqueue(node);
long nanos;
while (true) {
if (predecessor == queue.head.get() &&
reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
myTurn(predecessor, node);
return true;
}
nanos = deadline - System.nanoTime();
// timeout
if (nanos <= 0L) {
abort(predecessor, node);
return false;
}
switch (predecessor.status.get()) {
case Node.STATUS_ABORTED:
predecessor = queue.skipAbortedPredecessors(predecessor, node);
predecessor.successor.set(node);
break;
case Node.STATUS_SIGNAL_SUCCESSOR:
LockSupport.parkNanos(this, nanos);
break;
case Node.STATUS_NORMAL:
/*
* recheck is required after CAS
* 1. CAS failed
* 2. CAS successfully, but status changed to ABORTED before parking
* 3. predecessor unlock between first check and CAS(no unpark)
*/
predecessor.status.compareAndSet(Node.STATUS_NORMAL, Node.STATUS_SIGNAL_SUCCESSOR);
break;
}
if (Thread.interrupted()) {
abort(predecessor, node);
throw new InterruptedException();
}
}
}
private void abort(@Nonnull Node predecessor, @Nonnull Node node) {
node.clearThread();
Node p = queue.skipAbortedPredecessors(predecessor, node);
Node ps = p.successor.get();
// linearization point
node.status.set(Node.STATUS_ABORTED);
/*
* at end
*
* A -> |<B>
* ANY -> |ABORTED
*
* no lost-wakeup problem
*/
if (queue.tail.get() == node && queue.tail.compareAndSet(node, p)) {
/*
* failure is ok, which means
* new node may enqueue between removing and setting successor
*/
p.successor.compareAndSet(ps, null);
return;
}
/*
* at beginning
*
* A -> |<B> -> C
* ANY -> |ABORTED -> ANY
*
* lost-wakeup problem may happen
*
* scenarios
* 1. B didn't set flag of A
*
* 2. B was signaled
* sequence
* a. B set flag
* b. A signaled B
* c. B aborted
*/
if (p == queue.head.get()) {
signalNormalSuccessor(node);
return;
}
/*
* in middle
*
* A -> |B -> <C> -> D
* ANY -> |ANY -> ABORTED -> ANY
*
* lost-wakeup problem may happen
*
* conditions
* 1. no one set flag of B
* 1.1 D set flag of C before C aborts
* 1.2 C didn't set the flag of B
*
* 2. B acquired lock and finished processing after p == head check
*
* first, try to set flag of B, then recheck if predecessor finished(unlocked or aborted)
*/
if (p.ensureSignalSuccessorStatus() && p.thread.get() != null) {
Node s = node.successor.get();
if (s != null && s.status.get() != Node.STATUS_ABORTED) {
p.successor.compareAndSet(ps, s);
}
} else {
signalNormalSuccessor(node);
}
}
private void myTurn(@Nonnull Node predecessor, @Nonnull Node node) {
owner = Thread.currentThread();
node.clearThread();
queue.head.set(node);
node.predecessor.set(null);
predecessor.successor.set(null);
}
private void signalNormalSuccessor(@Nonnull Node node) {
Node successor = queue.findNormalSuccessor(node);
if (successor != null) {
LockSupport.unpark(successor.thread.get());
}
}
public void unlock() {
if (owner != Thread.currentThread()) {
throw new IllegalStateException("not the thread holding lock");
}
int rt = reentrantTimes.get();
if (rt < 1) {
throw new IllegalStateException("reentrant times < 1 when try to unlock");
}
if (rt > 1) {
reentrantTimes.set(rt - 1);
return;
}
// rt == 1
owner = null;
// linearization point
reentrantTimes.set(0);
Node node = queue.head.get();
if (node != null &&
node.status.get() == Node.STATUS_SIGNAL_SUCCESSOR &&
node.status.compareAndSet(Node.STATUS_SIGNAL_SUCCESSOR, Node.STATUS_NORMAL)) {
signalNormalSuccessor(node);
}
}
@Override
public void lock() {
throw new UnsupportedOperationException();
}
@Override
public void lockInterruptibly() throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public boolean tryLock() {
throw new UnsupportedOperationException();
}
@Override
@Nonnull
public Condition newCondition() {
throw new UnsupportedOperationException();
}
@SuppressWarnings("Duplicates")
private static class Queue {
final AtomicReference<Node> head = new AtomicReference<>();
final AtomicReference<Node> tail = new AtomicReference<>();
/**
* Enqueue node.
*
* @param node new node
* @return predecessor
*/
@Nonnull
Node enqueue(@Nonnull Node node) {
Node t;
while (true) {
t = tail.get();
if (t == null) {
// lazy initialization
Node sentinel = new Node();
if (head.get() == null && head.compareAndSet(null, sentinel)) {
tail.set(sentinel);
}
} else {
node.predecessor.lazySet(t);
// linearization point
if (tail.compareAndSet(t, node)) {
t.successor.set(node);
return t;
}
}
}
}
@Nullable
Node findNormalSuccessor(@Nonnull Node node) {
Node n = node.successor.get();
if (n != null && n.status.get() != Node.STATUS_ABORTED) {
return n;
}
// find normal node from tail
Node c = tail.get();
n = null;
// tail maybe null during lazy initialization
while (c != null && c != node) {
if (c.status.get() != Node.STATUS_ABORTED) {
n = c;
}
c = c.predecessor.get();
}
return n;
}
@Nonnull
Node skipAbortedPredecessors(@Nonnull Node predecessor, @Nonnull Node node) {
Node h = head.get();
Node p = predecessor;
while (p != h && p.status.get() != Node.STATUS_ABORTED) {
p = p.predecessor.get();
/*
* set predecessor every time to help successors of
* current node to find the normal predecessor more quickly
*/
node.predecessor.set(p);
}
return p;
}
}
/**
* Node.
* <p>
* Status change:
* NORMAL -> ABORTED
*/
private static class Node {
static final int STATUS_NORMAL = 0;
static final int STATUS_ABORTED = -1;
static final int STATUS_SIGNAL_SUCCESSOR = 1;
/**
* thread will be null if
* 1. abort
* 2. enter mutual exclusion area
*/
final AtomicReference<Thread> thread;
final AtomicInteger status = new AtomicInteger(STATUS_NORMAL);
final AtomicReference<Node> predecessor = new AtomicReference<>();
// optimization
final AtomicReference<Node> successor = new AtomicReference<>();
Node() {
this(null);
}
Node(@Nullable Thread thread) {
this.thread = new AtomicReference<>(thread);
}
boolean ensureSignalSuccessorStatus() {
int s = this.status.get();
return s == STATUS_SIGNAL_SUCCESSOR ||
(s == STATUS_NORMAL && this.status.compareAndSet(s, STATUS_SIGNAL_SUCCESSOR));
}
void clearThread() {
thread.set(null);
}
}
}
首先Node增加了status,替换掉了原先的signal successor标志。status的迁移图如下
总共有4种迁移。一个线程在创建了Node之后,先是NORMAL。有后续节点挂上了之后,被设置为SIGNAL。如果当前线程超时了的话,变成ABORTED(实际也就是直接设置为ABORTED)。除了后续节点所对应的线程通过CAS修改当前线程拥有的节点之外,其余都是当前线程自身修改。
为什么原先是直接设置的,现在要CAS?看了迁移图后相信很容易理解。为了防止后续节点将ABORTED状态修改为SIGNAL。
在此基础上继续分析保证唤醒的机制。
假设线程在设置自己的节点为ABORTED之后,开始解决唤醒问题。
先从最简单的开始,即上图的上部。B没有后续节点,那么肯定没有唤醒问题。这块对应abort方法中node == tail时候的代码。如果CAS tail为前置节点成功的话,说明自己真的是末尾节点,接下来CAS设置前置节点的next指针为空。这里并没有检查CAS的返回值。CAS失败的时候意味着在CAS tail指针之后有新节点入队了。从另一个角度来说,为了防止B的处理覆盖掉新入节点的设置,这里必须用CAS。
接下来分析B为第一个候选节点的情况,即上图的中间部分。要知道B为了让A能够唤醒自己,会设置A的status为SIGNAL。那么B abort时,A的status是SIGNAL还是NORMAL?回答是两者都有可能。tryLock的代码中,在第一次检查nanos是否小于0的时候,有可能超时时间太短,直接被判定为超时的话,A的status为NORMAL。另一种情况是设置A的status为SIGNAL之后,线程被中断。当然还有线程park中被中断,等待超时等情况,此时A的status为SIGNAL。
你可能会问,是否可以一律设置A的状态为SIGNAL,让分析更简单一些?回答是,这样做可能不太好。因为当B明显超时时间过短无法获取锁并放弃了的话,为什么要让A负责唤醒B?虽说多次唤醒比忘记唤醒引起的问题小,但是作为一个同步器还是希望尽量精确唤醒。
继续分析C的情况。
- C在B放弃之后,尝试让B唤醒自己,CAS不成功加上再次读取B的status的时候发现B已经放弃了,此时C会越过B尝试设置A的status,并在A的status被确实设置为SIGNAL之后park
- C在B放弃之前,尝试让B唤醒自己
- 再次读取B的status时为ABORTED,接下来与1一样
- 再次读取B的status时为SIGNAL,即B还没放弃,C会park
也就说B,C同时运行时,C有可能在不知道B放弃的情况下park
另一方面对A来说
- B已放弃
- B未设置A的状态为SIGNAL,A不会唤醒B,唤醒丢失
- B设置了A的状态为SIGNAL,A跳过B,唤醒C
- 虽然B未设置,但是C设置了A的状态为SIGNAL,A在查找后续节点时跳过了B,唤醒C
- B未放弃
- B未设置A的状态为SIGNAL,A不会唤醒B,唤醒丢失
- B设置了A的状态为SIGNAL,A唤醒B,唤醒丢失
额外的,假设A在unlock时,C以公平模式获取锁成功(非公平模式的话不存在C节点)并且推进了head节点(即,C跳过了B以及head变成了C)的话,A的错位唤醒有可能会唤醒C之后的节点。
可以看到,当B是第一个候选节点的时候,C有可能设置不了A的SIGNAL,A也有在好几种可能唤醒不了C。所以AQS的策略是一律让B唤醒C。UnfairTimedLock1中基本也是参考AQS,只是上述分析全部是我自己考虑出来的。
具体代码在abort方法中判断p(predecessor)是否为head,如果是的是,就无论自己的状态一律唤醒。
顺便说一句,在使用spin策略的限时锁中,全权由C来跳过放弃了的B,省去了上述的分析。
最后是B不是第一候选节点并且不在tail的情况,即上图下面的部分。直觉上来说,由于不是第一候选节点,A要唤醒的是D,C也会跳过B链接到D上,不太可能出现唤醒丢失的问题。这里最关键的是确认D的status一定被设置为SIGNAL。那么D的status有可能是NORMAL么?
看了之前中间部分的分析之后,可以说是有可能的,条件如下
- C在B放弃前park了(线程C分析2.2)
- B超时时间过短,直接放弃
此时要求B确保D的status为SIGNAL,所以会有abort中ensureSignalSuccessorStatus这个调用。
继续提问,是否只要设置D的status为SIGNAL就可以了?
答案是:除了设置D的status之外,还要确保D没有unlock。假设ensureSignalSuccessorStatus成功,D可能了发生什么呢?
- D在ensureSignalSuccessorStatus调用之前获取了锁又释放了锁,此时D不会唤醒C,唤醒丢失
- D在ensureSignalSuccessorStatus调用之后释放了锁(获取可以在之前或之后),此时D会唤醒C
- D在ensureSignalSuccessorStatus调用之前放弃,ensureSignalSuccessorStatus会失败,C需要A来唤醒
- D在ensureSignalSuccessorStatus调用之后放弃,由于D是第一候选节点,按照上面中间部分的说明,D会无条件唤醒后续节点,即C
情况1比较麻烦,检查D是否以及完成的条件只能在myTurn方法中找,实际可用的只有一个,thread == null。当然你也可以选择增加status。情况3由于ensureSignalSuccessorStatus会失败,不用增加判断。
以上分析对应abort最后的代码,如果ensureSignalSuccessorStatus成功和thread != null时才可以认为没有唤醒丢失的问题,其余情况即1和3,需要无条件唤醒C。情况3中有可能A唤醒了C,B也唤醒了C,但是考虑到A可能去唤醒D了,所以此处还是需要B确定C被唤醒。
条件中成功时此时B可以选择帮助D设置next指针为C,CAS失败此处没有影响。
abort中最复杂的分析至此结束。实际AQS的代码把图中中间和下部的情况和在一起考虑。
老实说个人觉得abort时的唤醒分析确实比较复杂,从编码上一律唤醒可能是最简单的,但是AQS分析了一部分并且用排除的方式解决了一部分不需要唤醒的情况,理解这块代码可能是设计UnfairTimedLock1最关键的地方。
abort中还有一些代码,比如最开始的设置thread为null。个人理解这是放弃了的线程减少被额外唤醒用的,比如作为第一个候选节点的时候。上面的分析中,基本都是依赖status来判断节点是否放弃,没有看到用thread == null来判断是否abort的地方。当然如果我的分析错了,欢迎指出。
abort中在设置status为ABORTED之前,还有一段跳过放弃了的节点的代码。个人理解在设置status之前是为了防止后续节点跳过自己简化分析。理论上放在设置status之后也是可以的,甚至不跳过也不会造成太大问题。
说到跳过放弃了的节点,Queue#skipAbortedPredecessors中不是找到了第一个未放弃的节点之后设置自己的previous指针,而是遍历时每次都设置。个人理解是想做多线程下的helping,即A在skip时,A后面的节点在skip时,可以通过A更快找到未放弃的节点。只是考虑到skipAbortedPredecessors时(包括abort方法)节点还没设置status为ABORTED,理论上,A的后续节点遍历到了A就停下了。即使A skip之后,立马abort,后续节点也不需要这种helping。所以个人这块持保留意见(如果AQS的cancelAcquire方法原先是在设置status之后skip的话另当别论)。至少在现在的ReentrantLock中应该是不需要的。
UnfairTimedLock2
在写好了限时版tryLock之后,其他lock方法依样画葫芦得到的最终版。
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
@SuppressWarnings("Duplicates")
public class UnfairTimedLock2 implements Lock {
private final Queue queue = new Queue();
private final AtomicInteger reentrantTimes = new AtomicInteger(0);
private Thread owner;
public void lock() {
if (tryLock()) {
return;
}
Node node = new Node(Thread.currentThread());
queue.enqueue(node);
while (true) {
if (queue.isNextCandidate(node) &&
reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
myTurn(node);
return;
}
if (isReadyToPark(node)) {
LockSupport.park(this);
}
}
}
public void lockInterruptibly() throws InterruptedException {
if (tryLock()) {
return;
}
Node node = new Node(Thread.currentThread());
queue.enqueue(node);
while (true) {
if (queue.isNextCandidate(node) &&
reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
myTurn(node);
return;
}
if (isReadyToPark(node)) {
LockSupport.park(this);
}
if (Thread.interrupted()) {
abort(node);
throw new InterruptedException();
}
}
}
public boolean tryLock() {
if (owner == Thread.currentThread()) {
// reentrant
reentrantTimes.incrementAndGet();
return true;
}
if (reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
owner = Thread.currentThread();
return true;
}
return false;
}
public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException {
if (tryLock()) {
return true;
}
final long deadline = unit.toNanos(time) + System.nanoTime();
Node node = new Node(Thread.currentThread());
queue.enqueue(node);
long nanos;
while (true) {
if (queue.isNextCandidate(node) &&
reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) {
myTurn(node);
return true;
}
nanos = deadline - System.nanoTime();
// timeout
if (nanos <= 0L) {
abort(node);
return false;
}
if (isReadyToPark(node)) {
LockSupport.parkNanos(this, nanos);
}
if (Thread.interrupted()) {
abort(node);
throw new InterruptedException();
}
}
}
private boolean isReadyToPark(@Nonnull Node node) {
Node p = node.predecessor.get();
int s = p.status.get();
if (s == Node.STATUS_SIGNAL_SUCCESSOR) {
return true;
}
if (s == Node.STATUS_ABORTED) {
p = queue.skipAbortedPredecessors(node);
p.successor.set(node);
} else if (s == Node.STATUS_NORMAL) {
p.status.compareAndSet(Node.STATUS_NORMAL, Node.STATUS_SIGNAL_SUCCESSOR);
}
return false;
}
private void abort(@Nonnull Node node) {
node.clearThread();
Node p = queue.skipAbortedPredecessors(node);
Node ps = p.successor.get();
// linearization point
node.status.set(Node.STATUS_ABORTED);
if (queue.tail.get() == node && queue.tail.compareAndSet(node, p)) {
p.successor.compareAndSet(ps, null);
return;
}
if (p != queue.head.get() && p.ensureSignalSuccessorStatus() && p.thread.get() != null) {
Node s = node.successor.get();
if (s != null && s.status.get() != Node.STATUS_ABORTED) {
p.successor.compareAndSet(ps, s);
}
} else {
signalNormalSuccessor(node);
}
}
private void myTurn(@Nonnull Node node) {
owner = Thread.currentThread();
node.clearThread();
queue.head.set(node);
reentrantTimes.set(1);
Node predecessor = node.predecessor.get();
node.predecessor.set(null);
predecessor.successor.set(null);
}
private void signalNormalSuccessor(@Nonnull Node node) {
if (node.status.get() == Node.STATUS_SIGNAL_SUCCESSOR) {
node.status.compareAndSet(Node.STATUS_SIGNAL_SUCCESSOR, Node.STATUS_NORMAL);
}
Node successor = queue.findNormalSuccessor(node);
if (successor != null) {
LockSupport.unpark(successor.thread.get());
}
}
public void unlock() {
if (owner != Thread.currentThread()) {
throw new IllegalStateException("not the thread holding lock");
}
int rt = reentrantTimes.get();
if (rt < 1) {
throw new IllegalStateException("reentrant times < 1 when try to unlock");
}
if (rt > 1) {
reentrantTimes.set(rt - 1);
return;
}
// rt == 1
owner = null;
// linearization point
reentrantTimes.set(0);
Node node = queue.head.get();
if (node != null && node.status.get() == Node.STATUS_SIGNAL_SUCCESSOR) {
signalNormalSuccessor(node);
}
}
@Override
@Nonnull
public Condition newCondition() {
throw new UnsupportedOperationException();
}
@SuppressWarnings("Duplicates")
private static class Queue {
final AtomicReference<Node> head = new AtomicReference<>();
final AtomicReference<Node> tail = new AtomicReference<>();
/**
* Enqueue node.
*
* @param node new node
* @return predecessor
*/
@Nonnull
Node enqueue(@Nonnull Node node) {
Node t;
while (true) {
t = tail.get();
if (t == null) {
// lazy initialization
Node sentinel = new Node();
if (head.get() == null && head.compareAndSet(null, sentinel)) {
tail.set(sentinel);
}
} else {
node.predecessor.lazySet(t);
// linearization point
if (tail.compareAndSet(t, node)) {
t.successor.set(node);
return t;
}
}
}
}
@Nullable
Node findNormalSuccessor(@Nonnull Node node) {
Node n = node.successor.get();
if (n != null && n.status.get() != Node.STATUS_ABORTED) {
return n;
}
// find normal node from tail
Node c = tail.get();
n = null;
// tail maybe null during lazy initialization
while (c != null && c != node) {
if (c.status.get() != Node.STATUS_ABORTED) {
n = c;
}
c = c.predecessor.get();
}
return n;
}
boolean isNextCandidate(@Nonnull Node node) {
return node.predecessor.get() == head.get();
}
@Nonnull
Node skipAbortedPredecessors(@Nonnull Node node) {
Node h = head.get();
Node p = node.predecessor.get();
while (p != h && p.status.get() != Node.STATUS_ABORTED) {
p = p.predecessor.get();
/*
* set predecessor every time to help successors of
* current node to find the normal predecessor more quickly
*/
node.predecessor.set(p);
}
return p;
}
}
/**
* Node.
* <p>
* Status change:
* NORMAL -> ABORTED
*/
private static class Node {
static final int STATUS_NORMAL = 0;
static final int STATUS_ABORTED = -1;
static final int STATUS_SIGNAL_SUCCESSOR = 1;
/**
* thread will be null if
* 1. abort
* 2. enter mutual exclusion area
*/
final AtomicReference<Thread> thread;
final AtomicInteger status = new AtomicInteger(STATUS_NORMAL);
final AtomicReference<Node> predecessor = new AtomicReference<>();
// optimization
final AtomicReference<Node> successor = new AtomicReference<>();
Node() {
this(null);
}
Node(@Nullable Thread thread) {
this.thread = new AtomicReference<>(thread);
}
boolean ensureSignalSuccessorStatus() {
int s = this.status.get();
return s == STATUS_SIGNAL_SUCCESSOR ||
(s == STATUS_NORMAL && this.status.compareAndSet(s, STATUS_SIGNAL_SUCCESSOR));
}
void clearThread() {
thread.set(null);
}
}
}
如果你看过AQS的源代码的话,你可能会注意这里的tryLock和AQS的tryAcquire,isReadyToPark和shouldParkAfterFailedAcquire,abort和cancelAcquire比较接近。有兴趣的话可以对比一下。
小结
限时锁个人认为是比较难实现的一种锁,但是在解决死锁问题等地方会有用。如果你理解了限时锁的分析的话,相信你也能写出类似ReentrantLock一样的锁。
如果你想更全面地了解锁的实现的话,建议阅读《the art of multiprocessor programming》中关于锁的部分。虽然大部分都是spin lock,具体实际基于park代码比较远,但是对训练多线程代码分析和设计很有帮助。
接下来,个人要实现的是ReadWriteLock,具体在第三篇中讲解。



One response to “Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(2)”
感谢分享!已推荐到《开发者头条》:https://toutiao.io/posts/rdwbpb 欢迎点赞支持!
使用开发者头条 App 搜索 385148 即可订阅《并发与分布式系统研究》