一、多线程编程基础
1.线程安全定义:
当多个线程访问一个类时,如果不用考虑这些线程在运行时环境下的调度和交替执行,并且不需要额外的同步及在调用方代码不必做其他的协调,这个类的行为仍然是正确的,那么这个类就被称之为是线程安全的。简言之对于线程安全类的实例进行顺序或并发的一系列操作,都不会导致实例处于无效状态。
只有对象状态可变,且存在对象共享,才需要考虑线程安全。
可以通过下面的方法来确保线程安全:
(1).无状态对象:
无状态对象不包含域也没有引用其他类的域,一次特定计算的瞬时状态只会唯一存在本地变量中,这些本地变量存储在线程的栈中,只有执行线程才能访问,因此无状态对象永远是线程安全的。
(2).不可变对象:
不可变对象需要满足下面条件:
A.对象本身是final的(避免被子类化),所有域都是final类型。
B.不可变对象的状态在创建后就不能再改变,每次对他们的改变都是产生了新的不可变对象的对象。
C.不可变对象能被正确地创建(在创建过程中没有发生this引用逸出)。
不可变对象是线程安全的,不需要任何同步或锁的机制就可以保证安全地在多线程之间共享。
(3).原子变量:
介绍原子变量前首先介绍原子操作,假设有操作A和B,如果从执行A的角度看,当其他线程执行B时,要么全部执行完成,要么一点都没有执行,这样A和B互为原子操作,不满足原子操作要求的操作被称为复合操作,原子操作是线程安全的。
JDK的Java.util.concurrent.atomic包中包括了原子变量类,这些类用来实现数字和对象引用的原子状态转换。
原子变量自身是线程安全的,但是如果一个不变约束涉及多个变量时,变量间不是彼此独立的,无论这些变量是否是原子变量都不能确保线程安全,而需要在同一个原子操作中更新这些相互关联的状态变量才能确保线程安全。
(4).正确使用线程同步:
通过使用synchronized关键字独占锁、显式锁、volatile等同步机制实现的相对线程安全。
2.多线程的竞争条件与数据竞争:
(1).竞争条件:
指多个线程或者进程在读写一个共享数据时结果依赖于它们指令执行的相对时序,即要想得到正确的结果,要依赖于幸运的时序。
最常见的竞争条件是“检查再运行”,使用一个潜在的过期值作为决定下一步操作的依据。
(2).数据竞争:
指没有使用同步来协调所有那些共享的非final域访问的情况,一个线程写入一个变量,可以被另一个线程读取;一个线程读取刚刚被另一个线程写入的变量,如果两个线程都没有使用同步,则会处于数据竞争的风险中。
不是所有的竞争条件都是数据竞争,也不是所有的数据竞争都是竞争条件,但它们都会引起并发程序以不可预期的方式失败。
3.内部锁:
Java提供了强制原子性的内置锁机制:synchronized块。
一个synchronized块有两部分:锁对象引用,以及该锁保护的代码块。
synchronized方法的锁是该方法所在的对象本身,静态的synchronized方法的锁是从Class对象上获取的锁。内部锁的特性如下:
(1).自动获得和释放:
每个java对象都可以隐式地扮演一个用于同步的锁的角色,这些内置的锁被称为内部锁或监视器锁,执行线程进入synchronized块之前自动获得锁,而无论是正常退出还是抛出异常,线程都会自动释放锁。因此获得内部锁的唯一途径是进入这个内部锁保护的同步块或方法。
(2).互斥性:
内部锁在java中扮演了互斥锁的角色,即至多只有一个线程可以拥有锁,没有获取到锁的线程只能等待或阻塞直到锁被释放,因此同步块可以线程安全地原子执行。
(3).可重入性:
可重入是指对于同一个线程,它可以重新获得已有它占用的锁。
可重入性意味着锁的请求是基于”每线程”而不是基于”每调用”,它是通过为锁关联一个请求计数器和一个占有它的线程来实现。
可重入性方便了锁行为的封装,简化了面向对象并发代码的开发,可以防止类继承引起的死锁,例子如下:
publicclass Widget {
public synchronized void doSomething(){
......
}
}
publicclass LoggingWidget extends Widget {
public synchronized voiddoSomething(){
System.out.println(toString() + “:calling doSomething”);
super.doSomething();
}
}
子类LoggingWidget覆盖了父类Widget中synchronized类型的doSomething方法,并调用了父类的中的同步方法,因此子类LoggingWidget和父类Widget在调用doSomething方法之前都会先获取Widget的锁,若内部锁没有可重入性,则super.doSomething的调用就会因为无法获得锁而被死锁。
4.内存可见性:
可见性是关于在哪些情况下,一个线程执行的结果对另一个线程是可见的问题。
在单线程的情况下,程序按顺序执行保证内存可见性,但是在多线程环境下,为了优化性能,在没有同步的情况下,java存储模型允许编译器进行指令重排序,指令重排序的结果是程序指令的执行顺序是不确定的,结果会导致内存可见性问题。
内存可见性不仅避免一个线程修改其他线程正式使用的对象状态,而且还保证一个线程修改了对象的状态之后,其他的线程能够真正看到改变,可以通过如下方式保证多线程的内存可见性:
(1).锁:
锁不仅仅是关于同步与互斥的的,也是关于内存可见性的,为了保证所有线程都能看到共享的、可变变量的最新值,读取和写入线程必须使用公共的锁进行同步,锁确保释放锁之前对共享数据做出的更改对于随后获得该锁的另一个线程是可见的 。
(2).volatile变量:
volatile是一种弱形式的同步, volatile确保对一个变量的更新以可预见的方式告知其他线程。当一个域被声明为volatile类型后,编译器与运行时会监视这个变量:它是共享的,而且对它的操作不会与其他内存操作一起被重排序,volatile变量不会缓存在寄存器或者缓存在其他处理器隐藏的地方,因此读一个volatile类型的变量时,总是返回由某一线程所写入的最新值。
出于简易性或可伸缩性的考虑,您可能倾向于使用 volatile 变量而不是锁。当使用 volatile 变量而非锁时,某些习惯用法更加易于编码和阅读。此外,volatile 变量不会像锁那样造成线程阻塞,因此也很少造成可伸缩性问题。在某些情况下,如果读操作远远大于写操作,volatile 变量还可以提供优于锁的性能优势,但是请注意volatile只保证可见性,不保证原子性。
只有满足如下条件才能使用volatile 变量:
A. 对变量的写操作不依赖于当前值。
B. 该变量没有包含在具有其他变量的不变式中。
5.发布和逸出:
(1).发布:
发布一个对象是指使该对象能够被当前范围之外的代码所使用,例如将一个引用存储到其他代码可以访问的地方;在一个非私有的方法中返回该引用;将该对象传递到其他类的方法中等。
最常见的发布方式是将对象的引用存储到公共静态域中,例如:
publicstatic Set<Secrte> knownSecrets;
publicvoid initialize(){
knowSecrets = newHashSet<Secret>();
}
(2).逸出:
逸出是指一个对象在尚未准备好时就将它发布。
对象逸出会导致对象的内部状态被暴露,可能危及到封装性,使程序难以维持稳定;若发布尚未构造完成的对象,可能危及线程安全问题。
最常见的逸出是this引用在构造时逸出,导致this引用逸出的常见错误有:
A.在构造函数中启动线程:
当对象在构造函数中显式还是隐式创建线程时,this引用几乎总是被新线程共享,于是新的线程在所属对象完成构造之前就能看见它。
避免构造函数中启动线程引起的this引用逸出的方法是不要在构造函数中启动新线程,取而代之的是在其他初始化或启动方法中启动对象拥有的线程。
B.在构造方法中调用可覆盖的实例方法:
在构造方法中调用那些既不是private也不是final的可被子类覆盖的实例方法时,同样导致this引用逸出。
避免此类错误的方法时千万不要在父类构造方法中调用被子类覆盖的方法。
C.在构造方法中创建内部类:
在构造方法中创建内部类实例时,内部类的实例包含了对封装实例的隐含引用,可能导致隐式this逸出。例子如下:
publicclass ThisEscape {
public ThisEscape(EventSource source){
source.registerListener(newEventListener() {
public void onEvent(Event e) {
doSomething(e);
}
});
}
}
上述例子中的this逸出可以使用工厂方法来避免,例子如下:
publicclass SafeListener {
private final EventListener listener;
public SafeListener(){
listener = new EventListener(){
public void onEvent(Event e){
doSomething(e);
}
);
}
public static SafeListenernewInstance(EventSource source) {
SafeListener safe = newSafeListener();
source.registerListener(safe.listener);
return safe;
}
}
6.线程封闭:
访问共享的、可变的数据要求使用同步。线程封闭是一种将数据仅在单线程中访问而不共享,不需要任何同步的最简单的实现线程安全的方式。
当对象(无论本身是否线程安全)封闭在一个线程中,会自动成为线程安全,线程封闭的常用做法有:
(1).栈限制:
栈限制是线程封闭的一种特例,只能通过本地变量才可以触及对象,本地变量使对象限制在执行线程中,存在于执行线程栈,其他线程无法访问这个栈,从而确保线程安全。栈限制的例子如下:
public intloadTheArk(Collection<Animal> candidates){
SortedSet<Animal> animals;
int numPairs = 0;
Animal candidate = null;
//animals被限制在本地方法栈中
animals = new TreeSet<Animal>(newSpeciesGenderComparator());
animals.addAll(candidates);
for(Animal a : animals){
if(candidate == null ||!candidate.isPotentialMate(a)){
candidate = a;
}else{
ark.load(new AnimalPair(candidate,a));
++numPairs;
candidate = null;
}
}
return numPairs;
}
注意上面的栈限制例子中animals不能逸出,否则就会破坏限制性。
(2).ThreadLocal:
ThreadLocal线程本地变量是一种规范化的维护线程限制的方式,它允许将每个线程与持有数值的对象关联在一起,为每个使用它的线程维护一份单独的拷贝。ThreadLocal提供了set和get访问器,get总是返回由当前线程通过set设置的最新值。
ThreadLocal线程本地变量通过用于防止在基于可变的单例(singleton)或全局变量的设计中,出现不正确的共享。ThreadLocal线程本地变量的例子如下:
privatestatic ThreadLocal<Connection> connectionHolder = newThreadLocal<Connection>(){
public Connection initialValue(){
returnDriverManager.getConnection(DB_URL);
}
};
publicstatic Connection getConnection(){
return connectionHolder.get();
}
7.向已有的线程安全类添加功能:
如果JDK或者第三方类库提供的线程安全类只能满足我们大部分的要求,即不能完全满足要求时就需要对其进行添加新操作,这个看似简单的问题往往会引起线程安全问题。
以向一个线程安全的List添加一个原子的缺少即加入操作为例,有如下方法:
(1).修改原始类:
添加一个新原子操作最安全的方式就是修改原始类,在原始类中添加新操作,但是软件设计的开放闭合原则以及有可能无法得到源码(或无法自由修改源码)等问题决定修改原始类是最不可能的。即便可以修改原始类,也要保证理解原有的同步策略,维持原有的设计。
(2).扩展原始类:
如果原始类在设计上是可扩展的(没有声名为final,即允许继承),则扩展原始类并添加新方法,例子如下:
publicclass BetterVector<E> extends Vector<E> {
public synchronized boolean putIfAbsent(Ex){
boolean absent = !contains(x);
if(absent){
add(x);
}
return absent;
}
}
扩展原始的线程安全类要特别小心,虽然做法非常直观,但是一定要明白原始的线程安全类的同步策略,新扩展的类要使用和原始类相同的锁来控制对基本类状态的并发访问,这种访问有很大的局限性,如果原始类使用的是内部私有锁同步策略或者没有告知使用者同步策略,则该方式是不能支持的。
Vector是使用内部锁来控制并发访问的,因此上面代码中BetterVector也使用内部锁控制并发访问是可以维持Vector同步策略。
(3).客户端加锁:
对于一个由Collections.synchronizedList封装的ArrayList,客户端不知道同步封装工厂方法返回的List对象的同步策略的时候,前面所介绍的扩展原始类方案就无法支持,这时我们就需要将新增功能添加在客户端,并在客户端进行加锁同步。
客户端加锁要非常小心,不注意就会发生错误,下面例子演示一个常见的错误:
publicclass ListHelper<E> {
public List<E> list =Collections.synchronizedList(new ArrayList<E>());
public synchronized boolean putIfAbsent(Ex) {
poolean absent = !list.contains(x);
if (absent) {
list.add(x);
}
return absent;
}
}
上述代码错误在于使用了与线程同步List不同的锁,上面代码的putIfAbsent方式使用的是ListHelper对象的内部锁,而线程同步List的其他原子操作肯定用的不是ListHelper对象内部锁,因此putIfAbsent对于List的其他操作而言并不是原子化的,上述代码是很多人经常不小心犯的错误。
避免上述的错误的办法是让客户端的putIfAbsent方法所使用的锁与List的其他操作所使用的锁是同一个锁,正确的代码如下:
publicclass ListHelper<E> {
public List<E> list =Collections.synchronizedList(new ArrayList<E>());
public boolean putIfAbsent(E x) {
synchronized (list) {
poolean absent =!list.contains(x);
if (absent) {
list.add(x);
}
return absent;
}
}
}
(4).组合:
面向对象有两种常用的扩展方式:继承(is-a)和组合(has-a),设计原则也经常推荐优先使用组合,除非情况非常合适,否则尽量少使用继承。
组合对于向现有线程安全类添加新功能时同样适合,通过添加一层额外的锁层,组合对象将操作委托给底层List实例,无论底层List实例是否实现线程安全,组合对象的putIfAbsent方法都可以保证操作的原子性,例子如下:
publicclass ImprovedList<T> implements List<T>{
private final List<T> list;
public ImprovedList<T>(List<T>list){
this.list = list;
}
public synchronized boolean putIfAbsent(Tx){
boolean absent= !list.contains(x);
if(absent){
list.add(x);
}
return absent;
}
public synchronized void clear(){
list.clear();
}
......
}
二、多线程容器类
1.同步容器类复合操作容易出现的问题:
JDK中同步容器类包括两部分:
早期JDK中的Vector和Hashtable;
JDK1.2引入的同步包装类,即由Collections.synchronizedXxx工厂方法创建的容器类。
同步容器类都是线程安全的,它们提供的基本操作都是原子操作,但是对于诸如迭代、导航和缺少即加入的条件运算等复合操作来说,通常需要使用额外的客户端加锁进行保护来确保线程安全。
(1).条件操作:
下面例子演示同步容器类的非线程安全条件运算:
publicstatic object getLast(Vector list){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
public staticvoid deleteLast(Vector list){
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
上述代码在单线程中没有任何问题,但是如果是多线程并发运行则问题会非常大,例如一个拥有10个元素的Vector,一个线程调用其getLast方法,与此同时另一个线程调用其deleteLast方法,若deleteLast方法早于getLast方法执行,则就会导致ArrayIndexOutOfBoundsException。
修复上述问题方法很简单,使用客户端加锁,例子代码如下:
publicstatic object getLast(Vector list){
synchronized(list){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}
publicstatic void deleteLast(Vector list){
synchronized(list){
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
(2).迭代:
下面例子演示同步容器类的非线程安全迭代:
for(int i= 0; i < vector.size(); i++){
doSomething(vector.get(i));
}
若多个线程并发运行环境下,一个线程调用vector的delete操作,一个线程调用上面的迭代操作,则size和get方法在调用直接vector已经发生变化,因此也会导致ArrayIndexOutOfBoundsException。
修复上述问题方法很简单,同样使用客户端加锁,例子代码如下:
synchronized(vector){
for(int i = 0; i < vector.size();i++){
doSomething(vector.get(i));
}
}
通过客户端加锁,确保了同步容器复合操作的线程安全,但是却增加了可伸缩性开销,削弱了并发性。
2.容器的ConcurrentModificationException:
容器的并发修改异常ConcurrentModificationException在单线程和多线程都会发生,很多对集合不熟悉的人都可能多次碰到过这个问题,下面详细介绍:
(1).单线程环境下的并发修改异常:
单线程环境下,如果在对一个集合容器进行遍历的同时对集合容器直接进行增删操作,则就会导致并发修改异常,例子代码如下:
for(int i= 0; i < list.size(); i++){
if(list.get(i) == 2){
list.remove(i);
}
}
运行上面例子时就会产生发修改异常ConcurrentModificationException,解决这个问题很简单,遍历时用迭代器增删元素,例子代码如下:
Iteratorit = list.iterator();
while(it.hasNext()){
int value = it.next();
if(value == 2){
it.remove();
}
}
(2).多线程环境下的并发修改异常:
上述使用迭代器在遍历集合的时候删除集合元素在单线程可以避免修改异常ConcurrentModificationException,但是在多线程环境仍然会发生修改异常ConcurrentModificationException。
在多线程情况下迭代器发生并发修改异常的原因是同步容器设计时并没有考虑并发修改问题,它们通常通过把修改计数器(modification count)与容器关联起来,如果在迭代期间计数器被修改,在hasNext或next方法就会通过抛出一个ConcurrentModificationException异常的及时失败(fail-fast)方式告知不能支持并发修改。
解决多线程并发修改的方法如下:
A.在所有遍历增删地方都加上synchronized或者使用Collections.synchronizedXxx,虽然能解决问题但是并不推荐,因为增删造成的同步锁可能会阻塞遍历操作。
B.推荐使用ConcurrentHashMap或者CopyOnWriteArrayList等并发容器。
3.并发容器:
同步容器通过对容器的所有状态进行串行访问,从而实现了它们的线程安全,但是这样做的代价是削弱了并发性,当多个线程共同竞争容器级的锁时,吞吐量将会下降。
为了改进同步容器的并发性,JDK1.5引入了并发容器,这些并发容器提供多线程环境下不会抛出并发修改异常的迭代器,常用的并发容器如下:
(1).ConcurrentHashMap:
同步容器使用一个公共锁同步每一个方法,并严格限制只能有一个线程同时访问容器,而ConcurrentHashMap采用分离锁(默认一把全局锁被分离为16把锁,ConcurrentHashMap的实现使用了一个包含16个锁的Array,每一个锁都守护Hash Bucket的1/16;Bucket N由第N mod 16个锁来守护,因此把锁请求减少为原来的1/16且能支持16个并发的写)这种细粒度的锁机制允许更深层次的共享访问,这样可以支持任意数量的线程对ConcurrentHashMap进行读操作,同时支持有限数量的写线程并发修改ConcurrentHashMap,而且读写线程可以并发访问ConcurrentHashMap。
ConcurrentHashMap与其他并发容器所提供的多线程环境下不会抛出并发修改异常的迭代器是由其返回的弱一致性迭代器决定的,弱一致性迭代器可以容许并发修改。当迭代器创建的时,它会遍历已有元素,并且可以感应到在迭代器被创建后对容器的修改。这种弱一致性在调用那些需要对整个容器进行加锁的方法如size或isEmpty时可能提供不精确的值,因此只有当程序需要在独占访问中加锁时,才不能使用ConcurrentHashMap,而在绝大多数情况下ConcurrentHashMap可以带来更好的伸缩性。
ConcurrentHashMap同时将一些常用的复合操作实现为原子操作:
publicinterface ConcurrentMap<K, V> extends Map<K, V> {
//只有当没有找到匹配K的值时才插入
V putIfAbsent(K key, V value);
//只有当K与V都匹配时才移除
boolean remove(K key, V value);
//只有当K与oldValue都匹配时才替换
boolean replace(K key, V oldValue, VnewValue);
//只有当K匹配时才替换
boolean replace(K key, V newValue);
}
在JDK1.6中又加入了ConcurrentSkipListMap来代替同步的SortedMap,加入ConcurrentSkipListSet来代替同步的SortedSet。
(2).写入时复制并发容器:
CopyOnWriteArrayList和CopyOnWriteArraySet这两个写入时复制并发容器用了代替同步的List和Set容器,避免了在迭代期间对容器的加锁和复制,在每次修改时会创建并重新发布一个新的容器拷贝。
当对容器的迭代操作的频率远高于对容器的修改操作的频率时,使用写入时复制容器是个合理的选择。
(3).阻塞队列:
阻塞队列Blocking queue提供了可阻塞的put和take方法,它们与可定时的offer和poll是等价的。如果Queue已满,put方法会被阻塞直到有空间可用;如果Queue已空,take方法会阻塞直到有元素可用。Queue的长度可以有限,也可以无限,无限的Queue永远不会满,因此put方法永远不会被阻塞。阻塞队列非常适合于生产者-消费者模式。
Blockingqueue的常见实现有:
A.LinkedBlockingQueue和ArrayBlockingQueue:
提供FIFO队列,比同步List容器拥有更好的性能。
B.PriorityBlockingQueue:
提供按优先级顺序排序的队列。
C.SynchronousQueue:
不是一个真正意义上的队列,不会为队列元素维护任何存储空间,它只维护一个排队的线程清单,这些线程等待把元素加入(enqueue)队列或移出(dequeue)队列。
以洗盘子为例,传统的队列类似于盘架,洗盘子的人(生产者)把洗好的盘子放到盘架上就是入队,烘干盘子的人(消费者)从盘架取走洗好的盘子烘干就是出队;SynchronousQueue就是没有盘架,洗盘子的人直接把洗好的盘子交给烘干盘子的人。
SynchronousQueue实现队列的方式非常直接地移交工作,减少了在生产者和消费者直接移动数据的延迟时间,但是因为它没有存储能力,所以除非另一个线程已经准备好参与移交工作,否则put和take会一直阻塞,因此SynchronousQueue队列只有在消费者充足的情况下比较合适。
(4).双端队列:
JDK1.6新增了两个分别扩展了Queue和BlockingQueue的容器Deque和BlockingDeque,它们是双端队列,允许高效地在队头和队尾进行插入和移除,它们的实现类是ArrayDeque和LinkedBlockingDeque。
双端队列非常适合于工作窃取(work stealing)模式,在生产者-消费者模式中,所有的消费者只能共享一个工作队列,而在工作窃取模式中,每个消费者都有一个自己的双端队列,大多数情况下只访问自己的双端队列,如果一个消费者完成了自己双端队列中的全部工作,它可以偷取其他消费者的双端队列中的末尾任务(注意是从尾部而不是头部获取工作,降低双端队列的竞争),因为工作线程并不会竞争一个共享的任务队列,所以工作窃取模式比传统的生产者-消费者模式具有更佳的伸缩性。
双端队列的工作窃取模式非常适合解决生产者-消费者模式中生产者也同时是消费者,消费者同时又是生产者的情况。
4.用阻塞队列实现生产者消费者模式:
生产者-消费者模式是一个经典的线程同步问题,可以将生产者和消费者解耦,实现高并发,很多JDK的类库就使用了该模式,比如Java并发线程池。传统使用互斥或信号量实现的生产者-消费者模式例子很多,这里以桌面搜索程序的扫描文件和建立索引为例,使用阻塞队列来实现生产者-消费者模式,代码如下:
//扫描文件,生产者
publicclass FileCrawler implements Runnable{
private final BlockingQueue<File>fileQueue;
private final FileFilter filter;
private final File root;
publicFileCrawler(BlockingQueue<File> queue, FileFilter filter, Fileroot){
this.fileQueue = queue;
this.filter = filter;
this.root = root;
}
public void run(){
try{
crawl(root);
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
}
private void crawl(File root) throwsInterruptedException{
File[] files =root.listFiles(filter);
if(files != null){
for(File file : files){
if(file.isDirectory()){
crawl(file);
}elseif(!alreadyIndexed(file)){
fileQueue.put(file);
}
}
}
}
}
//建立索引,消费者
Publicclass Indexer implements Runnable{
private final BlockingQueue<File>queue;
public Indexer(BlockingQueue<File>queue){
this.queue = queue;
}
......
public void run(){
try{
while(true){
indexFile(queue.take());
}
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
}
}
//开始搜索
publicstatic void startIndexing(File[] roots){
BlockingQueue<File> queue = newLinkedBlockingQueue<File>(BOUND);
FileFilter filter = new FileFilter(){
public boolean accept(File file){
return true;
}
};
for(File file : roots){
new Thread(new FileCrawler(queue,filter, root)).start();
}
for(int i = 0; i < N_CONSUMERS;i++){
new Thread(newIndexer(queue)).start();
}
}
5.闭锁:
闭锁(Latch)可以延迟线程的进度直到线程达到终止状态,闭锁可以用来确保特定活动直到其他活动完成后才发生。
CountDownLatch是一个灵活的闭锁实现,允许一个或多个线程等待一个事件集的发生,它初始化为一个正整数计数器,用来表示需要等待的事件数,countDown操作对计数器进行减一操作,表示一个事件已经发生,await方法等待计数器到达零,若到达零则表示所有等待的事件都已经发生。
我们以统计线程执行时间为例演示闭锁的使用,代码如下:
publicclass TestHarness {
public long timeTasks(int nThreads, finalRunnable task) throws InterruptedException{
final CountDownLatch startGate = newCountDownLatch(1);
final CountDownLatch endGate = newCountDownLatch(nThreads);
for(int i = 0; i < nThreads;i++){
Thread t = new Thread(){
public void run(){
try{
startGate.await();
try{
task.run();
}finally{
endGate.countDown();
}
}catch(InterruptedExceptionignored){
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
}
一个闭锁工作起来就像一扇大门,直到闭锁达到终点状态之前,门一直是关闭的,没有线程能够通过,在终点状态到来的时候,门打开了,允许所有的线程都通过,一旦闭锁达到了终点状态,就再也不能改变状态了,即闭锁是一次性对象,一旦进入最终状态就不能被重置了,这是闭锁和关卡的最大区别。
6.关卡:
关卡(Barrier)类似于闭锁,它们都能阻塞一组线程,直到某些事件发生,其中关卡与闭锁不同之处在于:
A.所有线程必须同时达到关卡点,才能继续出现,闭锁等待的是事件,关卡等待的是其他线程。
B.闭锁是一次性使用的,一旦进入到最终状态,就不能被重置了;关卡可以重复使用。
JDK中常见的关卡有以下两种:
(1).CyclicBarrier:
CyclicBarrier允许一个给定数量的成员多次集中在一个关卡点,当线程达到关卡点时,调用await方法,await会被阻塞直到所有线程都达到关卡点,当所有线程都达到了关卡点,关卡就被成功地突破了,所有线程都被释放,关卡会重置以备下一次使用。若对await调用超时,或者阻塞中的线程被中断,那么关卡就被认为是失败的,所有对await未完成的调用都通过BrokenBarrierException终止。
CyclicBarrier允许向构造方法传递一个Runnable的关卡行为,在阻塞线程被释放之前是不能执行的,当成功通过关卡时,会在一个子任务线程中执行该关卡行为。
在这并行迭代算法中非常有用,这个算法会把一个问题拆分成一些列相互独立的子问题,使用CyclicBarrier关卡的例子代码如下:
publicclass Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
public Worker(int row) {
myRow = row;
}
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedExceptionex) {
return;
} catch (BrokenBarrierExceptionex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N, newRunnable() {
public void run() {
mergeRows(...);
}
});
for (int i = 0; i < N; ++i){
new Thread(newWorker(i)).start();
}
waitUntilDone();
}
}
(2).Exchanger:
Exchanger是一种两步关卡,在关卡点会交换数据。
Exchanger为线程交换信息提供了非常方便的途径,它可以作为两个线程交换对象的同步点,当一个线程调用Exchange对象的exchange方法后,它会陷入阻塞状态,直到另一个线程也调用了exchange方法,然后以线程安全的方式交换数据,之后这两个线程继续运行,使用Exchanger关卡的例子代码如下:
publicclass FillAndEmpty {
Exchanger<DataBuffer> exchanger = newExchanger<DataBuffer>();
DataBuffer initialEmptyBuffer =......;
DataBuffer initialFullBuffer = ......;
class FillingLoop implements Runnable{
public void run() {
DataBuffer currentBuffer =initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if(currentBuffer.isFull()){
currentBuffer =exchanger.exchange(currentBuffer);
}
}
} catch (InterruptedException ex) {... handle ... }
}
}
class EmptyingLoop implements Runnable{
public void run() {
DataBuffer currentBuffer =initialFullBuffer;
try {
while (currentBuffer != null){
takeFromBuffer(currentBuffer);
if(currentBuffer.isEmpty()){
currentBuffer =exchanger.exchange(currentBuffer);
}
}
} catch (InterruptedException ex) {... handle ...}
}
}
public void start() {
new Thread(newFillingLoop()).start();
new Thread(newEmptyingLoop()).start();
}
}
7.FutureTask:
多线程的Runnable任务是不返回执行结果的,Callable任务是携带执行结果的,FutureTask用于获取Callable任务的执行结果,它有3个状态:等待、运行和完成,完成包括所有计算以任意的方式结束,包括正常结束(FutureTask可以取消正在执行的任务)、取消和异常。
FutureTask通过Future的get方法获取任务执行结果,它依赖于任务的执行状态,如果已经完成,get可以立刻得到返回结果,否则会被阻塞直到任务装入运行状态,然后返回结果或抛出异常。FutureTask把执行结果从运行的线程传送到需要这个结果的线程。
Executor线程池框架利用FutureTask来完成异步任务,并可以用来进行任何潜在的耗时计算,而且可以在真正需要结果之前就启动他们开始计算。FutureTask的例子代码如下:
publicclass Preloader {
private final FutureTask<ProductInfo>future = new FutureTask<ProductInfo>(
new Callable<ProductInfo>(){
public ProductInfo call()throws DataLoadException {
return loadProductInfo();
}
}
);
private final Thread thread = newThread(future);
public void start() {
thread.start();
}
public ProductInfo get() throwsDataLoadException, InterruptedException{
try{
return future.get();
}catch(ExecutionException e){
......
}
}
}
8.信号量:
计数信号量(Semaphore)用来控制能够同时访问某特定资源的活动数量,或者同时执行某一个给定操作的数量。一个信号量管理一个有效的许可集,许可的初始量通过构造函数传入,若还有剩余许可,活动能够获得许可,并在使用之后通过release方法释放许可;若已经没有许可,那么acquire方法会被阻塞,直到有可用许可、中断或超时为止。
计数信号量的一种退化形式是二元信号量:一个计数初始值为1的Semaphore。二元信号量可用做互斥锁,它有不可重入锁的语意。
计数信号量可以用来实现资源池或者给容器定边界,一个使用信号量的例子代码如下:
publicclass BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set =Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T t) throwsInterruptedException{
sem.acquire();
boolean wasAdded = false;
try{
wasAdded = set.add(t);
teturn wasAdded;
}finally{
if(!wasAdded){
sem.release();
}
}
}
public boolean remove(T t){
boolean wasRemoved =set.remove(T);
if(wasRemoved){
sem.release();
}
return wasRemoved;
}
}
三、线程池的基础
1.JDK中的Executor框架是基于生产者-消费者模式的线程池,提交任务的线程是生产者,执行任务的线程是消费者。
Executor线程池可以用于异步任务执行,而且支持很多不同类型任务执行策略,同时为任务提交和任务执行之间的解耦提供了标准方法。
Executor线程池支持如下三种线程执行策略:
(1).顺序执行:
类似于单线程顺序执行任务,优点是实现简单;缺点是扩展性受限,执行效率低下,例子代码如下:
publicclass WithinThreadExecutor implements Executor{
publicvoid execute(Runnable r){
r.run();
}
}
(2)每请求每线程:
为每个请求创建一个新的线程,优点是可以并行处理;缺点是线程生命周期开销大,活动线程受内存资源、JVM以及操作系统的限制,当负载过大时响应性和吞吐量会下降严重,同时还会影响稳定性,例子代码如下:
publicclass ThreadPerTaskExecutor implements Executor{
publicvoid execute(Runnable r){
newThread(r).start();
}
}
(3)线程池:
使用线程池可以重用已有线程,减少线程生命周期开销,同时可以调整活动线程数量,既可以确保足够的并发性,又避免过多线程相互竞争资源,例子代码如下:
publicclass TaskExecutionWebServer {
privatestatic final int NTHREADS = 100;
privatestatic final ExecutorService exec = Executors
.newFixedThreadPool(NTHREADS);
publicstatic void main(String[] args) throws IOException {
ServerSocketsocket = new ServerSocket(80);
while(true) {
finalSocket connection = socket.accept();
Runnabletask = new Runnable() {
publicvoid run() {
handleRequest(connection);
}
};
exec.execute(task);
}
}
}
2.Executor常用的创建线程池静态工厂方法:
(1).newFixedThreadPool:
创建一个定长的线程池,每当提交一个任务就创建一个线程,直到达到池的最大长度,这时线程池会保持长度不再变化,若一个线程由于非预期的异常而结束,线程池会补充一个新的线程。
(2).newCachedThreadPool:
创建一个可缓存的线程池,若当前线程池的长度超过了处理的需要时,它可以灵活地回收空闲的线程,当需求增加时,它可以灵活地添加新的线程,而并不会对池的长度做任何限制。
(3).newSingleThreadExecutor:
创建一个单线程化的executor,只创建唯一的工作者线程来执行任务,若这个线程异常结束,会有另一个取代它。Executor会保证任务依照任务队列所规定的顺序执行。
(4).newScheduledThreadPool:
创建一个支持定时的以及周期性执行的任务的定长线程池。
3.Executor的生命周期:
ExecutorService接口扩展了Executor,提供了以下用于生命周期管理的方法:
publicinterface ExecutorService extends Executor{
voidshutdown();
list<Runnable>shutdownNow();
booleanisShutdown();
booleanisTerminated();
booleanawaitTermination(long timeout, TimeUnit unit) throws InterruputedException;
......
}
ExecutorService接口暗示了Executor的生命周期有以下3中状态:
(1).运行状态:
ExecutorService最初创建后的初始状态是运行状态。
(2).关闭状态:
ExecutorService的sutdown方法会启动一个平缓的关闭过程,停止接收新任务,同时等待已提交的任务执行完成(包括尚未开始执行的任务)。
ExecutorService的sutdownNow方法会启动一个强制的关闭过程,尝试取消所有运行中的任务和排在队列中尚未开始执行的任务。
(3)终止状态:
一旦所有任务全部完成后,ExecutorService就会进入终止状态,通过调研ExecutorService的awaitTermination方法等待达到终止状态,也可以调用isTerminated来轮询是否达到终止状态。
4.Timer与ScheduledExecutorService
在JDK1.5之前,经常使用Timer(开源的Quartz框架也可以)作为定时器管理任务的延迟或周期性执行,在JDK1.5引入了ScheduledExecutorService,使用线程池作为定时器管理任务的延迟或周期性执行,二者的区别如下:
(1).Timer对调度的支持是基于绝对时间的,不支持相对时间,因此任务对系统时钟的改变是敏感的;ScheduledExecutorService只支持相对时间。
(2).Timer只创建唯一的线程来执行所有的timer任务,若一个timer任务的执行很耗时,会导致其他的timer任务时效准确性问题,例如一个timer任务每10ms执行一次,而另一个timer任务每40ms执行一次,若按固定频率进行调度则重复出现的任务会在耗时的任务完成后快速联系地被调用4次,若按延迟进行调度则完全丢失4次调用。
ScheduledExecutorService可以提供多个线程来执行延迟或按固定频率执行的周期性任务,解决了Timer任务时效准确性问题。
(3).若Timer任务抛出未检查异常时,Timer将会被异常地终止,Timer也不会再重新恢复线程执行,它错误地认为整个Timer都被取消了,从而产生无法预料的线程泄露:所有已被安排但尚未执行的Timer任务永远不会再执行了,新的任务也不能被调度了。
下面的例子代码演示Timer的线程泄露:
publicclass OutOfTimer {
publicstatic void main(String[] args) throws Exception {
Timertimer = new Timer();
timer.schedule(newThrowTask(), 1);
TimeUnit.SECONDS.sleep(1);
timer.schedule(newThrowTask(), 1);
TimeUnit.SECONDS.sleep(5);
}
staticclass ThrowTask extends TimerTask{
publicvoid run(){
System.out.println("I'minvoked.");
thrownew RuntimeException();
}
}
}
上面代码运行后只会打印出一行I'm invoked.然后就抛出Timer already cancelled异常。
ScheduledExecutorService可以妥善地处理异常,避免线程泄露。
下面的例子代码演示ScheduledExecutorService在异常之后仍然可以继续运行:
publicclass OutOfScheduledExecutor {
publicstatic void main(String[] args) throws Exception {
ScheduledExecutorServiceservice = Executors.newScheduledThreadPool(1);
service.schedule(newThrowTask(), 1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(1);
service.schedule(newThrowTask(), 1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(5);
service.shutdown();
}
staticclass ThrowTask implements Runnable{
publicvoid run(){
System.out.println("I'minvoked.");
thrownew RuntimeException();
}
}
}
上述的ScheduledExecutorService例子没有抛出,可以正常打印出两行I'm invoked.
5.Callable和Future:
Runnable是Executor框架常用的任务基本表达形式,但是其run方法不能返回一个值或者抛出受检查的异常。
Callable类似于Runnable,其call方法可以等待返回值,并为可能抛出的异常预先做好准备。
Future描述了任务的生命周期,并提供了相关的方法来获得任务的结果、取消任务以及检验任务是否已经完成或者被取消。ExecutorService中所有的submit方法都返回一个Future。
使用Runnable/Callable和Future可以提高任务的并行性,例子代码如下:
publicclass FutureRender{
privatefinal ExecutorService executor = ......;
publicvoid renderPage(CharSequence source){
finalList<ImageInfo> imageInfos = scanForImageInfo(source);
Callable<List<ImageData>>task = new Callable<List<ImageData>>(){
publiclist<ImageData> call(){
List<ImageData>result = new ArrayList<ImageData>();
for(ImageInfoimageInfo : imageInfos){
result.add(imageInfo.downloadImage());
}
returnresult;
}
};
Future<List<ImageData>>future = executor.submit(task);
renderText(source);
try{
List<ImageData>imageDatas = future.get();
for(ImageDatadata : imageDatas){
renderImage(data);
}
}catch(InterruptedExceptione){
Thread.currentThread().interrupt();
future.cancel(true);
}catch(ExecutionExceptione){
throwlaunderThrowable(e.getCause());
}
}
}
注意:只有大量相互独立且同类的任务进行并发处理时,会将程序的任务量分配到不同的任务中,才能正在获得并发性能的提高;而对异类任务的并发处理则会因为任务协调的开销,不一定能获得性能的提高。
6.CompletionService介绍:
CompletionService整合了Executor与BlockingQueue的功能,可以将一个批处理任务提交给给它执行,然后返回一个包含每个任务执行结果的QueueingFuture队列,通过调用队列的take和poll方法,可以获得包含每个任务执行结果的Future。
CompletionService的例子代码如下:
publicclass CompletionServiceRender {
privatefinal ExecutorService executor;
publicCompletionServiceRender(ExecutorService executor) {
This.executor= executor;
}
publicvoid renderPage(CharSequence source) {
final List<ImageInfo>imageInfos = scanForImageInfo(source);
CompletionService<ImageData>service = new ExecutorCompletionService<ImageData>(
executor);
for(final ImageInfo imageInfo : imageInfos) {
service.submit(newCallable<ImageData>() {
publicImageData call() {
returnimageInfo.downloadImage();
}
});
}
renderText(source);
try{
for(int i = 0; i < imageInfos.size(); i++) {
Future<ImageDate>f = service.take();
ImageDatadata = f.get();
renderImage(data);
}
}catch (InterruptedException e) {
Thread.currentThread().interrupt();
}catch (ExecutionException e) {
throwlaunderThrowable(e.getCause());
}
}
}
7.线程的取消和关闭:
对于非后台线程,如果取消和关闭不当会导致阻塞JVM无法正常关闭,Java提供了一个协作的中断机制使一个线程能够要求另一个线程停止当前工作。
Java中常用的取消和关闭策略如下:
(1).非阻塞方法:
使用volatile域保存取消状态,在每次操作时检测该状态。
(2).阻塞方法:
线程可能永远不会检测取消标志,因此使用volatile域保存取消状态的方案不可行,需要使用线程中断。
线程中断是一个协作机制,一个线程给另一个线程发送信号,通知它在下一个方便时刻(通常称为取消点)停止正在做的工作,去做其他事情。
每个线程都有一个boolean类型的中断状态,在中断的时候该中断状态被设置为true,线程中断相关的方法如下:
publicclass Thread{
//中断目标线程
publicvoid interrupt(){......}
//返回目标线程的中断状态
publicboolean isInterrupted(){......}
//清除当前线程的中断状态,并返回它之前的值
publicstatic boolean interrupted(){......}
......
}
特定阻塞库类的方法都支持中断,中断通常是实现线程取消最明智的选择。
(3).Executor和Future:
Executor线程池可以使用shutdown和shutdownNow方法来关闭线程池。
Future可以使用cancel方法取消任务。
(4).JVM关闭钩子:
在JVM正常关闭时,可以执行使用Runtime.addShutdownHook注册的尚未开始执行的线程(关闭钩子),例子代码如下:
publicvoid start(){
Runtime.getRuntime().addShutdownHook(newThread(){
publicvoid run(){
try{
LogService.this.stop();
}catch(InterruptedExceptionignore){
}
}
});
}
JVM关闭钩子全部是并发执行,因此必须是线程安全,访问共享数据必须要同步,同时小心避免死锁。
JVM关闭钩子常用于服务或应用程序的清理,或者清除OS不能自动清除的资源。
四、线程池高级
1.估算线程池最优大小:
CPU的数量,内存大小,任务是计算、I/O、还是混合操作,任务执行周期的长短,资源的数量来确定线程池的大小,假如整个系统中只有两台打印机,或者真个DB连接池只有20个连接。具体的计算公式:
Ncpu = CPU的数量 =Runtime.getRuntime().availableProcessors();
Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1;
W/C = 等待时间与计算时间的比率;
为了保持处理器达到期望的使用率,最优的线程池大小等于:
Nthreads =Ncpu * Ucpu * (1+ W/C);
N = Ncpu +1
2.配置ThreadPoolExecutor:
ThreadPoolExecutor是Executors中工厂方法newCachedThreadPool、newFixedThreadPool和newScheduledThreadExecutor返回的ExecutorService接口的基本实现,提供了默认的执行策略。
ThreadPoolExecutor还允许通过构造方法定制执行策略,常用构造方法如下:
publicThreadPoolExecutor(int corePoolSize,
intmaximumPoolSize,
longkeepAliveTime,
TimeUnitunit,
BlockingQueue<Runnable>workQueue,
ThreadFactorythreadFactory,
RejectedExecutionHandlerhandler)
{......}
ThreadPoolExecutor配置执行策略的参数如下:
(1).核心池大小:
核心池大小是目标线程池大小,线程池实现试图维护池的大小,即使没有任务执行,池的大小也等于核心池的大小,并且直到工作队列充满前池都不会创建更多的线程。
如果核心池大小设置为0,则在工作队列填满之前线程池不会开始工作;若有一个有限的线程池和一个有限的工作队列,同时又希望所有的线程在没有任务的情况下销毁,则可以将核心线程池大小设置为0来激活这个特性,newCachedThreadPool的核心池大小就是0。
(2).最大池大小:
最大池大小是可同时活动的线程数的上限,若空闲时,会销毁超出核心池大小的多余线程。
newCachedThreadPool将最大池设置为Integer.MAX_VALUE。
(3).存活时间:
即空闲线程超时时间,若一个线程已经闲置的时间超过了存活时间,它将成为一个被回收的候选者,若当前的池大小已经超过了核心池大小,这个线程会被终止掉。
newFixedThreadPool为请求的线程池设置了核心池和最大池的大小,而且存活时间永远不会超时。
newCachedThreadPool默认的超时时间为1分钟。
(4).任务阻塞队列:
当请求到达速度超了线程池处理速度时,线程池将尚未开始执行的任务放入任务阻塞队列中等待。
任务阻塞队列通常有以下3中:
A.无限队列:
newFixedThreadPool和newSingleThreadExecutor默认使用一个无限的LinkedBlockingQueue阻塞队列,若所有工作者线程都处于忙碌,任务将会在队列中等候,若任务持续快速地到达,队列也会无限制地增加。
B.有限队列:
为了避免资源被耗尽,线程池也经常使用有限队列,如ArrayBlockingQueue或有界的LinkedBlockingQueue;当队列满时,新来的任务会使用饱和策略处理。
C.同步移交:
同步移交完全绕开队列,直接将任务从生产者移交给工作者线程。
同步移交适合线程池无限或者可以接受任务被拒绝的情况,newCachedThreadPool就使用同步移交方式。
注意:只有任务彼此独立时,使用有限线程池或有限工作队列才是合理的,若任务之间相互依赖,有限的线程池或工作队列就可能引起线程的饥饿死锁,而使用无限的线程池配置(newCachedThreadPool)可以避免任务相互依赖引起的线程饥饿死锁。
(5).饱和策略:
饱和策略用于规定有界队列充满或线程池被关闭时对新提交任务的处理方案。
JDK提供了如下4中饱和策略:
A.中止(abort):
默认的饱和策略,会抛出未检查的拒绝执行异常RejectedExecutionException,调用者捕获该异常做适当处理。
B.遗弃(discard):
丢弃最新提交的任务。
C.遗弃最旧的(discard-oldest):
丢弃本应该接下来就要执行的任务,并尝试去重新提交新任务,若是优先级队列,则丢弃优先级最高的任务,因此不能混合使用遗弃最旧的饱和策略和优先级队列。
D.调用者运行(caller-runs):
既不会丢最新提交的任务,也不会抛出异常,会把最新提交的任务推回到调用者,由生产者线程执行,一方面给工作者线程时间来追赶进度,另一方面减慢了生产者提交新任务的速度。
通过调节核心池大小和存活时间,可以促进线程池归还空闲线程占用的资源,饱和策略用于应对过载处理。
3.定制线程工厂:
线程池通过线程工厂来创建线程,默认的线程池工厂会创建一个新的,非后台的线程,没有特殊配置。
线程池工厂可以运行定制线程的配置信息,例如:为线程池指定UncaughtExceptionHandler,用于捕获线程抛出的未检查异常以防止线程泄露;实例化一个定制的线程类实例,用来执行调试日志线程;给线程指定名称等等。
线程池工厂只有一个newThread方法,在线程池需要创建一个新线程时使用,例子代码如下:
publicclass MyThreadFactory implements ThreadFactory{
privatefinal String poolName;
publicMyThreadFactory(String poolName){
this.poolName= poolName;
}
publicThread newThread(Runnable runnable){
returnnew MyAppThread(runnable, poolName);
}
}
publicclass MyAppThread extends Thread{
publicstatic final String DEFAULT_NAME = “MyAppThread”;
privatestatic volatile boolean debug = false;
privatestatic final AtomicInteger created = new AtomicInteger();
privatestatic final AtomicInteger alive = new AtomicInteger();
privatestatic final Logger log = Logger.getLogger(MyAppThread.class.getClassName());
publicMyAppThread(Runnable r){
this(r,DEFAULT_NAME);
}
publicMyAppThread(Runnable r, String name){
super(runnable,name + “-” + created.incrementAndGet());
setUncaughtExceptionHandler(newThread.UncaughtExceptionHandler(){
publicvoid uncaughtException(Thread t, Throwable e){
log.log(Level.SEVER,“Uncaught in thread ” + t.getName(), e);
}
});
}
publicvoid run(){
if(debug){
log.log(Level.FINE,“Created ” + t.getName());
}
try{
alive.incrementAndGet();
super.run();
}finally{
alive.decrementAndGet();
if(debug){
log.log(Level.FINE,“Exiting ” + t.getName());
}
}
}
publicstatic int getThreadsCreated(){
returncreated.get();
}
publicstatic int getThreadsAlive(){
returnalive.get();
}
publicstatic boolean isDebug(){
returndebug;
}
publicstatic void setDebug(boolean debug){
this.debug= debug;
}
}
4.扩展ThreadPoolExecutor:
ThreadPoolExecutor提供了以下3个生命周期的钩子方法让子类扩展:
(1).beforeExecute:
任务执行前,线程会调用该方法,可以用来添加日志、监控或者信息收集统计。
若beforeExcute方法抛出了RuntimeException,线程的任务将不被执行,afterExecute方法也不会被调用。
(2).afterExecute:
任务执行结束后,线程会调用该方法,可以用来添加日志、监控或者信息收集统计。
无论任务正常返回或者抛出异常(抛出Error不能被调用),该方法都会被调用。
(3).terminate:
线程池完成关闭动作后调用,可以用来释放资源、发出通知、记录日志或者完成统计信息等。
一个扩展ThreadPoolExecutor的例子代码如下:
publicclass TimingThreadPool extends ThreadPoolExecutor{
privatefinal ThreadLocal<Long> startTime = new ThreadLocal<Long>();
privatefinal Logger log = Logger.getLogger ( TimingThreadPool.class.getClassName() );
privatefinal AtomicLong numTasks = new AtomicLong();
privatefinal AtomicLong totalTime = new AtomicLong();
protectedvoid beforeExecute(Thread t, Runnable r){
super.beforeExecute(t,r);
log.fine(String.format(“Thread%s: start %s”, t, r));
startTime.set(System.nanoTime());
}
protectedvoid afterExecute(Runnable r, Throwable t){
try{
longendTime = System.nanoTime();
longtaskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread%s: end %s, time=%dns", t, r, taskTime));
}finally{
super.afterExecute(r,t);
}
}
protectedvoid terminated(){
try{
log.info(String.format("Terminated:avg time=%dns",
totalTime.get()/ numTasks.get()));
}finally{
super.terminated();
}
}
}
五、自定义线程池
六、活跃度问题
1.死锁的定义及发生条件:
死锁就像是两个人过独木桥,在桥中间碰到了,谁也不想让步,结果谁也无法通过。
线程A占有锁L时想要获得锁M,而线程B占有锁M并尝试获得锁L,两个线程将永远等待下去,这种情况称为死锁(deadlock),或致命拥抱(the deadly embrace)。
在并发程序设计中,死锁 (deadlock) 是一种十分常见的逻辑错误。通过采用正确的编程方式,死锁的发生不难避免。
死锁发生的四个必要条件:
(1).互斥:存在这样一种资源,它在某个时刻只能被分配给一个执行者(也称为线程)使用;
(2).持有:当请求的资源已被占用从而导致执行者阻塞时,资源占用者不但无需释放该资源,而且还可以继续请求更多资源;
(3).不可剥夺:执行者获得到的互斥资源不可被强行剥夺,换句话说,只有资源占用者自己才能释放资源;
(4).环形等待:若干执行者以不同的次序获取互斥资源,从而形成环形等待的局面,即锁顺序死锁。
上面四个条件只要打破任何一个,死锁问题就可以解决。
数据库系统通过在表示正在等待关系的有向图上搜索循环来检测死锁,一旦发现死锁会牺牲一个调用者,使它退出事务释放资源,从而使其他事务能够继续进行。
JVM中线程发送死锁时,应用系统可能完全停止或者某个特定子系统停止,也可能是性能受到影响,死锁不但经常发生在高负载情况下,而且难以测试和重现,恢复应用程序健康状态的唯一方式是重启应用。
2.动态锁顺序死锁:
线程发送死锁必要条件中的环形等待容易产生锁顺序死锁,下面例子代码演示一个简单的锁顺序死锁:
publicclass LeftRightDeadlock {
privatefinal Object left = new Object();
privatefinal Object right = new Object();
publicvoid leftRight() {
synchronized(left) {
synchronized(right) {
doSomething();
}
}
}
publicvoid rightLeft() {
synchronized(right) {
synchronized(left) {
doSomethingElse();
}
}
}
}
若一个线程调用leftRight方法,而另一个线程调用rightLeft方法,则很有可能发生死锁。
验证锁顺序一致性需要对程序中锁的行为进行整体分析,单独观察每一个锁的代码路径是不充分的,因为leftRight方法和rightLeft方法获得锁的方式都是合法的,只是它们彼此不能相互协调而已。
在实际的生产代码中,很少会出现像上面例子代码中这么明显的锁顺序死锁代码,通常会发生如下的隐式锁顺序死锁问题:
下面的转账例子演示一个动态的锁顺序死锁,代码如下:
publicvoid transferMoney(Account fromAccount, Account toAccount, DollarAmount amount)
throwsInsufficientFundsException{
synchronized(fromAccount){
synchronized(toAccount){
if(fromAccount.getBalance()pareTo(amount)< 0){
thrownew InsufficientFundsException();
}else{
fromAccount.debit(amount);
toAccount.credit(amount);
}
}
}
}
若另个线程同时调用transferMoney方法,一个从X向Y转账,另一个从Y向X转账,如:
A:transferMoney(xAccount,yAccount, 10);
B:transferMoney(yAccount,xAccount, 20);
在偶然情况下很容易产生线程A获得xAccount的锁,等待yAccount的锁;线程B获得yAccount的锁,等待获取xAccount的锁,即产生锁顺序死锁。
上述隐式锁顺序死锁的例子中锁的顺序是由外部输入参数顺序决定的,解决此类锁顺序死锁的方法是确保线程以通用的固定顺序获得锁,制定锁顺序例子代码如下:
privatestatic final Object tieLock = new Object();
publicvoid transferMoney(Account fromAccount, Account toAccount, DollarAmount amount)
throwsInsufficientFundsException{
classHelper{
publicvoid transferMoney()throws InsufficientFundsException{
if(fromAccount.getBalance()pareTo(amount)< 0){
thrownew InsufficientFundsException();
}else{
fromAccount.debit(amount);
toAccount.credit(amount);
}
}
}
intfromHash = System.identityHashCode(fromAccount);
inttoHash = System.identityHashCode(toAccount);
if(fromHash< toHash){
synchronized(fromAccount){
synchronized(toAccount){
newHelper().transferMoney();
}
}
}elseif(fromHash > toHash){
synchronized(toAccount){
synchronized(fromAccount){
newHelper().transferMoney();
}
}
}else{
synchronized(tieLock){
synchronized(fromAccount){
synchronized(toAccount){
newHelper().transferMoney();
}
}
}
}
}
确保所有的线程以通用固定的顺序获得锁可以有效解决锁顺序死锁。
3.协作对象的锁顺序死锁:
很多情况下,锁顺序死锁发生总不像动态锁顺序死锁例子的那么明显,即很少在一个方法里面显式调用两个锁,但是同步方法的调用外部的同步方法时,同步方法上的隐式锁同样会发生锁顺序死锁问题。
以一个出租车GPS调度系统为例来演示非开放调用的锁顺序死锁问题:
publicclass Taxi {
privatePoint location, destination;
privatefinal Dispatcher dispatcher;
publicTaxi(Dispatcher dispatcher) {
this.dispatcher= dispatcher;
}
publicsynchronized Point getLocation() {
returnlocation;
}
publicsynchronized void setLocation(Point location){
this.location= location
if(location.equals(destination)){
dispatcher.notifyAvailable(this);
}
}
}
publicclass Dispatcher {
privatefinal Set<Taxi> taxis;
privatefinal Set<Taxi> availableTaxis;
publicDispatcher() {
taxis= new HashSet<Taxi>();
availableTaxis= new HashSet<Taxi>();
}
publicsynchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}
publicsynchronized Image getImage() {
Imageimage = new Image();
for(Taxi t : taxis) {
image.drawMarker(t.getLocation());
}
returnimage;
}
}
线程A:调用setLocation方法作为对GPS接收器更新的响应,则首先获得Taxi对象的锁更新位置,若已经到达目的地,则调用Dispatcher的notifyAvailable方法获取Dispatcher对象的锁来通知有出租车可用。
线程B:调用getImage方法来获取出租车的位置,则首先获取Dispatcher对象的锁,然后调用Taxi的getLocation方法获取Taxi对象的锁来获取出租车的位置。
在偶发情况线程A获取了Taxi对象的锁,等待Dispatcher对象的锁;线程B获取了Dispatcher对象的锁,等待Taxi对象的锁,产生锁顺序死锁。
上述协作对象锁顺序死锁是由非开放调用引起的,非开放调用是指在持有锁时调用外部方法,外部方法可能会获得其他锁(同步方法,产生死锁的风险),或者遭遇严重超时的阻塞,当持有锁时会延迟其他试图获得该锁的线程。
解决协作对象的锁顺序死锁的方法是使用开放调用,开放调用是指调用方法时不需要持有锁,使用开放调用解决死锁问题的例子代码如下:
publicclass Taxi{
privatePoint location, destination;
privatefinal Dispatcher dispatcher;
publicTaxi(Dispatcher dispatcher){
this.dispatcher= dispatcher;
}
publicsynchronized Point getLocation(){
returnlocation;
}
publicvoid setLocation(Point location){
booleanreachedDestination;
synchronized(this){
this.location= location;
reachedDestination= location.equals(destination);
}
if(reachedDestination){
dispatcher.notifyAvailable(this);
}
}
}
publicclass Dispatcher{
privatefinal Set<Taxi> taxis;
privatefinal Set<Taxi> availableTaxis;
publicDispatcher(){
taxis= new HashSet<Taxi>();
availableTaxis= new HashSet<Taxi>();
}
publicsynchronized void notifyAvailable(Taxi taxi){
availableTaxis.add(taxi);
}
publicsynchronized Image getImage(){
Set<Taxi>taxisCopy;
synchronized(this){
taxisCopy=new HashSet<Taxt>(taxis);
}
Imageimage = new Image();
for(Taxit : taxisCopy){
image.drawMarker(t.getLocation());
}
returnimage;
}
}
在程序中尽量使用开放调用,依赖于开放调用的程序,相比于那些在持有锁的时候还调用外部方法的程序,更容易进行死锁自由度的分析。
4.饥饿死锁:
饥饿就像3个人打篮球,其中两个人牢牢把控着球,第三个人死活也得不到球,变成了现场观众。
饥饿死锁是指线程无法得到资源(cpu或io资源或数据库连接池资源等),所以无法执行下去,称为饿死,比较常见的就是在优先级调度中,不停的有高优先级的线程创建,导致低优先级的线程的无法分配到cpu,从而饥饿。
饥饿死锁的例子如下:
publicclass ThreadDeadLock {
ExecutorServiceservice = Executors.newSingleThreadExecutor();
classRenderPageTask implements Callable<String> {
publicString call()throws Exception{
Future<String>header, footer;
header= service.submit(new LoadFileTask("header.html"));
footer= service.submit(new LoadFileTask("footer.html"));
Stringpage = renderBody();
returnheader.get() + page + footer.get();
//出现饥饿死锁,任务等待子任务结果
}
}
}
上述饥饿死锁的例子中,一个任务将获取页眉和页脚工作提交到单线程化的线程池中,同时等待该单线程化线程池中获取页眉和页脚任务的执行结果,最后将页眉、页脚和页面主体组合渲染页面,在该情况下,获取页眉任务等待获取页脚任务的完成,而获取页脚的任务等待获取页眉任务的完成,结果在该单线程化线程池中执行的两个任务永久停滞下来。
通常需要等待其他任务的结果(任务有依赖关系)的任务是产生线程饥饿死锁的来源,有界池和相互依赖的任务不能放在一起使用。
说起饥饿想起了曾经经历过的一个笑话,上大学的时候,有一次宿舍里面的电脑集体中毒了,尝试了很多主流的杀毒软件也没有解决,最后大家一致决定格式化硬盘重新安装操作系统,这是有个室友很淡定地关机睡觉,我们问他什么时候重装系统,他说他打算两个星期不开机饿死病毒。虽然这个笑话很冷,但也阐述了饥饿的基本原理,只不过在让病毒饥饿的同时他自己的工作也被饥饿了。
5.活锁:
活锁好比两个过于礼貌的人在半路相遇,他们都给对方让路,于是又在另一条路上相遇了,然后就这样不停的一直让路下去,导致没有人能够通过,活锁是一种特殊形式的饥饿。
活锁是尽管线程没有被阻塞,线程却仍然不能继续,因为它不断重试相同的操作,却总失败。
活锁通常发生在如下两种情况:
(1).过渡的错误恢复代码:
以消息处理为例,若消息处理失败,其中传递消息的底层架构会退回整个事务,并把它置回队首,若消息处理程序对某种特定类型消息处理存在bug,每次都会处理失败,那么每次这个消息都会被从队列中取出,传递到存在问题的消息处理器,然后发生事务回退,然后消息又被置回队首,如此反复,虽然消息处理器并没有阻塞,但是线程却永远无法继续执行下去了。
这种形式的活锁是误将不可修复的错误当做可以修复的错误来处理而导致的。
(2).多个相互协作的线程修改状态:
多个相互协作的线程为了彼此间响应而不停修改状态,使得没有一个线程能够继续(类似于让路的例子),那么就发生了活锁。
解决活锁的一种方案是对重试机制引入随机性,例如在以太网上发送数据包时,如果发现了数据包冲突,发送的数据包的各方会随机等待一个时间继续重试,在并发程序中也类似,通过随机等待重试可以有效避免活锁。
6.死锁的避免与诊断:
前面已经介绍了死锁常见的解决方法,总结如下:
(1).定制锁顺序,使用一致的锁顺序避免锁顺序死锁。
(2).使用开放调用避免隐式锁顺序死锁。
(3).在使用显式Lock时,使用定时的tryLock特性,使得超时之后自动释放锁。
诊断死锁的方法:
当怀疑发生死锁时,强制JVM产生线程转储,通过分析线程转储分析死锁信息。
七、性能优化
1.Amdahl定律:
Amdahl(阿姆达尔)定律描述了在一个系统中,基于可并行化和串行化的组件各自所占的比重,程序通过获得额外的计算资源,理论上能够加速多少。若F是必须串行化执行的比重,那么在一个N处理器的机器中,通过Amdahl定律计算可以获得理论加速为:
Speedup<=1/(F+(1-F)/N)
当N无限增大趋近与无穷大时,Speedup的最大值无限接近于1/F。
Amdahl估算理论加速例子如下:
(1).在CPU无限多的情况下,一个程序中若有50%的处理需要串行,则Speedup只能提高2倍;若程序中有10%需要串行执行,则Speedup最多只能提高10倍。
(2).一个程序中若有10%是串行执行,在拥有10个CPU的情况下,Speedup最多只能达到5.3倍;在拥有100个CPU的情况下,Speedup最多只能达到9.2倍。
所有的并发程序中都有串行执行的部分,因此在评估添加硬件资源对并发程序性能提高时,Amdahl定律是理论基础和计算依据。
2.减少锁的竞争:
Amdahl定律告诉我们串行化会损害可伸缩性;线程上下文切换会影响性能;锁竞争会同时影响可伸缩性与性能。
Little定律:稳定系统中,顾客的平均数量=顾客的平均到达率*顾客在系统中的平均等待时间,通过Little定律我们可得出影响锁竞争性的两个因素:锁被请求的频率、每次持有该锁的时间。
下面3种方式可以降低锁竞争:
(1).减少锁的持有时间:
通过将与锁无关的代码移出同步代码块,缩小同步范围可以减少锁的持有时间,从而减少锁竞争,下面的例子代码演示锁时间持有过长同步范围过大:
publicclass AttributeStore {
private final Map<String, String>attributes = new HashMap<String, String>();
public synchronized booleanuserLocationMatches(String name, String regexp){
String key = "users." + name+ ".location";
String location = attributes.get(key);
if(location == null){
return false;
}else{
return Pattern.matches(regexp,location);
}
}
}
上面例子代码中,只有从map中获取location时需要同步,其他的正则表达式匹配等都不需要同步,我们可以通过减少同步范围来减少锁持有时间,改进后的例子代码如下:
publicclass AttributeStore {
private final Map<String, String>attributes = new HashMap<String, String>();
public boolean userLocationMatches(Stringname, String regexp){
String key = "users." + name+ ".location";
synchronized(this){
String location =attributes.get(key);
}
if(location == null){
return false;
}else{
return Pattern.matches(regexp,location);
}
}
}
减少同步范围可以减少串行化执行比重,通过Amdahl定律知道可以提高可伸缩性。
(2).降低锁的请求频率:
若一个应用中只有一个全局锁,而不是为每个对象分配一个独立锁,那么所有同步的操作都变成了串行顺序执行,若将锁请求分配到更多锁对象上,单个锁的请求频率将会被降低,降低锁的请求频率通常有两种方法:
A.锁分拆(lock splitting):
锁分拆是指将多个相互独立状态变量由一个锁来保护分离成为每个独立状态变量由一个单独的锁来保护。
下面的例子代码演示一个锁保护多个独立状态变量:
publicclass ServerStatus{
private final Set<String> users;
private final Set<String> queries;
public synchronized void addUserP|(Stringuser){
users.add(user);
}
public synchronized void addQueries(Stringquery){
queries.add(query);
}
public synchronized void removeUser(Stringuser){
users.remove(user);
}
public synchronized void removeQuery(Stringquery){
querys.remove(query);
}
}
下面例子代码演示锁分拆技术,例子代码如下:
publicclass SplitedServerStatus{
private final Set<String> users;
private final Set<String> queries;
public void addUser(String user){
synchronized(users){
users.add(user);
}
}
public void addQueries(String query){
synchronized(queries){
queries.add(query);
}
}
public void removeUser(String user){
synchronized(users){
users.remove(user);
}
}
public void removeQuery(String query){
synchronized(queries){
queries.remove(query);
}
}
}
将一个竞争适中而不竞争激烈的锁,分离为两个锁,可以最大限度地提高可伸缩性和性能。
B.锁分离(lock striping):
将一个竞争激烈的锁分拆为两个锁时,这两个锁可能都竞争激烈,虽然采用两个线程并发执行能提高一部分可伸缩性,但是在一个拥有多处理器系统中,仍然无法给可伸缩性带来极大的提高,这时我们需要使用锁分离技术。
锁分离是锁分拆技术的扩展,将被加锁对象分成可大可小加锁块的集合,并且这些集合使用相互独立的锁对象。JDK1.5引入的ConcurrentHashMap就使用的是锁分离技术,ConcurrentHashMap的实现中使用一个包含16个锁的数组,每个锁包含散列桶的1/16,其中第N个散列桶由第(N mod 16)个锁来保护。若哈希算法实现的比较好,即哈希值均匀分布,则ConcurrentHashMap的锁分离可以将锁请求减少为原来的1/16,并且可以支持多达16路的并发写入。
下面的例子代码演示锁分离:
publicclass StripedMap{
private static final int N_LOCKS = 16;
private final Node[] buckets;
private final Object[] locks;
private static class Node{......}
public StripedMap(int numBuckets){
buckets = new Node[numBuckets];
locks = new Object[N_LOCKS];
for(int i = 0; i < N_LOCKS; i++){
locks[i] = new Object();
}
}
private final int hash(Object key){
return Math.abs(key.hashCode() %buckets.length);
}
public Object get(Object key){
int hash = hash(key);
synchronized(locks[hash % N_LOCKS]){
for(Node n = buckets[hash]; n !=null; n = n.next){
if(n.key.equals(key)){
return n.value;
}
}
}
return null;
}
public void clear(){
for(int i = 0; i < buckets.length;i++){
synchronized(locks[i % N_LOCKS]){
buckets[i] = null;
}
}
}
......
}
(3).放弃使用独占锁:
使用并发容器、读-写锁,不可变对象,原子变量等代替独占锁来管理共享对象可以获得比独占锁更好的可伸缩性与性能。
ReadWriteLock读-写锁实现了单写入多读取情况下的加锁机制,读取操作可以同时访问共享变量,而写入操作采用独占锁方式,对于读取操作占多数的情况下,读-写锁可以提供比独占锁更好的并发性。
读写锁例子如下:
publicclass RWDictionary {
private final Map<String, Data> m =new TreeMap<String, Data>();
private final ReadWriteLock rwl = newReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w =rwl.writeLock();
public Data get(String key) {
r.lock();
try {
return m.get(key);
}finally {
r.unlock();
}
}
public Data put(String key, Data value) {
w.lock();
try {
return m.put(key, value);
}finally {
w.unlock();
}
}
......
}
ReentrantReadWriteLock读-写锁默认是公平的,也可以显式指定为非公平的,其区别如下:
A.公平模式:
选择权交给等待时间最长的线程,若锁由读者线程获得,而一个线程请求写入锁,那么不再允许读者获得读取锁,直到写线程被受理并且已经释放写入锁。
B.非公平模式:
线程允许访问的顺序是不确定的,由写锁降级为读锁是允许的,反之由读锁升级为写锁是禁止的(可能会导致死锁)。
3.ReentrantLock与synchronized的选择:
ReentrantLock实现了Lock接口,提供了与synchronized内部锁相同的互斥、内存可见性以及可重入加锁语义,与synchronized内部锁不同的是,提供了无条件的、可轮询的、可定时的、可中断的锁操作。ReentrantLock与synchronized使用时的选择考量:
(1).要使用可轮询、可定时、可中断、公平性等特性的使用ReentrantLock:
A.ReentrantLock的可轮询特性可以避免锁顺序死锁,若无法同时获得两个锁会回退释放所获得的锁,以银行转账为例演示可轮询特性:
publicboolean transferMoney(Account fromAccount,
AccounttoAccount,
DollarAmount amount,
long timeout,
TimeUnit unit)
throwsInsufficientFundsException, InterruptedException{
long fixedDelay = getFixedDelayComponentNanos(timeout,unit);
long randMod =getRandomDelayModulusNanos(timeout, unit);
long stopTime = System.nanoTime() +unit.toNanos(timeout);
while(true){
if(fromAccount.lock.tryLock()){
try{
if(toAccount.lock.tryLock()){
try{
if(fromAccount.getBalance()pareTo(amount) < 0){
throw newInsufficientFundsException();
}else{
fromAccount.debit(amount);
toAccount.credit(amount);
return true;
}
}finally{
toAccount.unlock();
}
}
}finally{
fromAccount.unlock();
}
}
if(System.nanoTime() < stopTime){
return false;
}
NANOSECONDS.sleep(fixedDelay +rnd.nextLong() % randMod);
}
}
B.ReentrantLock的可定时特性使得在超出规定时间内没有结束的线程释放所获得的锁,避免线程无限阻塞,例子代码如下:
publicboolean trySendOnSharedLine(String message, long timeout, TimeUnit unit)
throwsInterruptedException{
long nanosToLock = unit.toNanos(timeout) -estimatedNanosToSend(message);
if(!lock.tryLock(nanosToLock,NANOSECONDS)){
return false;
}try{
return sendOnSharedLine(message);
}finally{
lock.unlock();
}
}
C.ReentrantLock的可中断特性使得可以在可取消的任务中使用,线程在响应中断时可以获得锁,例子代码如下:
publicboolean sendOnSharedLine(String message) throws InterruptedException{
lock.lockInterruptibly();
try{
returncancellableSendOnSharedLine(message);
}finally{
lock.unlock();
}
}
publicboolean cancellableSendOnSharedLine(String message) throwsInterruptedException{......}
D.ReentrantLock默认是非公平锁,可以显式指定为公平锁。
在锁竞争激烈情况下(大多数情况下)非公平锁性能比较好;当持有锁的时间比较长,或者请求锁的平均时间间隔比较长情况下,公平锁性能比较好。
(2).一般情况下使用synchronized:
synchronized内部锁是内置于JVM的,编译器可以在运行时动态进行锁消除、锁粗化等优化,同时线程转储中synchronized的锁信息比较详细便于调试。
ReentrantLock必须在finally中释放锁,如果忘记释放则会造成严重错误。因此只有synchronized不能满足需求,需要使用ReentrantLock高级特性时才使用ReentrantLock,一般情况下推荐优先使用synchronized。
4.使用Condition代替notify/notifyAll:
在JDK1.5之前,经常使用Object对象的wait和notify/notifyAll作为线程协调机制,以一个有限缓存队列为例,演示例子代码如下:
publicabstract class BasesBoundedBuffer<V> {
private final V[] buf;
private int count;
protected BaseBoundedBuffer(int capacity) {
this.buf = (V[]) new Object[capacity];
}
protected synchronized void doPut(Vv){......};
protected synchronized voiddoTake(){......};
public synchronized final boolean isFull(){
return count == buf.length;
}
public synchronized final boolean isEmpty(){
return count == 0;
}
}
publicclass WaitNofiyBoundedBuffer<V> extends BasesBoundedBuffer<V> {
public synchronized void put(V v) throwsInterruptedException {
while (isFull()) {
wait();
}
doPut(v);
notifyAll();
}
public synchronized V take() throwsInterruptedException {
while (isEmpty()) {
wait();
}
V v = doTake();
notifyAll();
return v;
}
}
notify和notifyAll的区别如下:
(1)notifyAll:
唤醒所有正在条件队列中等待的线程,让他们去竞争锁,只有一个线程会获得锁,其他的线程又重新回到等待状态,会带来大量的线程上下文切换和大量的锁竞争,效率低下。
(2).notify:
只是选择一个等待状态线程进行通知,并使它获得该对象上的锁,但不通知其他同样在等待被该对象唤醒的线程们,若被唤醒的线程发现其执行条件不成立,则继续转回等待状态,而其他执行条件成立的线程不会被唤醒,一直在等待已经过期的信号,这就发生了被劫持的信号(或丢失的信号)问题。
一般情况下为了避免被劫持的信号情况发生,通常使用notifyAll代替notify,当且仅当只有一个被激活线程且条件谓词(如上面例子中队列满、空等)只有一个时才使用notify。
为了克服notifyAll低效的缺点,JDK1.5引入了Condition,下面的例子使用Condition代替notifyAll/notify演示上述的有限缓存队列:
publicclass ConditionBoundedBuffer<V> extends BasesBoundedBuffer<V>{
protected final Lock lock = newReentrantLock();
private final Condition notFull =lock.newCondition();
private final Condition notEmpty =lock.newCondition();
public void put(V v) throwsInterruptedException{
lock.lock();
try{
while(isFull()){
notFull.await();
}
doPut(v);
notEmpty.signal();
}finally{
lock.unlock();
}
}
public V take() throws InterruptedException{
lock.lock();
try{
while(isEmpty()){
notEmpty.await();
}
V v = doTake();
notFull.signal();
return v;
}finally{
lock.unlock();
}
}
}
Condition的signal和signalAll方法类似于Object的notify和notifyAll,但是Condition的signal克服了notify的信号劫持问题,同样又避免了notifyAll的效率低下问题。
Condition的await和signal方法必须在持有锁对象时调用。
Condition与Object的notify/notifyAll的选择类似于ReentrantLock与synchronized之间的选择。
更多推荐
多线程编程总结
发布评论