Java多线程同步工具类:Semaphore、CountDownLatch 、CyclicBarrier、Exchanger原理剖析

编程入门 行业动态 更新时间:2024-10-10 05:16:46

Java<a href=https://www.elefans.com/category/jswz/34/1767532.html style=多线程同步工具类:Semaphore、CountDownLatch 、CyclicBarrier、Exchanger原理剖析"/>

Java多线程同步工具类:Semaphore、CountDownLatch 、CyclicBarrier、Exchanger原理剖析

Java多线程同步工具类:Semaphore、CountDownLatch 、CyclicBarrier、Exchanger原理剖析

文章目录

  • Java多线程同步工具类:Semaphore、CountDownLatch 、CyclicBarrier、Exchanger原理剖析
  • 一、Semaphore
    • 原理剖析
    • 实战案例
  • 二、CountDownLatch
    • 原理剖析
      • await()实现分析
      • countDown()实现分析
    • 总结
    • 实战案例
  • 三、CyclicBarrier
    • 原理剖析
    • 实战案例
  • 四、Exchanger
    • 原理剖析
      • exchange(V x)实现分析
      • slotExchange的实现
      • arenaExchange的实现
    • 实战案例

前驱知识准备:AbstractQueuedSynchronizer队列同步器

[Java多线程之:队列同步器AbstractQueuedSynchronizer原理剖析]

一、Semaphore

原理剖析

Semaphore也就是信号量,提供了资源数量的并发访问控制,可以用于限制访问某些资源(物理或逻辑的)的线程数目。Semaphore是AQS队列同步器中对共享锁的子类实现。Semaphore维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程获取许可证就调用acquire方法,用完了释放资源就调用release方法。

Semaphore核心代码功能使用如下所示:

// ⼀开始有5份共享资源。第⼆个参数表示是否是公平
Semaphore myResources = new Semaphore(5, true);// 工作线程每获取⼀份资源,就在该对象上记下来
// 在获取的时候是按照公平的方式还是非公平的方式,就要看上⼀行代码的第二个参数了。
// ⼀般⾮公平抢占效率较高。
myResources.acquire();// 工作线程每归还⼀份资源,就在该对象上记下来
// 此时资源可以被其他线程使⽤
myResources.release();/*
释放指定数目的许可,并将它们归还给信标。
可用许可数加上该指定数目。
如果线程需要获取N个许可,在有N个许可可用之前,该线程阻塞。
如果线程获取了N个许可,还有可用的许可,则依次将这些许可赋予等待获取许可的其他线程。
*/
semaphore.release(2);/*
从信标获取指定数⽬的许可。如果可用许可数目不够,则线程阻塞,直到被中断。该⽅法效果与循环相同,
for (int i = 0; i < permits; i++) acquire();
只不过该方法是原⼦操作。如果可用许可数不够,则当前线程阻塞,直到:(⼆选⼀)
1. 如果其他线程释放了许可,并且可用的许可数满足当前线程的请求数字;
2. 其他线程中断了当前线程。permits – 要获取的许可数
*/
semaphore.acquire(3);

Semaphore在争抢资源时的示意图如下图所示,假设有n个线程来获取Semaphore⾥⾯的10份资源(n > 10), n个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ro65uLPv-1670164010648)(E:\笔记\截图\SemaPhore争抢资源示意图.png)]

当初始的资源个数为1的时候, Semaphore退化为排他锁。正因为如此, Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分

public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
public void release() {sync.releaseShared(1);
}

SemaPhore也是使用了队列同步器AbstractQueuedSynchronizer来实现多线程间的同步操作,abstract static class Sync extends AbstractQueuedSynchronizer

acquire()方法通过判断剩余资源数和线程所需资源数的差值是否小于0,如果小于0,则当前线程进行阻塞,如果大于0,对state变量(state变量在AQS中表示同步状态)进行CAS减操作,减到0之后,线程阻塞。在release里对state变量进行CAS加操作。

public class Semaphore {protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}static final class FairSync extends Sync {// ...FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();// 判断剩余资源数,如果<0,说明资源数不够了,获取资源失败int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining))return remaining;}}}
}/*** Acquires in shared interruptible mode.* @param arg the acquire argument*/private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// addWaiter将当前线程封装成共享节点,放在等待队列尾部final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {// 当前线程前驱节点final Node p = node.predecessor();if (p == head) {// 尝试获取args数量的资源数,如果资源数不够,则返回<0int r = tryAcquireShared(arg);if (r >= 0) {// 向后传播唤醒当前节点后的节点setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

