SemaphorerCountDownLatchCyclicBarrie详解

编程入门 行业动态 更新时间:2024-10-09 23:13:48

SemaphorerCountDownLatchCyclicBarrie<a href=https://www.elefans.com/category/jswz/34/1770044.html style=详解"/>

SemaphorerCountDownLatchCyclicBarrie详解

Semaphorer&CountDownLatch&CyclicBarrie详解

Semaphore

Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于 AbstractQueuedSynchronizer实现的。 Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获 取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同 时获取信号量。

// 构造器
public Semaphore(int permits) {sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}// 常用方法
public void acquire() throws InterruptedException  //表示阻塞并获取许可
public boolean tryAcquire() //在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
public void release() //表示释放许可
public int availablePermits() //返回此信号量中当前可用的许可证数
public final int getQueueLength() //返回正在等待获取许可证的线程数
public final boolean hasQueuedThreads() //是否有线程正在等待获取许可证
protected void reducePermits(int reduction) //是否有线程正在等待获取许可证
protected Collection<Thread> getQueuedThreads() //返回所有等待获取许可证的线程集合
  • permits 表示许可证的数量(资源数)

  • fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最 久的线程

代码示例

/*** Semaphore是一个计数信号量,Semaphore经常用于限制获取资源的线程数量*/
public class SemaphoreTest {public static void main(String[] args) {// 声明3个窗口  state:  资源数Semaphore windows = new Semaphore(3);for (int i = 0; i < 5; i++) {new Thread(new Runnable() {@Overridepublic void run() {try {// 占用窗口    加锁windows.acquire();System.out.println(Thread.currentThread().getName() + ": 开始买票");//模拟买票流程Thread.sleep(5000);System.out.println(Thread.currentThread().getName() + ": 购票成功");} catch (InterruptedException e) {e.printStackTrace();} finally {// 释放窗口windows.release();}}}).start();}}
}结果
Thread-0: 开始买票
Thread-2: 开始买票
Thread-1: 开始买票
Thread-2: 购票成功
Thread-0: 购票成功
Thread-1: 购票成功
Thread-4: 开始买票
Thread-3: 开始买票
Thread-4: 购票成功
Thread-3: 购票成功

可以用于做流量控制,特别是公用资源有限的应用场景 限流

// 每次只能通过5个
public class SemaphoneTest2 {/*** 实现一个同时只能处理5个请求的限流器*/private static Semaphore semaphore = new Semaphore(5);/*** 定义一个线程池*/private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 60,TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));/*** 模拟执行方法*/public static void exec() {try {//占用1个资源semaphore.acquire(1);//TODO  模拟业务执行System.out.println("执行exec方法");Thread.sleep(2000);} catch (Exception e) {e.printStackTrace();} finally {//释放一个资源semaphore.release(1);}}public static void main(String[] args) throws InterruptedException {{for (; ; ) {Thread.sleep(100);// 模拟请求以10个/s的速度executor.execute(() -> exec());}}}
}

CountDownLatch

CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。

CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值 (count)由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并 且随后对await方法的调用都会立即返回。这是一个一次性现象 —— count不会被重置。如果 你需要一个重置count的版本,那么请考虑使用CyclicBarrier。

常用方法 
// 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行public void await() throws InterruptedException { };
// 和 await() 类似,若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
// 会将 count 减 1,直至为 0public void countDown() { };

CountDownLatch一般用作多线程倒计时计数器,强制它们等待其他一组(CountDownLatch 的初始化决定)任务执行完成。

CountDownLatch应用场景:

  • 场景1:让多个线程等待
  • 场景1:让多个线程等待

场景1 让多个线程等待:模拟并发,让并发线程一起执行

/*** 让多个线程等待:模拟并发,让并发线程一起执行*/
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 5; i++) {new Thread(() -> {try {//准备完毕……运动员都阻塞在这,等待号令countDownLatch.await();String parter = "【" + Thread.currentThread().getName() + "】";System.out.println(parter + "开始执行……");} catch (InterruptedException e) {e.printStackTrace();}}).start();}Thread.sleep(2000);// 裁判准备发令countDownLatch.countDown();// 发令枪:执行发令}
}

场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并

很多时候,我们的并发任务,存在前后依赖关系;比如数据详情页需要同时调用多个接口 获取数据,并发请求获取到数据后、需要进行结果合并;或者多个数据操作完成后,需要数据 check;这其实都是:在多个线程(任务)完成后,进行汇总合并的场景。

/*** 让单个线程等待:多个线程(任务)完成后,进行汇总合并*/
public class CountDownLatchTest2 {public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 0; i < 5; i++) {final int index = i;new Thread(() -> {try {Thread.sleep(1000 +ThreadLocalRandom.current().nextInt(1000));System.out.println(Thread.currentThread().getName()+ " finish task" + index);countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}).start();}// 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。countDownLatch.await();System.out.println("主线程:在所有任务运行完成后,进行结果汇总");}
}

CountDownLatch实现原理

底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的 count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark阻 塞线程;这一步是由最后一个执行countdown方法的线程执行的。

而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在 await()方法中等待的线程

CountDownLatch与Thread.join的区别

  • CountDownLatch的作用就是允许一个或多个线程等待其他线程完成操作,看起来 有点类似join() 方法,但其提供了比 join() 更加灵活的API。
  • CountDownLatch可以手动控制在n个线程里调用n次countDown()方法使计数器 进行减一操作,也可以在一个线程里调用n次执行减一操作。
  • 而 join() 的实现原理是不停检查join线程是否存活,如果 join 线程存活则让当前线 程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活。

CountDownLatch与CyclicBarrier的区别

CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

  • .CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可 以重置计数器,并让线程们重新执行一次
  • CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、 isBroken(用来知道阻塞的线程是否被中断)等方法
  • CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程
  • CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不 同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执 行。CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
  • CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
  • CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现

CyclicBarrier

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同 时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。

// 构造方法
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {this(parties, null);
}//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
//循环 通过reset()方法可以进行重置
public void reset()

CyclicBarrier应用场景

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。

/*** 栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。*/
public class CyclicBarrierTest2 {//保存每个学生的平均成绩private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();private ExecutorService threadPool= Executors.newFixedThreadPool(3);private CyclicBarrier cb=new CyclicBarrier(3,()->{int result=0;Set<String> set = map.keySet();for(String s:set){result+=map.get(s);}System.out.println("三人平均成绩为:"+(result/3)+"分");});public void count(){for(int i=0;i<3;i++){threadPool.execute(new Runnable(){@Overridepublic void run() {//获取学生平均成绩int score=(int)(Math.random()*40+60);map.put(Thread.currentThread().getName(), score);System.out.println(Thread.currentThread().getName()+"同学的平均成绩为:"+score);try {//执行完运行await(),等待所有学生平均成绩都计算完毕cb.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}});}}public static void main(String[] args) {CyclicBarrierTest2 cb=new CyclicBarrierTest2();cb.count();}
}

利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性,可以支持类似“人满发 车”的场景

@Slf4j
public class CyclicBarrierTest3 {public static void main(String[] args) {AtomicInteger counter = new AtomicInteger();ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 1000, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100),(r) -> new Thread(r, counter.addAndGet(1) + " 号 "),new ThreadPoolExecutor.AbortPolicy());CyclicBarrier cyclicBarrier = new CyclicBarrier(5,() -> System.out.println("裁判:比赛开始~~"));for (int i = 0; i < 10; i++) {threadPoolExecutor.submit(new Runner(cyclicBarrier));}}static class Runner extends Thread{private CyclicBarrier cyclicBarrier;public Runner (CyclicBarrier cyclicBarrier) {this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {try {int sleepMills = ThreadLocalRandom.current().nextInt(1000);Thread.sleep(sleepMills);System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();}catch(BrokenBarrierException e){e.printStackTrace();}}}
}

更多推荐

SemaphorerCountDownLatchCyclicBarrie详解

本文发布于:2024-02-06 11:21:08,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1748697.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:详解   SemaphorerCountDownLatchCyclicBarrie

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!