Java并发研究 Exchanger以及背后的dual data structure


在CLQ(ConcurrentLinkedQueue)之后,想了一下继续分析什么类比较好。

想到的是《the art of multiprocessor programming》11章中EliminationStack。当然Java的并发库里面没有这个Stack。EliminationStack是书中为了解决TreiberStack的串行特性提出的方案。TreiberStack虽然是比较简单的无锁并发数据类型,但是由于所有请求都在一个地方(top节点)上CAS,在高并发下,性能并不理想。书中认为这是TreiberStack或者说Stack本身的串行特性导致的(相对的,队列可以通过队首与队尾两个节点来分离请求),所以提出一种方案:dual stack,即在TreiberStack的基础上,push和pop两个请求配对。实际代码中先尝试常规的CAS push或者pop,如果失败,转而使用push和pop配对方式。这样做一方面把并发的请求分布到了不同的地方,另一方面提高了Stack的并发度。

为了实现push和pop请求配置,书中创建了一种Exchanger同步器,即线程A把数据给线程B,线程B把数据给线程A。(如果你奇怪pop可以给push什么数据的话,想想看pop把null给push,而push不使用pop给的数据)正好,Java的类库里也有一个叫做Exchanger的同步器,而且在注释中提到了dual data structure。所以,我想比较一下两者的实现细节。

Java类库中的Exchanger,作者除了Doug Lea还有Bill Scherer和Michael Scott,其中Michael Scott是写了dual data structure论文的两人之一(不确定Bill Scherer是不是William N. Scherer III)。这三人其实还写了SynchronousQueue,即CSP中用做channel的那种同步器,其实SynchronousQueue也是dual data structure的一种典型用法。dual data structure论文的名字是Nonblocking concurrent data structures with condition synchronization,完整的引用如下:

Scherer, William N., and Michael L. Scott. “Nonblocking concurrent data structures with condition synchronization.” International Symposium on Distributed Computing. Springer, Berlin, Heidelberg, 2004.

示例代码dualstack和dualqueue可以看这里。

http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html

在正式分析Java类库中的Exchanger之前,建议先了解一下《the art of multiprocessor programming》中给出的Exchanger实现,因为书中的实现比类库中的Exchanger要简单很多。Exchanger最重要的还是核心思想,如果你不懂dual data structure,你其实也可以实现一个这样的同步器:

  • 线程对等,即双方都有数据
  • 首先两个线程A和B抢占一个slot
  • 第一个抢到的人等待后一个人
  • 后一个人拿到第一个人的数据,把自己的数据给第一个人,之后后一个人结束
  • 第一个人拿到后一个人的数据,之后第一个人结束

以上是条件和正常流程,存在一个异常流程

  1. 线程A抢占slot
  2. 约定时间给没有等到后一个人出现
  3. 线程A超时退出

具体怎么实现?异常流程提到了超时,所以Exchanger的接口可以设计为

interface Exchanger<T> {
  T exchange(T value, long time, TimeUnit unit) throws TimeoutException;
}

线程A和B抢占一个slot,其实就是CAS同一个变量,设置为自己的值。由于有先后,所以需要用额外的状态变量,比如EMPTY,FIRST和SECOND。核心的状态迁移图:

(null, EMPTY) -> (data1, FIRST) -> (data2, SECOND) -> (null, EMPTY)

异常的状态迁移图(超时)

(null, EMPTY) -> (data1, FIRST) -> (null, EMPTY)

因为是两个变量,所以需要用AtomicStampedReference。以下是实际代码

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.LockSupport;

public class Exchanger1<T> {

    private static final int STATE_EMPTY = 0;
    private static final int STATE_FIRST = 1;
    private static final int STATE_SECOND = 2;

    private static final int THREAD_STATE_NORMAL = 0;
    private static final int THREAD_STATE_PARK = 1;
    private static final int THREAD_STATE_WAKE_UP = 2;

    private final AtomicStampedReference<Payload<T>> reference = new AtomicStampedReference<>(null, STATE_EMPTY);