关于该方法的更多详细原理剖析,可以看AQS介绍的相关博客:[Java多线程之:队列同步器AbstractQueuedSynchronizer原理剖析](=1001.2014.3001.5502)

实战案例

下面给出一个案例来测试Semaphore的使用:

case: 自习室抢座,写作业:

抢座位的线程:

import java.util.Random;
import java.util.concurrent.Semaphore;public class MyThread extends Thread {private final Semaphore semaphore;private final Random random = new Random();public MyThread(String name, Semaphore semaphore) {super(name);this.semaphore = semaphore;}@Overridepublic void run() {try {semaphore.acquire();System.out.println(Thread.currentThread().getName() + " - 抢座成功,开始写作业");Thread.sleep(random.nextInt(1000));System.out.println(Thread.currentThread().getName() + " - 作业完成,腾出座位");} catch (InterruptedException e) {e.printStackTrace();}semaphore.release();}
}

主方法:

import java.util.concurrent.Semaphore;
public class Demo {public static void main(String[] args) throws InterruptedException {Semaphore semaphore = new Semaphore(2);for (int i = 0; i < 5; i++) {new MyThread("学⽣-" + (i + 1), semaphore).start();}}
}

上面主方法中,调用new Semaphore(2)定义了2份共享资源,或者两份许可证,也就是同一时刻最多只允许2个线程执行。输出结果:

学⽣-1 - 抢座成功,开始写作业
学⽣-2 - 抢座成功,开始写作业
学⽣-2 - 作业完成,腾出座位
学⽣-3 - 抢座成功,开始写作业
学⽣-1 - 作业完成,腾出座位
学⽣-4 - 抢座成功,开始写作业
学⽣-3 - 作业完成,腾出座位
学⽣-5 - 抢座成功,开始写作业
学⽣-5 - 作业完成,腾出座位
学⽣-4 - 作业完成,腾出座位

二、CountDownLatch

原理剖析

CountDownLoatch类常被用于计数递减的多线程同步中。在构造CountDownLatch的时候需要传入一个整数n,在这个整数“倒数”到0之前,主线程需要等待,而这个“倒数”过程则是由各个执行线程驱动的,每个线程执行完一个任务“倒数”一次。

下图为CountDownLatch相关类的继承层次, CountDownLatch原理和Semaphore原理类似,同样是基于AQS,不过没有公平和非公平之分。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ThWA3Ee0-1670164010650)(E:\笔记\截图\CountDownLatch1.png)]

await()实现分析

public void await() throws InterruptedException {// AQS的模板⽅法sync.acquireSharedInterruptibly(1);
}public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 被CountDownLatch.Sync实现if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}

从tryAcquireShared(…)⽅法的实现来看,只要state != 0,调⽤await()方法的线程便会被放⼊AQS的阻塞队列,进⼊阻塞状态。

countDown()实现分析

public void countDown() {sync.releaseShared(1);
}// AQS的模板⽅法
public final boolean releaseShared(int arg) {// 由CountDownLatch.Sync实现if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c - 1;if (compareAndSetState(c, nextc))return nextc == 0;}
}

countDown()调用的AQS的模板方法releaseShared(),里面的tryReleaseShared()由CountDownLatch.Sync实现。从上面的代码可以看出,只有state=0, tryReleaseShared()才会返回true,然后执行doReleaseShared(…),⼀次性唤醒队列中所有阻塞的线程。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5dDPy7p4-1670164010650)(E:\笔记\截图\CountDownLatch2.png)]

总结

由于是基于AQS阻塞队列来实现的,所以可以让多个线程都阻塞在state=0条件上,通过countDown()⼀直减state,减到0后⼀次性唤醒所有线程。如上图所示,假设初始总数为M, N个线程await(), M个线程countDown(),减到0之后, N个线程被唤醒。

实战案例

假设⼀个主线程要等待5个 Worker 线程执行完才能退出,可以使用CountDownLatch来实现:

