艺术》"/>
记《Java并发编程的艺术》
本文内容基于《Java并发编程的艺术》,方腾飞、魏鹏、程晓明著。
一、并发编程的挑战
二、Java并发机制的底层实现原理
三、Java内存模型
四、线程应用实例
五、Java中的并发容器和框架
六、Java中的13个原子操作类
七、Java中的并发工具类
八、Java中的线程池
九、Executor框架
一、并发编程的挑战
1. 上下文切换
- 即使单核处理器也支持多线程执行代码,CPU通过给每个线程分配CPU时间片来实现这个机制。时间片是CPU分配给各个线程的时间,因为时间片非常短,所以CPU通过不停地切换线程执行,让我们感觉多个线程是同时执行的,时间片一般是几十毫秒。
- 在切换前会保存上一个任务的状态,以便下次切换回到这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
- 如何减少上下文切换
- 无锁并发编程:多线程竞争锁时,会引起上下文切换,所以多线程处理数据时,可以用一些办法来避免使用锁,如将数据的ID按照Hash算法取模分段,不同的线程处理不同段的数据。
- CAS算法:如Java的Atomic包使用CAS算法来更新数据,而不需要加锁。
- 使用最少线程:避免创建不需要的线程,比如任务很少,但是创建了很多线程来处理,这样会造成大量线程都处于等待状态。
- 协程:在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换。
2. 死锁
- 示例代码
public class DeadLockTest {public static void main(String[] args) {//初始化两个锁Object lock1 = new Object();Object lock2 = new Object();new Thread(new Runnable() {@Overridepublic void run() {synchronized (lock1) {try {System.out.println(Thread.currentThread().getName() + " 获得锁lock1 " + System.currentTimeMillis());Thread.sleep(1000); //do somethingsynchronized (lock2) {System.out.println(Thread.currentThread().getName() + " 获得锁lock2 " + System.currentTimeMillis());Thread.sleep(1000); //do somethingSystem.out.println(Thread.currentThread().getName() + " 释放锁lock2 " + System.currentTimeMillis());}System.out.println(Thread.currentThread().getName() + " 释放锁lock1 " + System.currentTimeMillis());} catch (InterruptedException e) {e.printStackTrace();}}}}, "A").start();new Thread(new Runnable() {@Overridepublic void run() {synchronized (lock2) {try {System.out.println(Thread.currentThread().getName() + " 获得锁lock2 " + System.currentTimeMillis());Thread.sleep(1000); //do somethingsynchronized (lock1) {System.out.println(Thread.currentThread().getName() + " 获得锁lock1 " + System.currentTimeMillis());Thread.sleep(1000); //do somethingSystem.out.println(Thread.currentThread().getName() + " 释放锁lock1 " + System.currentTimeMillis());}System.out.println(Thread.currentThread().getName() + " 释放锁lock2 " + System.currentTimeMillis());} catch (InterruptedException e) {e.printStackTrace();}}}}, "B").start();}
}
- 运行结果
- 一旦出现死锁,只能通过dump线程查看到底是哪个线程出现了问题
- 避免死锁的几个常见方法
- 避免一个线程同时获取多个锁。
- 避免一个线程在锁内同时占用多个资源,尽量保证每个锁都只占用一个资源。
- 尝试使用定时锁,使用lock.tryLock(timeout)来替代使用内部锁机制。
- 对于数据库锁,加锁和解锁必须在一个数据库连接里,否则会出现解锁失败的情况。
3. 资源限制的挑战
- 资源限制是指在进行并发编程时,程序的执行速度受限于计算机硬件资源或软件资源。硬件资源限制有带宽的上传/下载速度、硬盘读写速度和CPU的处理速度。软件资源限制有数据库的连接数和socket连接数等。
- 对于硬件资源限制,可以考虑使用集群并行执行程序。对于软件资源限制,可以考虑使用资源池将资源复用。
二、Java并发机制的底层实现原理
1. volatile的实现原理与应用
- volatile是轻量级的synchronized,它在多处理器开发中保证了共享变量的“可见性”。可见性是指当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。volatile的使用和执行成本更低,因为它不会引起线程上下文的切换和调度。
- 有volatile关键词修饰的共享变量进行写操作的时候会多出第二行汇编代码,即Lock前缀的指令,在多核处理器下会引发两件事情:
- 将当前处理器缓存行的数据写回到系统内存。
- 这个写回内存的操作会使在其他CPU里缓存了该内存地址的数据无效。
2. synchronized的实现原理与应用
- Java中每一个对象都可以作为锁:
- 对于普通同步方法,锁是当前实例对象。
- 对于静态同步方法,锁是当前类的Class对象。
- 对于同步方法块,锁是Synchronized括号配置的对象。
- synchronized在JVM里的实现原理是JVM基于进入和退出Monitor对象来实现方法同步和代码块同步。
- synchronized用的锁是存在Java对象头里的,Java对象头里的Mark Word里默认存储对象的HashCode、分代年龄和锁标记位。
- 锁一共有4种状态,锁可以升级但不能降级,级别从低到高依次是:
锁 | 优点 | 缺点 | 适用场景 |
无锁状态 | - | - | - |
偏向锁状态 | 加锁和解锁不需要额外的消耗,和执行非同步方法相比仅存在纳秒级的差距 | 如果线程间存在锁竞争,会带来额外的锁撤销的消耗 | 适用于只有一个线程访问同步块的场景 |
轻量级锁状态 | 竞争的线程不会阻塞,提高了程序的相应速度 | 如果始终得不到锁,竞争的线程使用自旋会消耗CPU | 追求响应时间,同步块执行速度非常快 |
重量级锁状态 | 线程竞争不使用自旋,不会消耗CPU | 线程阻塞,响应时间缓慢 | 追求吞吐量,同步块执行速度较慢 |
3. 原子操作的实现原理与应用
- 原子本意是“不能被进一步分割的最小粒子”,而原子操作意为“不可被中断的一个或一系列操作”。
- Java中可以通过锁和循环CAS的方式来实现原子操作。
- 锁机制保证了只有获得锁的线程才能够操作锁定的内存区域。
- 使用循环CAS来实现原子操作,自旋CAS实现的基本思路就是循环进行CAS操作直到成功为止。
public class Counter {private AtomicInteger ai = new AtomicInteger(0);private int i = 0;public static void main(String[] args) {final Counter counter = new Counter();//创建线程List<Thread> threadList = new ArrayList<Thread>();for (int i = 0; i < 100; i++) {threadList.add(new Thread(new Runnable() {@Overridepublic void run() {for (int j = 0; j < 10000; j++) {counter.count();counter.safeCount();}}}));}//启动线程for (Thread thread : threadList) {thread.start();}//等待所有线程执行完成for (Thread thread : threadList) {try {thread.join();} catch (InterruptedException e) {e.printStackTrace();}}//输出结果System.out.println(counter.i); //结果不一定,线程不安全System.out.println(counter.ai.get()); //1000000,线程安全}private void safeCount() {/*for (;;) {int i = ai.get();boolean success = aipareAndSet(i, ++i);if (success) {break;}}*/ai.incrementAndGet(); //与上面注释代码效果一样}private void count() {i++;}
}
- CAS实现原子操作的三大问题
- ABA问题:因为CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号,即1A->2B->3A。Java 1.5开始,JDK的Atomic包里提供了一个类AtomicaStampedReference来解决ABA问题。
- 循环时间长开销大:自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
- 只有保证一个共享变量的原子操作:对多个共享变量操作时,循环CAS无法保证操作的原子性,这个时候就可以用锁。
-
CAS即compareAndSet()方法,如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值,此操作具有volatile读和写的内存语义。
三、Java内存模型
1. 基础
- 在并发编程中,需要处理两个关键问题:线程之间如何通信及线程之间如何同步。
- 通信是指线程之间以何种机制来交换信息,线程之间的通信机制有两种:共享内存和消息传递。
- 在共享内存的并发模型中,线程之间共享程序的公共状态,通过写-读内存中的公共状态进行隐式通信。
- 在消息传递的并发模型中,线程之间没有公共状态,线程之间必须通过发现消息来显式进行通信。
- 同步是指程序中用于控制不同线程间操作发生相对顺序的机制。
- 在共享内存的并发模型中,同步是显式进行的,程序员必须显式地指定某个方法或某段代码需要在线程之间互斥执行。
- 在消息传递的并发模型中,由于消息的发送必须在消息的接受之前,因此同步是隐式进行的。
- Java的并发采用的是共享内存模型
- 通信是指线程之间以何种机制来交换信息,线程之间的通信机制有两种:共享内存和消息传递。
- Java内存模型的抽象结构
- 在Java中,所有实例域、静态域和数组元素都存储在堆内存中,堆内存在线程之间共享。局部变量、方法定义参数和异常处理器参数不会在线程之间共享,它们不会有内存可见性问题,也不会受内存模型的影响。
- Java线程之间的通信由Java内存模型(JMM)控制,JMM决定一个线程对共享变量的写入何时对另一个线程可见。从抽象的角度来看,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存中存储了该线程以读/写共享变量的副本。
- JMM通过控制主内存与每个线程的本地内存之间的交互,来为Java程序提供内存可见性保证。
2. 重排序
- 在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序,可能会导致多线程程序出现内存可见性问题。重排序有3种类型:
- 编译器优化的重排序:编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
- 指令级并行的重排序:如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
- 内存系统的重排序:由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。
- 重排序对多线程的影响
- 代码中,编译器和处理器可以对操作1和操作2重排序,也可以对操作3和操作4重排序。所以可能出现这样的执行顺序:2->3->4->1、4->1->2->3
public class ReorderTest {int a = 0;boolean flag = false;public void writer() {a = 1; //1flag = true; //2}public void reader() {if (flag) { //3int i = a * a; //4}}
}
3. 顺序一致性
- JMM对正确同步的多线程程序的内存一致性做了如下保证:如果程序是正确同步的,程序的执行将具有顺序一致性,即程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同。
- 同步程序的顺序一致性效果
public class SynchronizedReorderTest {int a = 0;boolean flag = false;public synchronized void writer() { //获取锁a = 1; flag = true; } //释放锁public synchronized void reader() { //获取锁if (flag) { int i = a * a; }} //释放锁
}
4. volatile的内存语义
- 理解volatile特性的一个好办法就是把对volatile变量的单个读/写,看成是使用同一个锁对这些单个读/写操作做了同步。
- 可见性:对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。
- 原子性:对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性。
public class VolatileTest {//使用volatile声明64位的long型变量volatile long vl = 0L;//单个volatile变量的写public void set(long l) {this.vl = l;}//复合(多个)volatile变量的读/写public void getAndIncrement() {vl++;}//单个volatile变量的读public long get() {return vl;}
}
语义上等价于:
public class VolatileTest {//声明64位的long型变量long vl = 0L;//单个变量的写用锁同步public synchronized void set(long l) {this.vl = l;}//复合(多个)变量的读/写public void getAndIncrement() {//调用同步的读方法long temp = get();temp += 1L;//调用同步的写方法set(temp);}//单个变量的读用锁同步public synchronized long get() {return vl;}
}
- volatile写的内存语义
- 当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存。
- volatile读的内存语义
- 当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效,线程接下来将从主内存中读取共享变量。
- 为了实现volatile的内存语义,JMM会限制重排序。
5. 锁的内存语义
- 当线程释放锁时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存中。
- 当线程获取锁时,JMM会把该线程对应的本地内存置为无效,从而使得被监视器保护的临界区代码必须从主内存中读取共享变量。
6. final域的内存语义
- 对于基本数据类型
- 在构造函数内对一个final域的写入,与随后把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序。(即代码中操作1和操作2不能重排序)
- 初次读一个包含final域的对象的引用,与随后初次读这个final域,这两个操作之间不能重排序。(即代码中操作3和操作4不能重排序)
public class FinalTest {//final变量private final int i;static FinalTest ft;//构造方法public FinalTest() {//写final域this.i = 1; //1}public static void writer() {ft = new FinalTest(); //2}public static void reader() {//读对象引用FinalTest finalTest = ft; //3//读final域int j = finalTest.i; //4}
}
- 对于引用数据类型
- 在构造函数内对一个final引用的对象的成员域的写入,与随后在构造函数外把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序。(即代码中操作1和操作3不能重排序、代码中操作2和操作3也不能重排序)
- 在构造函数返回之前,被构造对象的引用不能被其他线程看到,因为此时的final域可能还没有被初始化,在构造函数返回之后,任意线程都将被保证能看到final域正确初始化之后的值。
public class FinalReferenceTest {//final引用变量private final int[] arr;static FinalReferenceTest frt;//构造方法public FinalReferenceTest() {//写final域arr = new int[1]; //1//写final域引用的对象的成员域arr[0] = 1; //2}public static void writer() {frt = new FinalReferenceTest(); //3}
}
7. JMM的内存可见性保证
- 单线程程序:单线程程序不会出现内存可见性问题,编译器、runtime核处理器会共同确保单线程程序的执行结果与该程序在顺序一致性模型中的执行结果相同。
- 正确同步的多线程程序:正确同步的多线程程序的执行将具有顺序一致性,程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同,JMM通过限制编译器和处理器的重排序来为程序提供内存可见性保证。
- 未同步/未正确同步的多线程程序:JMM为它们提供了最小安全性保障,线程执行时读取到的值,要么是之前某个线程写入的值,要么是默认值。
四、线程应用实例
1. 等待超时模式
- 调用一个方法时等待一段时间,如果该方法能够在给定的时间段之内得到结果,那么将结果立即返回,反之,超时返回默认结果。
public synchronized Object get(long mills) throws InterruptedException {long future = System.currentTimeMillis() + mills;long remaining = mills;//获取resultObject result = null;//当超时大于0并且result返回值不满足要求while ((result == null) && remaining > 0) {wait(remaining);remaining = future - System.currentTimeMillis();}return result;
}
2. 一个简单的数据库连接池示例
public class ConnectionPool {private LinkedList<Connection> pool = new LinkedList<Connection>();/*** 通过构造函数初始化连接的最大上限* @param initialSize*/public ConnectionPool(int initialSize) {if (initialSize > 0) {for (int i = 0; i < initialSize; i++) {pool.add(ConnectionDriver.createConnection());}}}/*** 当连接使用完成后,需要调用该方法将连接放回线程池* @param connection*/public void releaseConnection(Connection connection) {if (connection != null) {synchronized (pool) {//连接释放后需要进行通知,这样其他消费者能够感知到连接池中已经归还了一个连接pool.addLast(connection);pool.notifyAll();}}}/*** 调用方需要先调用该方法来制定在多少毫秒内超时获取连接* @param mills* @return* @throws InterruptedException*/public Connection fetchConnection(long mills) throws InterruptedException {synchronized (pool) {//完全超时if (mills <= 0) {while (pool.isEmpty()) {pool.wait();}return pool.removeFirst();} else {long future = System.currentTimeMillis() + mills;long remaining = mills;while (pool.isEmpty() && remaining > 0) {pool.wait(remaining);remaining = future - System.currentTimeMillis();}Connection result = null;if (!pool.isEmpty()) {result = pool.removeFirst();}return result;}}}
}
public class ConnectionDriver {static class ConnectionHandler implements InvocationHandler {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {if (method.getName().equals("commit")) {TimeUnit.MILLISECONDS.sleep(100);}return null;}}/*** 创建一个Connection的代理,在commit时休眠100毫秒* @return*/public static final Connection createConnection() {return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),new Class<?>[] {Connection.class},new ConnectionHandler());}
}
public class ConnectionPoolTest {static ConnectionPool pool = new ConnectionPool(10);//保证所有ConnectionRunner能够同时开始static CountDownLatch start = new CountDownLatch(1);//main线程将会等待所有ConnectionRunner结束后才继续执行static CountDownLatch end;public static void main(String[] args) throws InterruptedException {//线程数量int threadCount = 10;end = new CountDownLatch(threadCount);//每个线程获取连接、释放连接的次数int count = 20;//记录获取到连接次数AtomicInteger got = new AtomicInteger();//记录未获取到连接次数AtomicInteger notGot = new AtomicInteger();//创建threadCount个线程for (int i = 0; i < threadCount; i++) {Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread");thread.start();}//让threadCount个线程同时开始执行start.countDown();//等threadCount个线程运行完才往下执行end.await();//输出结果System.out.println(got.get() + " " + notGot.get());}static class ConnectionRunner implements Runnable {int count;AtomicInteger got;AtomicInteger notGot;public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {this.count = count;this.got = got;this.notGot = notGot;}@Overridepublic void run() {try {//等待main中执行到start.countDown();这句代码,该线程才往下执行start.await();} catch (InterruptedException e) {e.printStackTrace();}//每个线程执行count次获取连接、释放连接操作while (count > 0) {try {//从线程池中获取连接,如果1000ms内无法获取到,将会返回null//分别统计连接获取的数量got和未获取到的数量notGotConnection connection = pool.fetchConnection(1000);if (connection != null) {try {connection.createStatement();connectionmit();} finally {pool.releaseConnection(connection);got.incrementAndGet();}} else {notGot.incrementAndGet();}} catch (Exception e) {e.printStackTrace();} finally {count--;}}//执行完1个线程end.countDown();}}
}
3. 线程池示例
public class DefaultThreadPool<Job extends Runnable> {//线程池最大限制数private static final int MAX_WORKER_NUMBERS = 10;//线程池默认的数量private static final int DEFAULT_WORKER_NUMBERS = 5;//线程池最小的数量private static final int MIN_WORKER_NUMBERS = 1;//工作列表private final LinkedList<Job> jobs = new LinkedList<Job>();//工作者列表private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());//工作者线程的数量private int workerNum = DEFAULT_WORKER_NUMBERS;//线程编号private AtomicLong threadNum = new AtomicLong();public DefaultThreadPool() {initializeWorkers(workerNum);}public DefaultThreadPool(int num) {workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;initializeWorkers(workerNum);}/*** 初始化工作者线程* @param num*/private void initializeWorkers(int num) {for (int i = 0; i < num; i++) {Worker worker = new Worker();workers.add(worker);Thread thread = new Thread(worker, "worder" + threadNum.incrementAndGet());thread.start();}}/*** 执行一个Job,这个Job需要实现Runnable* @param job*/public void execute(Job job) {if (job != null) {//添加一个job,然后进行通知synchronized (jobs) {jobs.addLast(job);jobs.notify();}}}/*** 关闭线程池*/public void shutdown() {for (Worker worker : workers) {worker.shutdown();}}/*** 增加工作者线程* @param num*/public void addWorkers(int num) {synchronized (jobs) {//限制新增的worker数量if (num + this.workerNum > MAX_WORKER_NUMBERS) {num = MAX_WORKER_NUMBERS - this.workerNum;}initializeWorkers(num);this.workerNum += num;}}/*** 减少工作者线程* @param num*/public void removeWorker(int num) {synchronized (jobs) {if (num >= this.workerNum) {throw new IllegalArgumentException("beyond workNum");}//按照给定的数量停止workerint count = 0;while (count < num) {Worker worker = workers.get(count);if (workers.remove(worker)) {count++;}}this.workerNum -= count;}}/*** 获取正在等待执行的任务数量* @return*/public int getJobSize() {return jobs.size();}/*** 工作者线程,负责消费任务*/class Worker implements Runnable {//是否工作private volatile boolean running = true;@Overridepublic void run() {while (running) {Job job = null;synchronized (jobs) {//如果工作列表是空的,那么就waitif (jobs.isEmpty()) {try {jobs.wait();} catch (InterruptedException e) {//感知到外部对工作者线程的中断操作,返回Thread.currentThread().interrupt();return;}}//取出一个jobjob = jobs.removeFirst();}if (job != null) {try {job.run();} catch (Exception e) {//忽略job执行中的异常}}}}public void shutdown() {running = false;}}
}
五、Java中的并发容器和框架
1. ConcurrentHashMap
- ConcurrentHashMap是线程安全且高效的HashMap,在并发编程中使用HashMap可能导致程序死循环,而使用线程安全的HashTable效率又非常低下,基于以上两个原因,便有了ConcurrentHashMap。
- HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。
- HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下,因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。
- ConcurrentHashMap使用锁分段技术,首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
2. ConcurrentLinkedQueue
- 要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。
- 使用阻塞算法的队列可以用锁来实现。
- 非阻塞的实现方式则可以使用循环CAS来实现。
3. Java中的阻塞队列
- 阻塞队列是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法。
- 支持阻塞的插入方法意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除方法意思是在队列空时,获取元素的线程会等待队列变为非空。
- 阻塞队列常用于生产者和消费者的场景。
- 方法
方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
插入方法 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除方法 | remove() | poll() | take() | poll(time, unit) |
检查方法 | element() | peek() | - | - |
- JDK 7 提供了7个阻塞队列
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
- DelayQueue:一个使用优先级队列实现的无界阻塞队列
- SynchronousQueue:一个不存储元素的阻塞队列
- LinkedThansferQueue:一个由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列
4. Fork/Join框架
- Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
public class CountTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 2; //阈值private int start;private int end;public CountTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;//如果任务足够小就开始计算boolean canCompute = (end - start) <= THRESHOLD;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {//如果任务大于阈值,就分裂成子任务int middle = (start + end) / 2;CountTask leftTask = new CountTask(start, middle);CountTask rightTask = new CountTask(middle + 1, end);//执行子任务leftTask.fork();rightTask.fork();//等待子任务执行完int leftResult = leftTask.join();int rightResult = rightTask.join();//合并子任务sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool();//生成一个计算任务CountTask task = new CountTask(1, 10);//执行任务Future<Integer> result = forkJoinPool.submit(task);try {System.out.println(result.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
- 异常处理
if (task.isCompletedAbnormally()) {System.out.println(task.getException());
}
六、Java中的13个原子操作类
1. 原子更新基本类型类
- AtomicBoolean
- AtomicInteger
- AtomicLong
public class AtomicIntegerTest {static AtomicInteger ai = new AtomicInteger(1);public static void main(String[] args) {System.out.println(ai.addAndGet(1)); //2,返回修改后的值,当前值为2System.out.println(ai.getAndAdd(1)); //2,返回修改前的值,当前值为3System.out.println(aipareAndSet(3, 4)); //期望值为3,当前值为3,返回true,修改为4System.out.println(aipareAndSet(3, 4)); //期望值为3,当前值为4,返回falseSystem.out.println(ai.getAndIncrement()); //4,返回修改前的值,当前值为5System.out.println(ai.get()); //5}
}
2. 原子更新数组
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
public class AtomicIntegerArrayTest {static int[] value = new int[] { 1, 2 };static AtomicIntegerArray aia = new AtomicIntegerArray(value);public static void main(String[] args) {System.out.println(aia.getAndSet(0, 3)); //1,返回修改前的值,当前值为3System.out.println(aiapareAndSet(0, 3, 4)); //当前值为3,期望值为3,返回true,修改为4System.out.println(aia.get(0)); //4System.out.println(value[0]); //1,说明AtomicIntegerArray不影响原数组}
}
3. 原子更新引用类型
- AtomicReference
- AtomicReferenceFieldUpdater
- AtomicMarkableReference
public class AtomicReferenceTest {static class User {private String name;private int age;public User(String name, int age) {this.name = name;this.age = age;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", age=" + age +'}';}}public static AtomicReference<User> ar = new AtomicReference<User>();public static void main(String[] args) {User user = new User("yeta", 23);ar.set(user);User uUser = new User("yeta1", 24);System.out.println(arpareAndSet(user, uUser)); //当前值为user,期望值为user,返回true,修改为uUserSystem.out.println(ar.get()); //User{name='yeta1', age=24}System.out.println(user); //User{name='yeta', age=23},说明AtomicReference不影响原引用类型}
}
4. 原子更新字段类
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
- AtomicStampedReference
public class AtomicIntegerFieldUpdaterTest {static class User {private String name;public volatile int age; //必须用public volatile修饰public User(String name, int age) {this.name = name;this.age = age;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", age=" + age +'}';}}//创建原子更新器,并设置需要更新的对象类和对象的属性private static AtomicIntegerFieldUpdater<User> aifu = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");public static void main(String[] args) {User user = new User("yeta", 23);System.out.println(aifu.getAndIncrement(user)); //23System.out.println(aifu.get(user)); //24System.out.println(user); //24}
}
七、Java中的并发工具类
1. 等待多线程完成:CountDownLatch
- 允许一个或多个线程等待其他线程完成操作。
- CountDownLatch的默认构造函数是CountDownLatch(int counter),接收一个int类型的参数作为计数器,传入N,表示需要等待N个点完成。
- 当调用countDown()方法时,N就会减1,await()方法会阻塞当前线程,直到N变为0。
public class CountDownLatchTest {static CountDownLatch cdl = new CountDownLatch(2); //计数器初始值为2public static void main(String[] args) {new Thread(new Runnable() {@Overridepublic void run() {System.out.println(1);cdl.countDown(); //计数器值减1System.out.println(2);cdl.countDown(); //计数器值减1}}).start();try {cdl.await(); //计数器值为0,不会阻塞当前线程} catch (InterruptedException e) {e.printStackTrace();}System.out.println(3);}
}
2. 同步屏障:CyclicBarrier
- 让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续执行。
- CyclicBarrier的默认构造方法是CyclicBarrier(int parties),参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier该线程已经到达了屏障,然后该线程被阻塞。
public class CyclicBarrierTest {//初始化屏障拦截的线程数量为2,表示只要有2个线程到达屏障,屏障就会开门static CyclicBarrier cb = new CyclicBarrier(2);public static void main(String[] args) {try {new Thread(new Runnable() {@Overridepublic void run() {try {cb.await(); //第1个线程到达屏障,阻塞System.out.println(1);} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}).start();Thread.sleep(1000); //让第1个线程先到达屏障cb.await(); //第2个线程到达屏障,屏障开门,运行结果不一定,因为屏障开门后执行哪个线程不一定System.out.println(2);} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}
}
- CyclicBarrier还提供一个构造方法是CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction。
public class CyclicBarrierTest2 {static class MyAction implements Runnable {private String name;public MyAction(String name) {this.name = name;}@Overridepublic void run() {try {System.out.println(this.name + " start " + System.currentTimeMillis());Thread.sleep(1000);System.out.println(this.name + " end " + System.currentTimeMillis());} catch (InterruptedException e) {e.printStackTrace();}}}//初始化屏障拦截的线程数量为2,表示只要有2个线程到达屏障,屏障就会开门//设置屏障开门后优先执行的动作static CyclicBarrier cb = new CyclicBarrier(2, new MyAction("A"));public static void main(String[] args) {try {System.out.println(Thread.currentThread().getName() + " start " + System.currentTimeMillis());new Thread(new Runnable() {@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName() + " start " + System.currentTimeMillis());cb.await(); //第1个线程到达屏障,阻塞System.out.println(Thread.currentThread().getName() + " end " + System.currentTimeMillis());} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}, "B").start();Thread.sleep(1000); //让第B线程先到达屏障cb.await(); //第2个线程到达屏障,屏障开门,先执行A线程,之后运行结果不一定,因为执行哪个线程不一定System.out.println(Thread.currentThread().getName() + " end " + System.currentTimeMillis());} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}
}
- CyclicBarrier应用场景:多线程计算数据,最后合并计算结果
public class CyclicBarrierTest3 implements Runnable {//初始化屏障拦截的线程数量为4,表示只要有4个线程到达屏障,屏障就会开门//设置屏障开门后优先执行的动作private CyclicBarrier cb = new CyclicBarrier(4, this);//存放4个线程的计算结果private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();public CyclicBarrierTest3() {for (int i = 0; i < 4; i++) {new Thread(new Runnable() {@Overridepublic void run() {//计算结果int result = (int) (Math.random() * 10);//存放map.put(Thread.currentThread().getName(), result);try {//到达屏障cb.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}, "thread" + i).start();}}@Overridepublic void run() {//汇总4个线程的计算结果int sum = 0;for (Map.Entry<String, Integer> entry : map.entrySet()) {sum += entry.getValue();}map.put("result", sum);}public static void main(String[] args) {try {CyclicBarrierTest3 cbt3 = new CyclicBarrierTest3();Thread.sleep(1000); //让4个线程执行完System.out.println(cbt3.map);} catch (InterruptedException e) {e.printStackTrace();}}
}
- CountDownLatch的计数器只能使用1次,而CyclicBarrier的计数器可以使用reset()方法重置
- getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量。
- isBroken()方法可以用来了解线程是否被中断。
3. 控制并发线程数:Semaphore
- Semaphore是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
- Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接。
public class SemaphoreTest {//初始化可用的许可证数量private static Semaphore s = new Semaphore(2);public static void main(String[] args) {//开6个线程,但是只有2个线程能并发执行for (int i = 0; i < 6; i++) {new Thread(new Runnable() {@Overridepublic void run() {try {s.acquire(); //获取一个许可证Thread.sleep(1000); //执行任务System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis());s.release(); //归还许可证} catch (InterruptedException e) {e.printStackTrace();}}}, "thread" + i).start();}}
}
- Semaphore的构造方法是Semaphore(int permits),接收一个整型数字,表示可用的许可证数量。
- acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证,还可以使用tryAcquire()方法尝试获取许可证。
- int availablePermits()方法返回此信号量中当前可用的许可证数。
- int getQueueLength()方法返回正在等待获取许可证的线程数。
- boolean hasQueuedThreads()方法返回是否有线程正在等待获取许可证。
- void reducePermits(int reduction)方法减少reduction个许可证,是个protected方法。
- Collection getQueuedThreads()方法返回所有等待获取许可证的线程集合,是个protected方法。
4. 线程间交换数据:Exchanger
- Exchanger用于进行线程间的数据交换,他提供一个同步点,在这个同步点,两个线程可以交换彼此的数据,这两个线程通过exchange()方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange()方法,当两个线程都到达同步点时,这两个线程就可以交换数据。
- Exchanger可以用于遗传算法,也可以用于校对工作。
public class ExchangerTest {private static final Exchanger<String> e = new Exchanger<String>();public static void main(String[] args) {new Thread(new Runnable() {@Overridepublic void run() {try {String valueA = "A";String valueB = e.exchange(valueA); //参数是自己给别人的数据,返回值是别人给自己数据System.out.println(Thread.currentThread().getName() + " " + valueB);} catch (InterruptedException e1) {e1.printStackTrace();}}}, "threadA").start();new Thread(new Runnable() {@Overridepublic void run() {try {String valueB = "B";String valueA = e.exchange(valueB);System.out.println(Thread.currentThread().getName() + " " + valueA);} catch (InterruptedException e1) {e1.printStackTrace();}}}, "threadB").start();}
}
八、Java中的线程池
1. 线程池的实现原理
- ThreadPoolExecutor执行execute()方法有4种情况:
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获取全局锁)。
- 如果当前运行的线程等于或多余corePoolSize,则将任务加入BlockingQueue。
- 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(需要获取全局锁)。
- 如果创建新线程将使得当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
2. 线程池的创建
new ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
- corePoolSize:线程池的基本大小,如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
- maximumPoolSize:线程池允许创建的最大线程数。
- keepAliveTime:线程池的工作线程空闲后,保持存活的时间。
- unit:线程活动保持时间的单位。
- workQueue:任务队列,参照第五章中的第3点。
- threadFactory:用于设置线程的工厂。
- handler:包和策略,当队列和线程池都满了采取的策略。默认是AbortPolicy,直接抛出异常,CallerRunsPolicy只用调用者所在线程来运行任务,DiscardOldestPolicy丢弃队列里最近的一个任务,并执行当前任务,DiscardPolicy不处理,丢弃掉。
3. 向线程池提交任务
- execute()
void execute(Runnable command)
- submit()
Future<?> submit(Runnable task);
4. 关闭线程池
- shutdown():将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
- shutdownNow():首先将线程池的状态设置为STOP,然后尝试停止所有正在执行或暂停任务的线程,并返回等待执行任务的列表。
九、Executor框架
1. Java的线程即是工作单元,也是执行机制。从JDK 5开始,把工作单元和执行机制分离开来。工作单元包括Runnable和Callable,执行机制由Executor框架提供。
2. Executor框架主要由3大部分组成:
- 任务:包括被执行任务需要实现的Runnable接口或Callable接口。
- 任务的执行:包括任务执行机制的核心接口Executor、继承自Executor接口的ExecutorService接口、实现ExecutorService接口的ThreadPoolExecutor类和ScheduledThreadPoolExecutor。
- 异步计算的结果:包括接口Future、实现Future接口的FutureTask类。
3. Exectuor框架的成员:
- ThreadPoolExecutor:
- FixedThreadPool:创建使用固定线程数的线程池,适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,适用于负载比较重的服务器。
- SingleThreadExecutor:创建使用单个线程的线程池,适用于需要保证顺序地执行各个任务,并且在任意时间点,不会有多个线程是活动的应用场景。
- CachedThreadPool:创建一个会根据需要创建新线程的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。
- ScheduledThreadPoolExecutor:
- SecheduledThreadPoolExecutor:创建固定个数线程的线程池,适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。
- SingleThreadScheduledExecutor:创建单个线程的线程池,适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。
- Future
- Runnable和Callable:
- Runnable不会返回结果。
- Callable可以返回结果。
4. 示例代码,1个数自增100次
public class Main {private static final int NUM = 10;private static final ExecutorService threadPool1 = Executors.newFixedThreadPool(5);private static AtomicInteger ai1 = new AtomicInteger(0);private static CountDownLatch cdl1 = new CountDownLatch(NUM);private static final ExecutorService threadPool2 = Executors.newSingleThreadExecutor();private static AtomicInteger ai2 = new AtomicInteger(0);private static CountDownLatch cdl2 = new CountDownLatch(NUM);private static final ExecutorService threadPool3 = Executors.newCachedThreadPool();private static AtomicInteger ai3 = new AtomicInteger(0);private static CountDownLatch cdl3 = new CountDownLatch(NUM);public static void main(String[] args) throws InterruptedException {for (int i = 0; i < NUM; i++) {threadPool1.execute(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " incrementAndGet " + ai1.incrementAndGet());cdl1.countDown();}});}cdl1.await();System.out.println(ai1.get());threadPool1.shutdown();for (int i = 0; i < NUM; i++) {threadPool2.execute(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " incrementAndGet " + ai2.incrementAndGet());cdl2.countDown();}});}cdl2.await();System.out.println(ai2.get());threadPool2.shutdown();for (int i = 0; i < NUM; i++) {threadPool3.execute(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " incrementAndGet " + ai3.incrementAndGet());cdl3.countDown();}});}cdl3.await();System.out.println(ai3.get());threadPool3.shutdown();}
5. 有返回值的情况
public class Main {private static final ExecutorService threadPool = Executors.newSingleThreadExecutor();public static void main(String[] args) throws InterruptedException, ExecutionException {Future<Integer> future = threadPool.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {int sum = 0;for (int i = 0; i < 100; i++) {sum++;}return sum;}});System.out.println(future.get()); //100threadPool.shutdown();}
}
更多推荐
记《Java并发编程的艺术》
发布评论