    public T exchange(T value) {
        final Payload<T> payload = new Payload<>(value, Thread.currentThread());
        int[] stateHolder = new int[1];
        Payload<T> p;
        while (true) {
            p = reference.get(stateHolder);
            if (stateHolder[0] == STATE_EMPTY && reference.compareAndSet(null, payload, STATE_EMPTY, STATE_FIRST)) {
                LockSupport.park(this);
                // assert reference.getStamp() == STATE_SECOND;
                p = reference.getReference();
                reference.set(null, STATE_EMPTY); // EXCHANGED -> EMPTY
                return p.value;
            }
            if (stateHolder[0] == STATE_FIRST && reference.compareAndSet(p, payload, STATE_FIRST, STATE_SECOND)) {
                LockSupport.unpark(p.thread);
                return p.value;
            }
        }
    }

    public T exchange(T value, long time, TimeUnit unit) throws TimeoutException, InterruptedException {
        final long startTime = System.nanoTime();
        final long timeout = unit.toNanos(time);
        final Payload<T> payload = new Payload<>(value, Thread.currentThread());
        int[] stateHolder = new int[1];
        Payload<T> p;
        while (true) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            p = reference.get(stateHolder);
            if (stateHolder[0] == STATE_EMPTY && reference.compareAndSet(null, payload, STATE_EMPTY, STATE_FIRST)) {
                payload.park(timeout);
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                p = reference.get(stateHolder);
                if (stateHolder[0] == STATE_SECOND) {
                    reference.set(null, STATE_EMPTY);
                    return p.value;
                }
                // assert reference.getStamp() == STATE_FIRST
                if (reference.compareAndSet(payload, null, STATE_FIRST, STATE_EMPTY)) {
                    throw new TimeoutException();
                }
                // assert reference.getStamp() == STATE_SECOND
                p = reference.getReference();
                reference.set(null, STATE_EMPTY);
                return p.value;
            }
            if (stateHolder[0] == STATE_FIRST && reference.compareAndSet(p, payload, STATE_FIRST, STATE_SECOND)) {
                p.wakeUp();
                return p.value;
            }
            if (System.nanoTime() - startTime > timeout) {
                throw new TimeoutException();
            }
        }
    }

    private static class Payload<T> {
        final T value;
        final Thread thread;
        final AtomicInteger threadState = new AtomicInteger(THREAD_STATE_NORMAL);

        Payload(T value, Thread thread) {
            this.value = value;
            this.thread = thread;
        }

        void park(long nanos) {
            if (threadState.compareAndSet(THREAD_STATE_NORMAL, THREAD_STATE_PARK)) {
                LockSupport.parkNanos(this, nanos);
                if (!threadState.compareAndSet(THREAD_STATE_PARK, THREAD_STATE_WAKE_UP)) {
                    Thread.interrupted();
                }
            }
        }

        void wakeUp() {
            int ts = threadState.get();
            if (ts == THREAD_STATE_NORMAL && threadState.compareAndSet(THREAD_STATE_NORMAL, THREAD_STATE_WAKE_UP)) {
                return;
            }
            if (ts == THREAD_STATE_PARK && threadState.compareAndSet(THREAD_STATE_PARK, THREAD_STATE_WAKE_UP)) {
                thread.interrupt();
            }
        }
    }
}

 

如果你觉得超时流程有点复杂的话,可以看一下没有超时的exchange方法的实现。

线程先尝试CAS EMPTY -> FIRST,如果成功,表示自己是第一个线程,进入等待状态。否则第二次循环,尝试CAS FIRST -> SECOND,获取第一个线程的数据,设置自己的数据,唤醒第一个线程。第一个线程被唤醒后,由于只有第二个线程唤醒它,所以第一个线程知道现在第二个线程的数据已经就绪,获取第二个线程的数据,之后使用SET方法(因为第二个线程不会再CAS,即使有第三个线程,也不会对处于SECOND状态的数据做任何操作),重置状态的数据。

带超时的版本,第一个线程会进入带超时等待状态,醒来的时候(有可能超时)需判断当前状态,如果是SECOND就可以正常返回,否则再次尝试CAS。这里不能直接SET的原因是,可能第二个线程可以在此时出现并CAS。第一个线程超时后如果CAS成功,则抛出超时异常,否则第二个线程已出现,获取数据后通过SET重置状态。

Payload中的park和wakeUp和之前分析FutureTask用的代码ThreadScheduler是一样的,这里不再展开。

注意,《the art of multiprocessor programming》给的Exchanger代码和上面的代码有点不同,等待策略使用的是spin而不是park,虽然使用spin的实现一般都比较简单。