import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class MyThread extends Thread {private final CountDownLatch latch;private final Random random = new Random();public MyThread(String name, CountDownLatch latch) {super(name);this.latch = latch;}@Overridepublic void run() {try {Thread.sleep(random.nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "运⾏结束");latch.countDown();}
}

主方法:

import java.util.concurrent.CountDownLatch;
public class Main {public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(5);new MyThread("线程1", latch).start();new MyThread("线程2", latch).start();new MyThread("线程3", latch).start();new MyThread("线程4", latch).start();// new MyThread("线程5", latch).start();// 主线程等待,直到计数为0latch.await();System.out.println("程序运⾏结束");}
}

三、CyclicBarrier

原理剖析

CyclicBarrier使用方式比较简单:

CyclicBarrier barrier = new CyclicBarrier(5);
barrier.await();

该类用于协调多个线程同步执行操作的场合。

CyclicBarrier基于ReentrantLock+Condition实现 。

public class CyclicBarrier {private final ReentrantLock lock = new ReentrantLock();// ⽤于线程之间相互唤醒private final Condition trip = lock.newCondition();// 线程总数private final int parties;private int count;private Generation generation = new Generation();// ...
}

下面详细介绍 CyclicBarrier 的实现原理。先看构造方法:

public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();// 参与⽅数量this.parties = parties;this.count = parties;// 当所有线程被唤醒时,执⾏barrierCommand表示的Runnable。this.barrierCommand = barrierAction;
}

接下来看⼀下await()方法的实现过程。

    public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}/*** Main barrier code, covering the various policies.*/private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;if (g.broken)throw new BrokenBarrierException();// 响应中断if (Thread.interrupted()) {// 唤醒所有阻塞的线程breakBarrier();throw new InterruptedException();}// 每个线程调用一次await()int index = --count;// 当count减成0的时候,此线程唤醒其他所有线程if (index == 0) {  // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {try {if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}private void nextGeneration() {// signal completion of last generationtrip.signalAll();// set up next generationcount = parties;generation = new Generation();}

关于上面的方法,有几点说明:

  1. **CyclicBarrier是可以被重用的。**比如对于10个线程,这10个线程互相等待,到齐后⼀起被唤醒,各自执行接下来的逻辑。然后,这10个线程继续互相等待,到齐后再⼀起被唤醒。每⼀轮被称为⼀个Generation,就是⼀次同步点。
  2. CyclicBarrier 会响应中断。 10 个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上⾯的breakBarrier()方法。然后count被重置为初始值(parties),重新开始。
  3. 上面的回调方法, barrierAction只会被第10个线程执行1次(在唤醒其他9个线程之前),而不是10个线程每个都执行1次。

实战案例

使用场景: 10个工程师⼀起来公司应聘,招聘方式分为笔试和面试。首先,要等⼈到齐后,开始笔试。笔试结束之后,再⼀起参加面试。把10个⼈看作10个线程, 10个线程之间的同步过程如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vsxIf3Nv-1670164010650)(E:\笔记\截图\CyclicBarrier.png)]

Main类:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class Main {public static void main(String[] args) throws BrokenBarrierException,InterruptedException {CyclicBarrier barrier = new CyclicBarrier(5);for (int i = 0; i < 5; i++) {new MyThread("线程-" + (i + 1), barrier).start();}}
}

MyThread类:

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class MyThread extends Thread {private final CyclicBarrier barrier;private final Random random = new Random();public MyThread(String name, CyclicBarrier barrier) {super(name);this.barrier = barrier;}@Overridepublic void run() {try {Thread.sleep(random.nextInt(2000));System.out.println(Thread.currentThread().getName() + " - 已经到达公司");barrier.await();Thread.sleep(random.nextInt(2000));System.out.println(Thread.currentThread().getName() + " - 已经笔试结束");barrier.await();Thread.sleep(random.nextInt(2000));System.out.println(Thread.currentThread().getName() + " - 已经⾯试结束");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}super.run();}
}

在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再⼀起开始笔试;第2个同步点,要等所有应聘者都结束笔试,之后⼀起进⼊面试环节。

输出结果:

