接着上篇,本篇主要介绍如何添加限时tryLock方法。介绍完之后,ReentrantLock除了condition之外都会实现,而且基本和实际ReentrantLock代码接近。上篇也提到过,实际ReentrantLock依赖AQS,但是本文不会直接介绍AQS,只是AQS的一个不完全分析。
顺便提一下,“自己写ReentrantLock和ReentrantReadWriteLock”由于篇幅比较长,预计会分成3到4篇左右。
在进入限时tryLock方法的介绍之前,考虑一个问题:上篇中FairLock1和UnfairLock1是否可以复用Node?
为什么要问这个问题?因为C++代码比较关注对象的生命周期。使用C++实现锁的话必须关注何时可以回收Node的内存。如果你了解CLHLock的话,可以知道CLHLock是可以复用Node的,最多是N个(线程)+1个(哨兵)Node,理论上不需要回收。所以和CLHLock基本一致的FairLock1,同样可以用类似CLHLock的方式,即复用前序节点为自己的当前节点(CLHLock是复用前序节点的,至于为什么可以参阅CLHLock的介绍)。UnfairLock1中由于可能是非公平模式,不知道前序节点是谁,也就无法简单复用了。
继续考虑限时获取锁的时候,Node是否可以复用?老实说,个人不知道确切答案,但是即使是在公平模式下,限时tryLock想要复用Node比UnfairLock1也要难很多。比如线程B获取锁超时,但是之后线程C进入了队列,这时线程B再次尝试获取锁的时候会怎么样?首先B由于获取失败,无法复用前序节点。其次C有可能持有B的节点指针,也有可能跳过B链到了B的前序节点上。因此,实现带超时tryLock时一般会考虑每次都新建Node来减少复杂性。
可能因为示例代码是Java,所以每次new一个Node出来没什么感觉。但是假如你用C++实现带超时版本的tryLock的话,必须考虑谁来回收Node的内存。有兴趣的可以看一下相关带超时的锁的论文,比如TOLock。
第二个要考虑的问题是,允许限时会带来什么影响?
- 前序节点有可能已超时
- 后续节点有可能已超时
- (除了哨兵节点)head不可能是超时节点
在上图中,C的前序节点B已超时,为了让A能唤醒自己,C需要设置自己的previous指针到A以及A的next指针为自己。当然signal successor标志也是要设置的。对于A来说,后续节点B已超时,A需要唤醒自己的后续节点中没有超时的第一个节点,即C。第三点不难理解,head的推进是由已经获取锁的线程处理的,所以新的head指针肯定对应一个非超时的线程。
需要继续考虑第一和第二点。当A和C同时执行时,如何设计包括B在内的一个机制,保证C肯定会被唤醒(允许多次唤醒)?
UnfairTimedLock1
以下是基于AQS改造的限时锁,具体分析下如何保证唤醒线程C。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 |
import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; @SuppressWarnings(“Duplicates”) public class UnfairTimedLock1 implements Lock { private final Queue queue = new Queue(); private final AtomicInteger reentrantTimes = new AtomicInteger(0); private Thread owner; public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException { if (owner == Thread.currentThread()) { // reentrant reentrantTimes.incrementAndGet(); return true; } if (reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) { owner = Thread.currentThread(); return true; } final long deadline = unit.toNanos(time) + System.nanoTime(); Node node = new Node(Thread.currentThread()); Node predecessor = queue.enqueue(node); long nanos; while (true) { if (predecessor == queue.head.get() && reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) { myTurn(predecessor, node); return true; } nanos = deadline – System.nanoTime(); // timeout if (nanos <= 0L) { abort(predecessor, node); return false; } switch (predecessor.status.get()) { case Node.STATUS_ABORTED: predecessor = queue.skipAbortedPredecessors(predecessor, node); predecessor.successor.set(node); break; case Node.STATUS_SIGNAL_SUCCESSOR: LockSupport.parkNanos(this, nanos); break; case Node.STATUS_NORMAL: /* * recheck is required after CAS * 1. CAS failed * 2. CAS successfully, but status changed to ABORTED before parking * 3. predecessor unlock between first check and CAS(no unpark) */ predecessor.status.compareAndSet(Node.STATUS_NORMAL, Node.STATUS_SIGNAL_SUCCESSOR); break; } if (Thread.interrupted()) { abort(predecessor, node); throw new InterruptedException(); } } } private void abort(@Nonnull Node predecessor, @Nonnull Node node) { node.clearThread(); Node p = queue.skipAbortedPredecessors(predecessor, node); Node ps = p.successor.get(); // linearization point node.status.set(Node.STATUS_ABORTED); /* * at end * * A -> |<B> * ANY -> |ABORTED * * no lost-wakeup problem */ if (queue.tail.get() == node && queue.tail.compareAndSet(node, p)) { /* * failure is ok, which means * new node may enqueue between removing and setting successor */ p.successor.compareAndSet(ps, null); return; } /* * at beginning * * A -> |<B> -> C * ANY -> |ABORTED -> ANY * * lost-wakeup problem may happen * * scenarios * 1. B didn’t set flag of A * * 2. B was signaled * sequence * a. B set flag * b. A signaled B * c. B aborted */ if (p == queue.head.get()) { signalNormalSuccessor(node); return; } /* * in middle * * A -> |B -> <C> -> D * ANY -> |ANY -> ABORTED -> ANY * * lost-wakeup problem may happen * * conditions * 1. no one set flag of B * 1.1 D set flag of C before C aborts * 1.2 C didn’t set the flag of B * * 2. B acquired lock and finished processing after p == head check * * first, try to set flag of B, then recheck if predecessor finished(unlocked or aborted) */ if (p.ensureSignalSuccessorStatus() && p.thread.get() != null) { Node s = node.successor.get(); if (s != null && s.status.get() != Node.STATUS_ABORTED) { p.successor.compareAndSet(ps, s); } } else { signalNormalSuccessor(node); } } private void myTurn(@Nonnull Node predecessor, @Nonnull Node node) { owner = Thread.currentThread(); node.clearThread(); queue.head.set(node); node.predecessor.set(null); predecessor.successor.set(null); } private void signalNormalSuccessor(@Nonnull Node node) { Node successor = queue.findNormalSuccessor(node); if (successor != null) { LockSupport.unpark(successor.thread.get()); } } public void unlock() { if (owner != Thread.currentThread()) { throw new IllegalStateException(“not the thread holding lock”); } int rt = reentrantTimes.get(); if (rt < 1) { throw new IllegalStateException(“reentrant times < 1 when try to unlock”); } if (rt > 1) { reentrantTimes.set(rt – 1); return; } // rt == 1 owner = null; // linearization point reentrantTimes.set(0); Node node = queue.head.get(); if (node != null && node.status.get() == Node.STATUS_SIGNAL_SUCCESSOR && node.status.compareAndSet(Node.STATUS_SIGNAL_SUCCESSOR, Node.STATUS_NORMAL)) { signalNormalSuccessor(node); } } @Override public void lock() { throw new UnsupportedOperationException(); } @Override public void lockInterruptibly() throws InterruptedException { throw new UnsupportedOperationException(); } @Override public boolean tryLock() { throw new UnsupportedOperationException(); } @Override @Nonnull public Condition newCondition() { throw new UnsupportedOperationException(); } @SuppressWarnings(“Duplicates”) private static class Queue { final AtomicReference<Node> head = new AtomicReference<>(); final AtomicReference<Node> tail = new AtomicReference<>(); /** * Enqueue node. * * @param node new node * @return predecessor */ @Nonnull Node enqueue(@Nonnull Node node) { Node t; while (true) { t = tail.get(); if (t == null) { // lazy initialization Node sentinel = new Node(); if (head.get() == null && head.compareAndSet(null, sentinel)) { tail.set(sentinel); } } else { node.predecessor.lazySet(t); // linearization point if (tail.compareAndSet(t, node)) { t.successor.set(node); return t; } } } } @Nullable Node findNormalSuccessor(@Nonnull Node node) { Node n = node.successor.get(); if (n != null && n.status.get() != Node.STATUS_ABORTED) { return n; } // find normal node from tail Node c = tail.get(); n = null; // tail maybe null during lazy initialization while (c != null && c != node) { if (c.status.get() != Node.STATUS_ABORTED) { n = c; } c = c.predecessor.get(); } return n; } @Nonnull Node skipAbortedPredecessors(@Nonnull Node predecessor, @Nonnull Node node) { Node h = head.get(); Node p = predecessor; while (p != h && p.status.get() != Node.STATUS_ABORTED) { p = p.predecessor.get(); /* * set predecessor every time to help successors of * current node to find the normal predecessor more quickly */ node.predecessor.set(p); } return p; } } /** * Node. * <p> * Status change: * NORMAL -> ABORTED */ private static class Node { static final int STATUS_NORMAL = 0; static final int STATUS_ABORTED = –1; static final int STATUS_SIGNAL_SUCCESSOR = 1; /** * thread will be null if * 1. abort * 2. enter mutual exclusion area */ final AtomicReference<Thread> thread; final AtomicInteger status = new AtomicInteger(STATUS_NORMAL); final AtomicReference<Node> predecessor = new AtomicReference<>(); // optimization final AtomicReference<Node> successor = new AtomicReference<>(); Node() { this(null); } Node(@Nullable Thread thread) { this.thread = new AtomicReference<>(thread); } boolean ensureSignalSuccessorStatus() { int s = this.status.get(); return s == STATUS_SIGNAL_SUCCESSOR || (s == STATUS_NORMAL && this.status.compareAndSet(s, STATUS_SIGNAL_SUCCESSOR)); } void clearThread() { thread.set(null); } } } |
首先Node增加了status,替换掉了原先的signal successor标志。status的迁移图如下
总共有4种迁移。一个线程在创建了Node之后,先是NORMAL。有后续节点挂上了之后,被设置为SIGNAL。如果当前线程超时了的话,变成ABORTED(实际也就是直接设置为ABORTED)。除了后续节点所对应的线程通过CAS修改当前线程拥有的节点之外,其余都是当前线程自身修改。
为什么原先是直接设置的,现在要CAS?看了迁移图后相信很容易理解。为了防止后续节点将ABORTED状态修改为SIGNAL。
在此基础上继续分析保证唤醒的机制。
假设线程在设置自己的节点为ABORTED之后,开始解决唤醒问题。
先从最简单的开始,即上图的上部。B没有后续节点,那么肯定没有唤醒问题。这块对应abort方法中node == tail时候的代码。如果CAS tail为前置节点成功的话,说明自己真的是末尾节点,接下来CAS设置前置节点的next指针为空。这里并没有检查CAS的返回值。CAS失败的时候意味着在CAS tail指针之后有新节点入队了。从另一个角度来说,为了防止B的处理覆盖掉新入节点的设置,这里必须用CAS。
接下来分析B为第一个候选节点的情况,即上图的中间部分。要知道B为了让A能够唤醒自己,会设置A的status为SIGNAL。那么B abort时,A的status是SIGNAL还是NORMAL?回答是两者都有可能。tryLock的代码中,在第一次检查nanos是否小于0的时候,有可能超时时间太短,直接被判定为超时的话,A的status为NORMAL。另一种情况是设置A的status为SIGNAL之后,线程被中断。当然还有线程park中被中断,等待超时等情况,此时A的status为SIGNAL。
你可能会问,是否可以一律设置A的状态为SIGNAL,让分析更简单一些?回答是,这样做可能不太好。因为当B明显超时时间过短无法获取锁并放弃了的话,为什么要让A负责唤醒B?虽说多次唤醒比忘记唤醒引起的问题小,但是作为一个同步器还是希望尽量精确唤醒。
继续分析C的情况。
- C在B放弃之后,尝试让B唤醒自己,CAS不成功加上再次读取B的status的时候发现B已经放弃了,此时C会越过B尝试设置A的status,并在A的status被确实设置为SIGNAL之后park
- C在B放弃之前,尝试让B唤醒自己
- 再次读取B的status时为ABORTED,接下来与1一样
- 再次读取B的status时为SIGNAL,即B还没放弃,C会park
也就说B,C同时运行时,C有可能在不知道B放弃的情况下park
另一方面对A来说
- B已放弃
- B未设置A的状态为SIGNAL,A不会唤醒B,唤醒丢失
- B设置了A的状态为SIGNAL,A跳过B,唤醒C
- 虽然B未设置,但是C设置了A的状态为SIGNAL,A在查找后续节点时跳过了B,唤醒C
- B未放弃
- B未设置A的状态为SIGNAL,A不会唤醒B,唤醒丢失
- B设置了A的状态为SIGNAL,A唤醒B,唤醒丢失
额外的,假设A在unlock时,C以公平模式获取锁成功(非公平模式的话不存在C节点)并且推进了head节点(即,C跳过了B以及head变成了C)的话,A的错位唤醒有可能会唤醒C之后的节点。
可以看到,当B是第一个候选节点的时候,C有可能设置不了A的SIGNAL,A也有在好几种可能唤醒不了C。所以AQS的策略是一律让B唤醒C。UnfairTimedLock1中基本也是参考AQS,只是上述分析全部是我自己考虑出来的。
具体代码在abort方法中判断p(predecessor)是否为head,如果是的是,就无论自己的状态一律唤醒。
顺便说一句,在使用spin策略的限时锁中,全权由C来跳过放弃了的B,省去了上述的分析。
最后是B不是第一候选节点并且不在tail的情况,即上图下面的部分。直觉上来说,由于不是第一候选节点,A要唤醒的是D,C也会跳过B链接到D上,不太可能出现唤醒丢失的问题。这里最关键的是确认D的status一定被设置为SIGNAL。那么D的status有可能是NORMAL么?
看了之前中间部分的分析之后,可以说是有可能的,条件如下
- C在B放弃前park了(线程C分析2.2)
- B超时时间过短,直接放弃
此时要求B确保D的status为SIGNAL,所以会有abort中ensureSignalSuccessorStatus这个调用。
继续提问,是否只要设置D的status为SIGNAL就可以了?
答案是:除了设置D的status之外,还要确保D没有unlock。假设ensureSignalSuccessorStatus成功,D可能了发生什么呢?
- D在ensureSignalSuccessorStatus调用之前获取了锁又释放了锁,此时D不会唤醒C,唤醒丢失
- D在ensureSignalSuccessorStatus调用之后释放了锁(获取可以在之前或之后),此时D会唤醒C
- D在ensureSignalSuccessorStatus调用之前放弃,ensureSignalSuccessorStatus会失败,C需要A来唤醒
- D在ensureSignalSuccessorStatus调用之后放弃,由于D是第一候选节点,按照上面中间部分的说明,D会无条件唤醒后续节点,即C
情况1比较麻烦,检查D是否以及完成的条件只能在myTurn方法中找,实际可用的只有一个,thread == null。当然你也可以选择增加status。情况3由于ensureSignalSuccessorStatus会失败,不用增加判断。
以上分析对应abort最后的代码,如果ensureSignalSuccessorStatus成功和thread != null时才可以认为没有唤醒丢失的问题,其余情况即1和3,需要无条件唤醒C。情况3中有可能A唤醒了C,B也唤醒了C,但是考虑到A可能去唤醒D了,所以此处还是需要B确定C被唤醒。
条件中成功时此时B可以选择帮助D设置next指针为C,CAS失败此处没有影响。
abort中最复杂的分析至此结束。实际AQS的代码把图中中间和下部的情况和在一起考虑。
老实说个人觉得abort时的唤醒分析确实比较复杂,从编码上一律唤醒可能是最简单的,但是AQS分析了一部分并且用排除的方式解决了一部分不需要唤醒的情况,理解这块代码可能是设计UnfairTimedLock1最关键的地方。
abort中还有一些代码,比如最开始的设置thread为null。个人理解这是放弃了的线程减少被额外唤醒用的,比如作为第一个候选节点的时候。上面的分析中,基本都是依赖status来判断节点是否放弃,没有看到用thread == null来判断是否abort的地方。当然如果我的分析错了,欢迎指出。
abort中在设置status为ABORTED之前,还有一段跳过放弃了的节点的代码。个人理解在设置status之前是为了防止后续节点跳过自己简化分析。理论上放在设置status之后也是可以的,甚至不跳过也不会造成太大问题。
说到跳过放弃了的节点,Queue#skipAbortedPredecessors中不是找到了第一个未放弃的节点之后设置自己的previous指针,而是遍历时每次都设置。个人理解是想做多线程下的helping,即A在skip时,A后面的节点在skip时,可以通过A更快找到未放弃的节点。只是考虑到skipAbortedPredecessors时(包括abort方法)节点还没设置status为ABORTED,理论上,A的后续节点遍历到了A就停下了。即使A skip之后,立马abort,后续节点也不需要这种helping。所以个人这块持保留意见(如果AQS的cancelAcquire方法原先是在设置status之后skip的话另当别论)。至少在现在的ReentrantLock中应该是不需要的。
UnfairTimedLock2
在写好了限时版tryLock之后,其他lock方法依样画葫芦得到的最终版。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; @SuppressWarnings(“Duplicates”) public class UnfairTimedLock2 implements Lock { private final Queue queue = new Queue(); private final AtomicInteger reentrantTimes = new AtomicInteger(0); private Thread owner; public void lock() { if (tryLock()) { return; } Node node = new Node(Thread.currentThread()); queue.enqueue(node); while (true) { if (queue.isNextCandidate(node) && reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) { myTurn(node); return; } if (isReadyToPark(node)) { LockSupport.park(this); } } } public void lockInterruptibly() throws InterruptedException { if (tryLock()) { return; } Node node = new Node(Thread.currentThread()); queue.enqueue(node); while (true) { if (queue.isNextCandidate(node) && reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) { myTurn(node); return; } if (isReadyToPark(node)) { LockSupport.park(this); } if (Thread.interrupted()) { abort(node); throw new InterruptedException(); } } } public boolean tryLock() { if (owner == Thread.currentThread()) { // reentrant reentrantTimes.incrementAndGet(); return true; } if (reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) { owner = Thread.currentThread(); return true; } return false; } public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException { if (tryLock()) { return true; } final long deadline = unit.toNanos(time) + System.nanoTime(); Node node = new Node(Thread.currentThread()); queue.enqueue(node); long nanos; while (true) { if (queue.isNextCandidate(node) && reentrantTimes.get() == 0 && reentrantTimes.compareAndSet(0, 1)) { myTurn(node); return true; } nanos = deadline – System.nanoTime(); // timeout if (nanos <= 0L) { abort(node); return false; } if (isReadyToPark(node)) { LockSupport.parkNanos(this, nanos); } if (Thread.interrupted()) { abort(node); throw new InterruptedException(); } } } private boolean isReadyToPark(@Nonnull Node node) { Node p = node.predecessor.get(); int s = p.status.get(); if (s == Node.STATUS_SIGNAL_SUCCESSOR) { return true; } if (s == Node.STATUS_ABORTED) { p = queue.skipAbortedPredecessors(node); p.successor.set(node); } else if (s == Node.STATUS_NORMAL) { p.status.compareAndSet(Node.STATUS_NORMAL, Node.STATUS_SIGNAL_SUCCESSOR); } return false; } private void abort(@Nonnull Node node) { node.clearThread(); Node p = queue.skipAbortedPredecessors(node); Node ps = p.successor.get(); // linearization point node.status.set(Node.STATUS_ABORTED); if (queue.tail.get() == node && queue.tail.compareAndSet(node, p)) { p.successor.compareAndSet(ps, null); return; } if (p != queue.head.get() && p.ensureSignalSuccessorStatus() && p.thread.get() != null) { Node s = node.successor.get(); if (s != null && s.status.get() != Node.STATUS_ABORTED) { p.successor.compareAndSet(ps, s); } } else { signalNormalSuccessor(node); } } private void myTurn(@Nonnull Node node) { owner = Thread.currentThread(); node.clearThread(); queue.head.set(node); reentrantTimes.set(1); Node predecessor = node.predecessor.get(); node.predecessor.set(null); predecessor.successor.set(null); } private void signalNormalSuccessor(@Nonnull Node node) { if (node.status.get() == Node.STATUS_SIGNAL_SUCCESSOR) { node.status.compareAndSet(Node.STATUS_SIGNAL_SUCCESSOR, Node.STATUS_NORMAL); } Node successor = queue.findNormalSuccessor(node); if (successor != null) { LockSupport.unpark(successor.thread.get()); } } public void unlock() { if (owner != Thread.currentThread()) { throw new IllegalStateException(“not the thread holding lock”); } int rt = reentrantTimes.get(); if (rt < 1) { throw new IllegalStateException(“reentrant times < 1 when try to unlock”); } if (rt > 1) { reentrantTimes.set(rt – 1); return; } // rt == 1 owner = null; // linearization point reentrantTimes.set(0); Node node = queue.head.get(); if (node != null && node.status.get() == Node.STATUS_SIGNAL_SUCCESSOR) { signalNormalSuccessor(node); } } @Override @Nonnull public Condition newCondition() { throw new UnsupportedOperationException(); } @SuppressWarnings(“Duplicates”) private static class Queue { final AtomicReference<Node> head = new AtomicReference<>(); final AtomicReference<Node> tail = new AtomicReference<>(); /** * Enqueue node. * * @param node new node * @return predecessor */ @Nonnull Node enqueue(@Nonnull Node node) { Node t; while (true) { t = tail.get(); if (t == null) { // lazy initialization Node sentinel = new Node(); if (head.get() == null && head.compareAndSet(null, sentinel)) { tail.set(sentinel); } } else { node.predecessor.lazySet(t); // linearization point if (tail.compareAndSet(t, node)) { t.successor.set(node); return t; } } } } @Nullable Node findNormalSuccessor(@Nonnull Node node) { Node n = node.successor.get(); if (n != null && n.status.get() != Node.STATUS_ABORTED) { return n; } // find normal node from tail Node c = tail.get(); n = null; // tail maybe null during lazy initialization while (c != null && c != node) { if (c.status.get() != Node.STATUS_ABORTED) { n = c; } c = c.predecessor.get(); } return n; } boolean isNextCandidate(@Nonnull Node node) { return node.predecessor.get() == head.get(); } @Nonnull Node skipAbortedPredecessors(@Nonnull Node node) { Node h = head.get(); Node p = node.predecessor.get(); while (p != h && p.status.get() != Node.STATUS_ABORTED) { p = p.predecessor.get(); /* * set predecessor every time to help successors of * current node to find the normal predecessor more quickly */ node.predecessor.set(p); } return p; } } /** * Node. * <p> * Status change: * NORMAL -> ABORTED */ private static class Node { static final int STATUS_NORMAL = 0; static final int STATUS_ABORTED = –1; static final int STATUS_SIGNAL_SUCCESSOR = 1; /** * thread will be null if * 1. abort * 2. enter mutual exclusion area */ final AtomicReference<Thread> thread; final AtomicInteger status = new AtomicInteger(STATUS_NORMAL); final AtomicReference<Node> predecessor = new AtomicReference<>(); // optimization final AtomicReference<Node> successor = new AtomicReference<>(); Node() { this(null); } Node(@Nullable Thread thread) { this.thread = new AtomicReference<>(thread); } boolean ensureSignalSuccessorStatus() { int s = this.status.get(); return s == STATUS_SIGNAL_SUCCESSOR || (s == STATUS_NORMAL && this.status.compareAndSet(s, STATUS_SIGNAL_SUCCESSOR)); } void clearThread() { thread.set(null); } } } |
如果你看过AQS的源代码的话,你可能会注意这里的tryLock和AQS的tryAcquire,isReadyToPark和shouldParkAfterFailedAcquire,abort和cancelAcquire比较接近。有兴趣的话可以对比一下。
小结
限时锁个人认为是比较难实现的一种锁,但是在解决死锁问题等地方会有用。如果你理解了限时锁的分析的话,相信你也能写出类似ReentrantLock一样的锁。
如果你想更全面地了解锁的实现的话,建议阅读《the art of multiprocessor programming》中关于锁的部分。虽然大部分都是spin lock,具体实际基于park代码比较远,但是对训练多线程代码分析和设计很有帮助。
接下来,个人要实现的是ReadWriteLock,具体在第三篇中讲解。
One response to “Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(2)”
感谢分享!已推荐到《开发者头条》:https://toutiao.io/posts/rdwbpb 欢迎点赞支持!
使用开发者头条 App 搜索 385148 即可订阅《并发与分布式系统研究》