如果你尝试分析类库中的Exchanger,你并不会发现AtomicStampedReference,或者类似AtomicStampedReference的东西。你其实可以考虑一下,假如不用AtomicStampedReference,而是常规的单个变量的CAS,是否可以实现Exchanger。答案是可以的。具体可以有两种方案

  1. 状态变量和数据分离,对状态单独CAS
  2. 不使用状态变量,用一个包装类交换数据

第一个方案,只需要在上面的代码上修改一下即可,参考代码如下

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class Exchanger3<T> {

    private static final int STATE_EMPTY = 0;
    private static final int STATE_FIRST = 1;
    private static final int STATE_SECOND = 2;

    private static final int THREAD_STATE_NORMAL = 0;
    private static final int THREAD_STATE_PARK = 1;
    private static final int THREAD_STATE_WAKE_UP = 2;

    private final AtomicInteger state = new AtomicInteger(STATE_EMPTY);
    private volatile Payload<T> payload = null;

    public T exchange(T value, long time, TimeUnit unit) throws TimeoutException, InterruptedException {
        final long startTime = System.nanoTime();
        final long timeout = unit.toNanos(time);
        final Payload<T> payload = new Payload<>(value, Thread.currentThread());
        int s;
        Payload<T> p;
        boolean first = false;
        while (true) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            s = state.get();
            if (first) {
                if (s == STATE_SECOND) {
                    p = waitForPayload();
                    state.set(STATE_EMPTY);
                    return p.value;
                }
                // assert reference.getStamp() == STATE_FIRST
                if (state.compareAndSet(STATE_FIRST, STATE_EMPTY)) {
                    throw new TimeoutException();
                }
                // assert reference.getStamp() == STATE_SECOND
                p = waitForPayload();
                state.set(STATE_EMPTY);
                return p.value;
            }
            if (s == STATE_EMPTY && state.compareAndSet(STATE_EMPTY, STATE_FIRST)) {
                this.payload = payload;
                first = true;
                payload.park(timeout);
            } else if (s == STATE_FIRST && state.compareAndSet(STATE_FIRST, STATE_SECOND)) {
                p = waitForPayload();
                this.payload = payload;
                p.wakeUp();
                return p.value;
            } else if (System.nanoTime() - startTime > timeout) {
                throw new TimeoutException();
            }
        }
    }

    private Payload<T> waitForPayload() {
        Payload<T> p;
        while ((p = this.payload) == null) {
            Thread.yield();
        }
        return p;
    }

    private static class Payload<T> {
        final T value;
        final Thread thread;
        final AtomicInteger threadState = new AtomicInteger(THREAD_STATE_NORMAL);

        Payload(T value, Thread thread) {
            this.value = value;
            this.thread = thread;
        }

        void park(long nanos) {
            if (threadState.compareAndSet(THREAD_STATE_NORMAL, THREAD_STATE_PARK)) {
                LockSupport.parkNanos(this, nanos);
                if (!threadState.compareAndSet(THREAD_STATE_PARK, THREAD_STATE_WAKE_UP)) {
                    Thread.interrupted();
                }
            }
        }

        void wakeUp() {
            int ts = threadState.get();
            if (ts == THREAD_STATE_NORMAL && threadState.compareAndSet(THREAD_STATE_NORMAL, THREAD_STATE_WAKE_UP)) {
                return;
            }
            if (ts == THREAD_STATE_PARK && threadState.compareAndSet(THREAD_STATE_PARK, THREAD_STATE_WAKE_UP)) {
                thread.interrupt();
            }
        }
    }
}

你可以注意一个方法waitForPayload,这是Exchanger1里面没有的方法。之后第二种方案其实也有类似的等待过程。

第二种方案,所谓的包装类,就是一个类里面既有第一个线程的数据,也有第二个线程的数据。线程首先设置自己包装类实例中的数据,第一个线程CAS一个slot null -> not null,假如成功,进入等待状态。第二个线程CAS not null -> null,此时得到了第一个线程的包装类实例,然后获取第一个类的数据,把自己的值设置进去,最后唤醒第一个线程。

类库中的Exchanger的注释中其实提到了这一过程

for (;;) {
 if (slot is empty) {                       // offer
   place item in a Node;
   if (can CAS slot from empty to node) {
     wait for release;
     return matching item in node;
   }
 }
 else if (can CAS slot from node to empty) { // release
   get the item in node;
   set matching item in node;
   release waiting thread;
 }
 // else retry on CAS failure
}