线程-2 - 已经到达公司
线程-1 - 已经到达公司
线程-5 - 已经到达公司
线程-3 - 已经到达公司
线程-4 - 已经到达公司
线程-3 - 已经笔试结束
线程-2 - 已经笔试结束
线程-1 - 已经笔试结束
线程-4 - 已经笔试结束
线程-5 - 已经笔试结束
线程-1 - 已经面试结束
线程-4 - 已经面试结束
线程-2 - 已经面试结束
线程-5 - 已经面试结束
线程-3 - 已经面试结束

可以看到,每次5个线程结束后,才会执行barrier.await();下面的代码。

四、Exchanger

原理剖析

Exchanger用于于线程之间交换数据,其核心机制和Lock⼀样,也是CAS+park/unpark。

首先,在Exchanger内部,有两个内部类: Participant和Node,代码如下:

public class Exchanger<V> {// ...// 添加了Contended注解,表示伪共享与缓存⾏填充@jdk.internal.vm.annotation.Contended static final class Node {int index; // Arena indexint bound; // Last recorded value of Exchanger.boundint collides; // 本次绑定中, CAS操作失败次数int hash; // ⾃旋伪随机Object item; // 本线程要交换的数据volatile Object match; // 对⽅线程交换来的数据// 当前线程volatile Thread parked; // 当前线程阻塞的时候设置该属性,不阻塞为null。}static final class Participant extends ThreadLocal<Node> {public Node initialValue() { return new Node(); }}// ...
}

每个线程在调用exchange(...)方法交换数据的时候,会先创建⼀个Node对象。
这个Node对象就是对该线程的包装,里面包含了3个重要字段:第⼀个是该线程要交互的数据,第⼆个是对方线程交换来的数据,最后⼀个是该线程自身。
⼀个Node只能⽀持2个线程之间交换数据,要实现多个线程并行地交换数据,需要多个Node,因此在Exchanger里面定义了Node数组:

	/*** Elimination array; null until enabled (within slotExchange).* Element accesses use emulation of volatile gets and CAS.*/private volatile Node[] arena;

exchange(V x)实现分析

