工具类"/>
java中的并发工具类
目录
java中的并发工具类
1、CountDownLatch的使用
2、CyclicBarrier
3、Semaphore
4、Exchanger
java中的并发工具类
序号 | 工具类 | 匹配的业务场景 |
1 | CountDownLatch | 统一服务 |
2 | CyclicBarrier | 统一命令 |
3 | Semaphore | 流量控制 |
4 | Exchanger | 线程之间交换数据 |
如果我们把线程比作人,那么线程之间的协作其实就是人类之间的协作。 而人类之间的协作会有以下几种情况:
1、统一服务:你们都完成一件事情后我再继续完成另一件事情; 典型的例子:几个人一起去吃饭,人齐了哈,服务员上菜!(也有不仗义的,自己去了就先吃,这种不在协作之列)
该场景对应CountDownLatch的使用场景!
2、统一命令:大家等待我的命令后统一行动; 典型的例子:打仗镜头里边,长官一声令下,打!
该场景对应CyclicBarrier的使用场景!
3、商量好:采取行动之前需要商量好,互相交换信息; 典型的例子约会(下班后一起去吃刷羊肉):A: 亲爱的我们在晓月楼门口见,我刚下班。 B:好的,稍等,马上到。
该场景对应Exchanger的使用场景!
4、狼多肉少吃不饱:需要限制获取资源的人数; 典型的例子就是北京摇车牌,数据库连接池中限制数据库的连接的个数等。
该场景对应Semaphore的使用场景!
1、CountDownLatch的使用
测试代码
package com.miller.fanxing;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;/** author :振彪(miller)* data :2021-01-10* des: test how to use CountDownLatch* */
public class CountDownLatchTest {//CountDownLatch构造器接收一个int类型的参数作为计数器,如果你想等待n个点完成,这里就传入nprivate static CountDownLatch countDownLatch = new CountDownLatch(6);private static ConcurrentHashMap map = new ConcurrentHashMap<String,Integer>();public static void main(String[] args) throws InterruptedException {new Thread(new Runnable() {@Overridepublic void run() {System.out.println("韩国被灭");countDownLatch.countDown();}}).start();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("魏国被灭");countDownLatch.countDown();}}).start();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("赵国被灭");countDownLatch.countDown();}}).start();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("楚国被灭");countDownLatch.countDown();}}).start();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("燕国被灭");countDownLatch.countDown();}}).start();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("齐国被灭");countDownLatch.countDown();}}).start();countDownLatch.await();System.out.println("六国被灭,天下一统!");}
}
测试结果:
韩国被灭
魏国被灭
赵国被灭
楚国被灭
燕国被灭
齐国被灭
六国被灭,天下一统!
结论:在其它线程执行之前主线程一直等待。六国都灭亡,即其它线程都执行完毕,秦国一统天下,就是主线程开始执行。
2、CyclicBarrier
测试代码
package com.miller.fanxing;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;/** author :冯振彪(miller)* data :2021-01-10* des: test how to use CyclicBarrier* */
public class CyclicBarrierTest {//合众讨伐秦国,6过军队集合于函谷关前,只有6国军队都到齐才能开战private static CyclicBarrier cyclicBarrier = new CyclicBarrier(6);private static ConcurrentHashMap map = new ConcurrentHashMap<String,Integer>();private static StringBuffer taoFanBaoQin = new StringBuffer("");public static void main(String[] args) {new Thread(new Runnable() {@Overridepublic void run() {try {System.out.println("齐国军队到达函谷关");cyclicBarrier.await();taoFanBaoQin.append("齐国会盟,");System.out.println("六国齐聚,对秦开战!" + taoFanBaoQin.toString());} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {map.put("魏国",1);try {System.out.println("魏国军队到达函谷关");cyclicBarrier.await();taoFanBaoQin.append("魏国会盟,");System.out.println("六国齐聚,对秦开战!" + taoFanBaoQin.toString());} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {map.put("韩国",1);try {System.out.println("韩国军队到达函谷关");cyclicBarrier.await();taoFanBaoQin.append("韩国会盟,");System.out.println("六国齐聚,对秦开战!" + taoFanBaoQin.toString());} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {map.put("燕国",1);try {System.out.println("燕国军队到达函谷关");cyclicBarrier.await();taoFanBaoQin.append("燕国会盟,");System.out.println("六国齐聚,对秦开战!" + taoFanBaoQin.toString());} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {map.put("楚国",1);try {System.out.println("楚国军队到达函谷关");cyclicBarrier.await();taoFanBaoQin.append("楚国会盟,");System.out.println("六国齐聚,对秦开战!" + taoFanBaoQin.toString());} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {map.put("赵国",1);try {System.out.println("赵国军队到达函谷关");cyclicBarrier.await();taoFanBaoQin.append("赵国会盟,");System.out.println("六国齐聚,对秦开战!" + taoFanBaoQin.toString());} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}).start();}}
测试结果:
齐国军队到达函谷关
魏国军队到达函谷关
韩国军队到达函谷关
燕国军队到达函谷关
楚国军队到达函谷关
赵国军队到达函谷关
六国齐聚,对秦开战!齐国会盟,魏国会盟,
六国齐聚,对秦开战!齐国会盟,魏国会盟,韩国会盟,
六国齐聚,对秦开战!齐国会盟,魏国会盟,韩国会盟,燕国会盟,
六国齐聚,对秦开战!齐国会盟,魏国会盟,
六国齐聚,对秦开战!齐国会盟,魏国会盟,韩国会盟,燕国会盟,楚国会盟,
六国齐聚,对秦开战!齐国会盟,魏国会盟,韩国会盟,燕国会盟,楚国会盟,赵国会盟,
结论:六国军队都到达函谷关之前没有举行会盟。 线程运行至
cyclicBarrier.await()方法时会等待其它线程执行。
3、Semaphore
测试代码:
package com.miller.fanxing;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;/** author :冯振彪(miller)* data :2021-01-10* des: test how to use Semaphore* */
public class SemaphoreTest {private static final int THREAD_COUNT = 30;//30个线程参与竞争:30个诸侯觐见周天子private static ExecutorService threadPool = Executors.newFixedThreadPool(30);//周天子每天只能见10个诸侯private static Semaphore s = new Semaphore(10); //流量控制private static AtomicInteger j = new AtomicInteger();public static void main(String[] args) {for (int i = 0; i < THREAD_COUNT ; i ++) {threadPool.execute(new Runnable() {@Overridepublic void run() {try {s.acquire();System.out.println("第" + j +"个诸侯觐见周天子!");j.addAndGet(1);//s.release(); //故意不释放,第二天才能见其它10诸侯,第三天才能见最后10个诸侯} catch (InterruptedException e) {e.printStackTrace();}}});}threadPool.shutdown();}
}
测试结果:
第0个诸侯觐见周天子!
第1个诸侯觐见周天子!
第2个诸侯觐见周天子!
第3个诸侯觐见周天子!
第4个诸侯觐见周天子!
第5个诸侯觐见周天子!
第6个诸侯觐见周天子!
第7个诸侯觐见周天子!
第8个诸侯觐见周天子!
第9个诸侯觐见周天子!
结论: 多个使用者竞争有限个资源时做流量控制非常好!
4、Exchanger
测试代码
package com.miller.fanxing;import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*
* author :振彪(miller)
* data :2021-01-10
* despetion: test how to use exchanger
* */
public class ExchangerTest {private static final Exchanger<String> exchanger = new Exchanger<String>();private static final ExecutorService threadPool = Executors.newFixedThreadPool(2);public static void main(String[] args) {threadPool.execute(new Runnable() {@Overridepublic void run() {try {String QuLing = "求贤令";String B = exchanger.exchange(QuLing);System.out.println("QuLing:" + QuLing);System.out.println("B:" + B);} catch (InterruptedException e) {e.printStackTrace();}}});threadPool.execute(new Runnable() {@Overridepublic void run() {try {String weiYang = "商鞅变法";String A = exchanger.exchange(weiYang);System.out.println("weiYang:" + weiYang);System.out.println("A:" + A);} catch (InterruptedException e) {e.printStackTrace();}}});threadPool.shutdown();}
}测试结果:
QuLing:求贤令
B:商鞅变法
weiYang:商鞅变法
A:求贤令
结论:Exchanger可以在线程之间传递消息!
更多推荐
java中的并发工具类
发布评论