基于以上方案的代码(借用了类库中Exchanger的一些优化)

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class Exchanger4<T> {

    private static final int THREAD_STATE_NORMAL = 0;
    private static final int THREAD_STATE_PARK = 1;
    private static final int THREAD_STATE_WAKE_UP = 2;

    private static final int SPINS = 1 << 5;

    private final ThreadLocal<Pair<T>> myPair = ThreadLocal.withInitial(Pair::new);
    private final AtomicReference<Pair<T>> slot = new AtomicReference<>(null);

    public T exchange(T value, long time, TimeUnit unit) throws TimeoutException, InterruptedException {
        final long deadline = System.nanoTime() + unit.toNanos(time);
        final Pair<T> myPair = this.myPair.get();
        myPair.item = value;
        Pair<T> p;
        while (true) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            p = slot.get();
            if (p == null && slot.compareAndSet(null, myPair)) { // pair first
                return waitForMatchSpinPark(myPair, deadline);
            } else if (slot.compareAndSet(p, null)) { // pair second
                assert p != null;
                // item will be reset, so read item to local variable before setting match
                T item = p.item;
                p.match = value;
                p.wakeUp();
                return item;
            } else if (System.nanoTime() > deadline) {
                myPair.item = null;
                throw new TimeoutException();
            }
        }
    }

    private T waitForMatchPark(Pair<T> myPair, long deadline) throws TimeoutException, InterruptedException {
        myPair.parkUntil(deadline);
        T m = myPair.match;
        if (m != null) {
            myPair.reset();
            return m;
        }
        if (slot.compareAndSet(myPair, null)) {
            myPair.reset();
            if (Thread.interrupted()) {
                throw new InterruptedException();
            } else {
                throw new TimeoutException();
            }
        }
        m = myPair.match;
        myPair.reset();
        return m;
    }

    private T waitForMatchSpinPark(Pair<T> myPair, long deadline) throws TimeoutException, InterruptedException {
        int spins = SPINS;
        T m;
        long nanos;
        while (true) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            m = myPair.match;
            if (m != null) {
                myPair.reset();
                return m;
            }
            spins--;
            nanos = deadline - System.nanoTime();
            if (slot.get() != myPair) {
                // slot changed, match will come
                spins = SPINS;
            } else if (spins < 0 && nanos > 0) { // spin -> park
                myPair.parkUntil(deadline);
            } else if (nanos < 0 && slot.compareAndSet(myPair, null)) { // timeout
                myPair.reset();
                throw new TimeoutException();
            }
        }
    }

    private static final class Pair<T> {
        T item = null;
        volatile T match = null;

        final Thread thread;
        final AtomicInteger threadState = new AtomicInteger(THREAD_STATE_NORMAL);

        Pair() {
            this.thread = Thread.currentThread();
        }

        void reset() {
            item = null;
            match = null; // lazy set is also ok
            int ts = threadState.get();
            if (ts != THREAD_STATE_NORMAL) {
                threadState.set(THREAD_STATE_NORMAL); // lazy set is also ok
            }
        }

        void parkUntil(long deadline) {
            long nanos = deadline - System.nanoTime();
            if (nanos <= 0) {
                return;
            }
            if (threadState.compareAndSet(THREAD_STATE_NORMAL, THREAD_STATE_PARK)) {
                LockSupport.parkNanos(this, nanos);
                if (!threadState.compareAndSet(THREAD_STATE_PARK, THREAD_STATE_WAKE_UP)) {
                    Thread.interrupted();
                }
            }
        }

        void wakeUp() {
            int ts = threadState.get();
            if (ts == THREAD_STATE_NORMAL && threadState.compareAndSet(THREAD_STATE_NORMAL, THREAD_STATE_WAKE_UP)) {
                return;
            }
            if (ts == THREAD_STATE_PARK && threadState.compareAndSet(THREAD_STATE_PARK, THREAD_STATE_WAKE_UP)) {
                thread.interrupt();
            }
        }
    }
}

如果你理解了使用AtomicStampedReference的方案的话,相信上述代码也不会太难。

讲几个细节。