明白了⼤致思路,下面来看exchange(V x)⽅法的详细实现:

	@SuppressWarnings("unchecked")public V exchange(V x) throws InterruptedException {Object v;Object item = (x == null) ? NULL_ITEM : x; // translate null argsif ((arena != null ||(v = slotExchange(item, false, 0L)) == null) &&((Thread.interrupted() || // disambiguates null return(v = arenaExchange(item, false, 0L)) == null)))throw new InterruptedException();return (v == NULL_ITEM) ? null : (V)v;}

上面方法中

  • 如果arena不是null,表示启⽤了arena方式交换数据。如果arena不是null,并且线程被中断,则抛异常
  • 如果arena不是null,并且arenaExchange的返回值为null,则抛异常。对⽅线程交换来的null值是封装为NULL_ITEM对象的,而不是null。
  • 如果slotExchange的返回值是null,并且线程被中断,则抛异常。
  • 如果slotExchange的返回值是null,并且areaExchange的返回值是null,则抛异常。

slotExchange的实现

public class Exchanger<V> {// .../*** 如果不启⽤arenas,则使⽤该⽅法进⾏线程间数据交换。** @param item 需要交换的数据* @param timed 是否是计时等待, true表示是计时等待* @param ns 如果是计时等待,该值表示最⼤等待的时⻓。* @return 对⽅线程交换来的数据;如果等待超时或线程中断,或者启⽤了arena,则返回null。*/private final Object slotExchange(Object item, boolean timed, long ns) {// participant在初始化的时候设置初始值为new Node()// 获取本线程要交换的数据节点Node p = participant.get();// 获取当前线程Thread t = Thread.currentThread();// 如果线程被中断,则返回null。if (t.isInterrupted())return null;for (Node q;;) {// 如果slot⾮空,表明有其他线程在等待该线程交换数据if ((q = slot) != null) {// CAS操作,将当前线程的slot由slot设置为null// 如果操作成功,则执⾏if中的语句if (SLOTpareAndSet(this, q, null)) {// 获取对⽅线程交换来的数据Object v = q.item;// 设置要交换的数据q.match = item;// 获取q中阻塞的线程对象Thread w = q.parked;if (w != null)// 如果对⽅阻塞的线程⾮空,则唤醒阻塞的线程LockSupport.unpark(w);return v;}// create arena on contention, but continue until slot null// 创建arena⽤于处理多个线程需要交换数据的场合,防⽌slot冲突if (NCPU > 1 && bound == 0 &&BOUNDpareAndSet(this, 0, SEQ)) {arena = new Node[(FULL + 2) << ASHIFT];}}// 如果arena不是null,需要调⽤者调⽤arenaExchange⽅法接着获取对⽅线程交换来的数据else if (arena != null)return null;else {// 如果slot为null,表示对⽅没有线程等待该线程交换数据// 设置要交换的本⽅数据p.item = item;// 设置当前线程要交换的数据到slot// CAS操作,如果设置失败,则进⼊下⼀轮for循环if (SLOTpareAndSet(this, null, p))break;p.item = null;}}// 没有对⽅线程等待交换数据,将当前线程要交换的数据放到slot中,是⼀个Node对象// 然后阻塞,等待唤醒int h = p.hash;// 如果是计时等待交换,则计算超时时间;否则设置为0。long end = timed ? System.nanoTime() + ns : 0L;// 如果CPU核⼼数⼤于1,则使⽤SPINS数,⾃旋;否则为1,没必要⾃旋。int spins = (NCPU > 1) ? SPINS : 1;// 记录对⽅线程交换来的数据Object v;// 如果p.match==null,表示还没有线程交换来数据while ((v = p.match) == null) {// 如果⾃旋次数⼤于0,计算hash随机if (spins > 0) {// 生成随机数,⽤于⾃旋次数控制h ^= h << 1; h ^= h >>> 3; h ^= h << 10;if (h == 0)h = SPINS | (int)t.getId();else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)Thread.yield();// p是ThreadLocal记录的当前线程的Node。// 如果slot不是p表示slot是别的线程放进去的} else if (slot != p) {spins = SPINS;} else if (!t.isInterrupted() && arena == null &&(!timed || (ns = end - System.nanoTime()) > 0L)) {p.parked = t;if (slot == p) {if (ns == 0L)// 阻塞当前线程LockSupport.park(this);else// 如果是计时等待,则阻塞当前线程指定时间LockSupport.parkNanos(this, ns);}p.parked = null;} else if (SLOTpareAndSet(this, p, null)) {// 没有被中断但是超时了,返回TIMED_OUT,否则返回nullv = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;break;}}// match设置为null值 CASMATCH.setRelease(p, null);p.item = null;p.hash = h;// 返回获取的对⽅线程交换来的数据return v;}// ...
}

arenaExchange的实现

public class Exchanger<V> {// .../*** 当启⽤arenas的时候,使⽤该⽅法进⾏线程间的数据交换。** @param item 本线程要交换的⾮null数据。* @param timed 如果需要计时等待,则设置为true。* @param ns 表示计时等待的最⼤时⻓。* @return 对⽅线程交换来的数据。如果线程被中断,或者等待超时,则返回null。*/private final Object arenaExchange(Object item, boolean timed, long ns) {Node[] a = arena;int alen = a.length;Node p = participant.get();// 访问下标为i处的slot数据for (int i = p.index;;) { // access slot at iint b, m, c;int j = (i << ASHIFT) + ((1 << ASHIFT) - 1);if (j < 0 || j >= alen)j = alen - 1;// 取出arena数组的第j个Node元素Node q = (Node)AA.getAcquire(a, j);// 如果q不是null,则将数组的第j个元素由q设置为nullif (q != null && AApareAndSet(a, j, q, null)) {// 获取对⽅线程交换来的数据Object v = q.item; // release// 设置本⽅线程交换的数据q.match = item;// 获取对⽅线程对象Thread w = q.parked;if (w != null)// 如果对⽅线程⾮空,则唤醒对⽅线程LockSupport.unpark(w);return v;}// 如果⾃旋次数没达到边界,且q为nullelse if (i <= (m = (b = bound) & MMASK) && q == null) {// 提供本⽅数据p.item = item; // offer// 将arena的第j个元素由null设置为pif (AApareAndSet(a, j, null, p)) {long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;Thread t = Thread.currentThread(); // wait// ⾃旋等待for (int h = p.hash, spins = SPINS;;) {// 获取对⽅交换来的数据Object v = p.match;// 如果对⽅交换来的数据⾮空if (v != null) {// 将p设置为null, CAS操作MATCH.setRelease(p, null);// 清空p.item = null; // clear for next usep.hash = h;// 返回交换来的数据return v;}// 产⽣随机数,⽤于限制⾃旋次数else if (spins > 0) {h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshiftif (h == 0) // initialize hashh = SPINS | (int)t.getId();else if (h < 0 && // approx 50% true(--spins & ((SPINS >>> 1) - 1)) == 0)Thread.yield(); // two yields per wait}// 如果arena的第j个元素不是pelse if (AA.getAcquire(a, j) != p)spins = SPINS; // releaser hasn't set match yetelse if (!t.isInterrupted() && m == 0 &&(!timed ||(ns = end - System.nanoTime()) > 0L)) {p.parked = t; // minimize windowif (AA.getAcquire(a, j) == p) {if (ns == 0L)// 当前线程阻塞,等待交换数据LockSupport.park(this);elseLockSupport.parkNanos(this, ns);}p.parked = null;}// arena的第j个元素是p并且CAS设置arena的第j个元素由p设置为null成功else if (AA.getAcquire(a, j) == p &&AApareAndSet(a, j, p, null)) {if (m != 0) // try to shrinkBOUNDpareAndSet(this, b, b + SEQ - 1);p.item = null;p.hash = h;i = p.index >>>= 1; // descend// 如果线程被中断,则返回null值if (Thread.interrupted())return null;if (timed && m == 0 && ns <= 0L)// 如果超时,返回TIMED_OUT。return TIMED_OUT;break; // expired; restart}}}elsep.item = null; // clear offer}//else {if (p.bound != b) { // stale; resetp.bound = b;p.collides = 0;i = (i != m || m == 0) ? m : m - 1;}else if ((c = p.collides) < m || m == FULL ||!BOUNDpareAndSet(this, b, b + SEQ + 1)) {p.collides = c + 1;i = (i == 0) ? m : i - 1; // cyclically traverse}elsei = m + 1; // growp.index = i;}}}// ...
}

实战案例

Exchanger用于于线程之间交换数据,其使用代码很简单,是⼀个exchange(…)方法,使用示例如下:

import java.util.Random;
import java.util.concurrent.Exchanger;public class ExchangerTest {private static final Random random = new Random();public static void main(String[] args) {// 建⼀个多线程共⽤的exchange对象// 把exchange对象传给3个线程对象。每个线程在自己的run⽅法中调⽤exchange,把自己的数据作为参数// 传递进去,返回值是另外⼀个线程调⽤exchange传进去的参数Exchanger<String> exchanger = new Exchanger<>();new Thread("线程1") {@Overridepublic void run() {while (true) {try {// 如果没有其他线程调⽤exchange,线程阻塞,直到有其他线程调⽤exchange为⽌。String otherData = exchanger.exchange("交换数据1");System.out.println(Thread.currentThread().getName() + "得到 <==" + otherData);Thread.sleep(random.nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}}}}.start();new Thread("线程2") {@Overridepublic void run() {while (true) {try {String otherData = exchanger.exchange("交换数据2");System.out.println(Thread.currentThread().getName() + "得到 <==" + otherData);Thread.sleep(random.nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}}}}.start();new Thread("线程3") {@Overridepublic void run() {while (true) {try {String otherData = exchanger.exchange("交换数据3");System.out.println(Thread.currentThread().getName() + "得到 <==" + otherData);Thread.sleep(random.nextInt(2000));} catch (InterruptedException e) {e.printStackTrace();}}}}.start();}
}

输出结果:

线程2得到 <==交换数据1
线程1得到 <==交换数据2
线程2得到 <==交换数据3
线程3得到 <==交换数据2
线程1得到 <==交换数据2
线程2得到 <==交换数据1
线程3得到 <==交换数据1
线程1得到 <==交换数据3
线程2得到 <==交换数据1
线程1得到 <==交换数据2
线程1得到 <==交换数据2
线程2得到 <==交换数据1
...

可以看到,三个线程间彼此交换自己的数据。

更多推荐

Java多线程同步工具类:Semaphore、CountDownLatch 、CyclicBarrier、Exchanger原理剖析

本文发布于:2024-02-06 20:34:41,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1751332.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:多线程   原理   工具   Java   Semaphore

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!