第一个是为什么使用threadlocal,或者说为什么可以使用threadlocal。一般来说,实例的生命周期完全可控,或者只有固定一个线程使用的话,可以使用threadlocal,就像CLH Lock那样。这里,第一个线程把自己的包装类Pair的实例CAS设置到slot中之后,虽然第二个线程会访问到这个实例,但是第二个线程并不拥有这个实例,而且设置了match之后,控制权就交还给第一个线程,所以第一个线程对这个Pair的实例有控制权,实际运行中不会有ABA问题,即使碰到超时场景。类库中的Exchanger其实也使用了threadlocal,有兴趣的人可以看一下。

第二个是包装类Pair里面item为什么没有volatile修饰。Pair这个类中item是第一个线程的数据,match是第二个线程的数据。理论上有多个线程访问的数据应该设置为volatile,即使是两线程,一方写一方读的情况。这里的原因在于item在被第二个线程访问之前,有一次CAS。基本所有的CPU在CAS时会同时执行memory barrier,加上item的写在CAS之前,构成了一个有序关系

thread 1: write item
thread 1: CAS slot from null to my pair
thread 2: CAS slot from some pair to null
thread 2: read item

上述步骤因为有CAS在中间,执行顺序无法改变,所以可以保证item被第二个线程读到。如果还是比较难理解的话,可以考虑一个没有volatile的变量item通过lock控制访问的场景

thread 1
lock
write item
unlock

thread 2
lock
read item
unlock

用Java的happens-before来讲就是,因为unlock发生在lock前,所以write item肯定发生在read item之前。虽然有点不准确,但是大致就是这种意思。

第三个是等待的代码waitForMatchSpinPark。类库中的Exchanger实际用的是spin约2048次,yield两次,最后park的方式,这里简化为spin+park。理论上来说,可以只等待match被设置为非null(注意到一个细节没有,因为条件是非null,所以Exchanger中使用的值不可以为null。然而类库中的Exchanger并没有提到不可以null,那是因为Exchanger使用了特殊的值NULL_ITEM代替),但是考虑到第二个线程需要先一次CAS,然后再设置match的值,所以spin过程中发现当前slot被修改了,往往是被设置为null了,那么第二个线程已经出现,此时即使spins为0也应该继续spin,而不是park。所以代码中会重置spins再次通过spin等待match的值。这是相对难理解的一个地方。

代码的演化基本到这里就结束了,看过Exchanger代码的人可以发现上述代码和Exchanger的slotExchange很像。但是Exchanger还有一个arenaExchange方法,与slotExchange不同的地方是会通过一个数组允许超过两个的线程相互交换数据。个人在分析了arenaExchange之后,觉得代码本身可以运作,但是太过复杂。个人认为大部分原因可能是作者认为inline代码可以提高效率,没有使用LockSupport等现有类,而是直接全用unsafe实现,以至于让我有一种错觉,1.5之后这代码就没改过。但是又有1.8开始的Contended标注。

尽管如此,个人还是分析一些实际代码中的点,对想要学习Exchanger的人一些参考。

第一个是刚才简化掉的spin + yield + park等待策略,代码中有一段xorshift

h ^= h << 1;
h ^= h >>> 3;
h ^= h << 10;

这段其实是用xorshift实现的伪随机数。有兴趣的人可以看下这个链接

https://www.javamex.com/tutorials/random_numbers/xorshift.shtml

注意1, 3, 10不是随便选的,这是论文中给出的可用的32位随机数生成器的可用a, b, c之一。同时xorshift要求输入不能为0,所以代码中会有一段 SPINS | (int) thread.getId()。

因为是伪随机数,所以输出序列是固定的。实际运行中,大约会有50%几率(arenaExchange注释中有提到)得到负数。又由于代码中在随机数是负数时减少spins,所以默认的1024次spins实际会被执行大约2048次,换句话说for循环大约执行2048次后,spins变成0,而不是1024次。同时,代码中会判断spins和 (SPINS >>> 1) – 1的位操作结果来执行yield,注释提到会执行两次。为什么是两次?因为这两次分别是spins为1 << 9和0的时候,只有这两次时位操作结果才是0。

第二个是代码中对于false sharing的处理。这里不准备讲false sharing的概念。只是提一下false sharing的解决方法。就是1.8引入的Contended标注,在类上设置这个标注之后,类的前后会有128字节(是的,字节)的padding。假如你在类上设置了这个标注,那么这个类构成的数组是否也解决了false sharing问题?答案是否定的。这涉及到Java对象的内存布局。Java的数组首先有一个数组头,然后是每个对象object的指针,这个指针在Java 1.8默认启动指针压缩的情况下是4个字节。所以你对数组元素进行CAS的话,还是会碰到false sharing问题,即便你在对象上设置了Contended。记住,Contended只能保证连续的对象内容之间不会有false sharing问题,但是不能处理连续对象指针之间的问题。所以Exchanger中是这样初始化数组的

arena = new Node[(FULL + 2) << ASHIFT]

假如FULL为4(在8核机器上实际运行一下得到的结果,FULL表示数组可用元素的最大长度,可以想象8核机器上8个线程使用同一个Exchanger,那么最多只需要4个空位就可以交换数据),那么会创建长度为768的数组!远远超过预想中的4。原因是之前提到的对象指针长度为4,为了解决false sharing问题,Exchanger创建如下对象结构

数组头
padding 128 bytes
element 0 128 bytes
element 1 128 bytes
element 2 128 bytes
element 3 128 bytes
padding 128 bytes

总共数组元素长度加padding总共为768bytes,这是解决false sharing的最小数组大小。实际768个元素*对象指针大小4个字节得到占用空间为3072bytes,约3KB,满足最小大小(当然3/4是被浪费掉了,如果未开启指针压缩,7/8都是被浪费掉的)。Exchanger实际使用数组元素是类似这样的

U.getObjectVolatile(a, (i << ASHIFT) + ABASE)

ABASE是数组头偏移加上第一个padding,ASHIFT为128,ABASE加上数组元素偏移得到预想中的数据位置,完全可以解决false sharing问题。

第三个是Exchanger中bound这个变量。内容是一个复合变量。低16位是arena的逻辑数组长度,16位之前是版本号。为什么需要一个版本号?原因在于arenaExchange在运行过程中会动态修改逻辑数组长度,而且这个长度会增长也会缩小的,所以单纯对长度进行CAS的话会碰到ABA问题。ABA问题最简单的解决方案是加版本号,所以这里是一个复合变量,带版本号。看一下bound是怎么变化的。

U.compareAndSwapInt(this, BOUND, 0, SEQ)

// shrink
// b = bound
// m = bound & MMASK
if (m != 0) {
  U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1)
}

// grow
U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)

第一行是创建数组时,SEQ为0x100,表示版本号为1,数组长度为0。第二种是缩小,在长度不为0时,增加SEQ,减去长度1。第三种是增加,加SEQ即版本,增加长度1。

相信理解bound是复合变量之后,上面的代码也容易理解了。

最后讲一下Exchanger基于数组的扩大和缩小策略。这是Exchanger核心算法的一部分。Exchanger选择使用threadlocal和数组而不是原来dual data structure的链表。用链表的话,就无法用threadlocal,因为用于交换的节点会被其他线程访问和修改,主要是next,一方面生成周期不可控,另外一方可能会有ABA问题。但是用链表就不需要现在Exchanger里这么复杂的扩大和缩小策略,因为新来的线程在失败时会创建新节点,然后等待其他线程来访。Exchanger在讲到等待match时候优先选择用threadlocal,这是一个tradeoff。扩大和缩小的策略如下:

  • 逻辑数组初始大小为0
  • 当线程无法通过slotExchange交换时开始arenaExchange,这往往是第三个线程
  • 数组逻辑大小为0时,CAS bound长度加1,版本加1。由于数组预先全部创建完成,所以不会有物理扩大,只是逻辑上的
  • 在数组上进行类似slotExchange的操作
  • 如果有更多的线程进来,会扩大逻辑大小直到最大值FULL
  • 如果线程等不到配对线程,会到当前数组索引号/2的位置继续尝试交换,同时缩小数组逻辑大小(减1),版本加1;如果当前索引号时0,则会park
  • arenaExchange一旦被开启,之后所有请求都不会进入slotExchange

按照Exchanger注释的说法,这是一种比较保守的扩大缩小策略。这个策略是否真得那么有用,我个人持保留意见。顺便说一句,《the art of multiprocessor programming》中的EliminationStack在Exchanger上做了一个ElimiationArray,用随机的方式选择交换位置。

以上就是我对Exchanger的分析,老实说,Exchanger和原来的dual data structure还是有比较大的不同的,优化做了很多,但是为了提高效率,把unsafe代码全放进来导致代码异常复杂个人觉得有点得不偿失。另外这里使用的扩大缩小策略相比原来链表的方式是一种不同的选择。