多线程与高并发 笔记,非面向初学者 一、JUC锁,一些面试题和源码讲解"/>
多线程与高并发 笔记,非面向初学者 一、JUC锁,一些面试题和源码讲解
网页左边,向下滑有目录索引,可以根据标题跳转到你想看的内容 |
---|
1、基本概念
并行和并发 |
---|
并发:同时发布任务,比如早上7点到10点,有人工地搬砖,有人去上课,有人去办公室办公,同一时间段,干不同的事,(不同的事,在同一时间段同时发生了) ,单核cpu,多个进程抢占资源 |
并行:同时执行任务,比如大家都早上7点起床,然后所有人干相同或不同的工作,同一时间点上,干不同的事 ,多核cpu,好多事,分配到不同内核上,同时进行 |
启动线程的3种方式 |
---|
Thread,Runnable,线程池Executors.newCachedThrad(或者说lambda表达式) |
实际启动线程无非就是实现Thread类,或者继承Runnable接口,而问出第三种方式,就可以答线程池或者lambda表达式,这两种实际也是Thread或者Runnable |
线程基本方法 |
---|
sleep:让当前线程睡眠一段时间 |
yield:谦让一下,让下一个线程先过去,比如当前线程在运行时,碰见yield方法,就会谦让一下让其它线程先运行(就是回到等待队列中),其它运行完了,cpu调度从队列中选线程,而能不能再次选到自己,就看cpu调度了,并不是谦让完后,下一个还是自己执行 |
join:加入到指定线程,等待它执行完,自己继续执行,这个方法如果调自己是没有意义的,通常是t1线程运行时,调t2.join,此时t1加入到t2线程,让t2获得cpu执行,执行完后,t1继续执行 |
锁的基本方法 |
---|
锁是锁一个对象,无论是一个new出来的对象,还是this还是类对象,都继承与Object,Object有wait和notify两个方法 |
wait():锁睡眠,让锁里面的线程睡眠等待 ,同时将锁释放 ,其它线程得以抢到锁(锁了同一个对象的锁),执行代码 |
notify():唤醒,通知睡眠线程继续执行,但是不会释放锁 ,只是让锁中线程继续执行(前提是锁中没有其它线程在执行,如果有,等待其他线程执行结束,自己继续执行) |
public class TT {public static void main(String[] args) {Thread[] threads = new Thread[2];//保存两个线程Object o = new Object();//用来加锁的对象for(int i=0;i<2;i++){//为两个线程赋值int finalI = i;//保存变量为常量,让值不可变threads[i] = new Thread(()->{synchronized (o){//加锁if(finalI == 1) o.notify();//如果是第二个线程就通知其它线程,让线程0可以继续执行System.out.println(Thread.currentThread().getName()+" "+1);try {o.wait();//锁睡眠,当前线程被睡,锁被释放,其它线程可以抢到锁,得以执行} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+" "+2);o.notify();//唤醒,通知被睡眠线程继续执行}},"thread"+i);}for (Thread t:threads){t.start();}/** thread0 1thread1 1thread0 2thread1 2以上是结果* 线程0输出完后,进入wait()方法,此时锁睡眠,其它线程可以进入 此时thread1进入* 进入后,if语句finalI==1,执行notify唤醒锁,thread0得以继续执行,但是此时thread1正在执行,输出后遇到wait()睡眠* thread0继续执行,输出thread0 2,执行notify()唤醒锁,线程thread1继续执行* thread1执行输出thread1 2,再次唤醒锁,但是锁没有锁,所以没有变化* */}
}
ThreadLocal |
---|
让每个线程都拥有指定类型的,独立的资源,多个线程相互不冲突 |
相当于,每个线程,在本地都有个名字一样的容器,每个线程间,容器相互独立 |
根据源码一步步剖析得到的理论如下:进入set方法后,发现它将值存到了map中,这个map是在Thread类中的 |
最终发现,值存到了Thread.currentThread的某个map中,就是当前线程对象的map集合中,所以实现了,每个线程ThreadLocal独立的效果 |
/*** ThreadLocal线程局部变量** ThreadLocal是使用空间换时间,synchronized是使用时间换空间* 比如在hibernate中session就存在与ThreadLocal中,避免synchronized的使用** 运行下面的程序,理解ThreadLocal* * @author 马士兵*/
import java.util.concurrent.TimeUnit;public class ThreadLocal2 {//volatile static Person p = new Person();static ThreadLocal<Person> tl = new ThreadLocal<>();//本地线程,表示我这里的值,除了我指定的Person的线程之外,其它线程获取值是获取不到的public static void main(String[] args) {//以下代码,理论来讲,第一个线程2秒后获取值,第二个线程1秒后设置值,也就是说,第一个线程获取值的时候,因为第二个线程已经提前设置了值//所有第一个线程会获取到第二个线程设置的值(不使用ThreadLocal确实会这样)//但是ThreadLocal的作用就是,非本线程,不能操作和获取值。也就是说,每一个线程,只能操作自己的ThreadLocaltl.set(new Person("线程开始前"));new Thread(()->{try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
// tl.set(new Person());System.out.println(tl.get());//这里我们获取值}).start();new Thread(()->{try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}tl.set(new Person("第二个线程改"));//这里我们设置值}).start();}static class Person {String name = "zhangsan";public Person(){}public Person(String name){this.name = name;}}
}
2、volatile
volatile这个单词就是可变的,容易变的,所以这个关键字就是用来修饰哪些容易改变的量,仔细侦听,跟踪,变量是否发生改变
package com.yzpnb.controller;import java.util.concurrent.TimeUnit;public class TestController {volatile boolean running = true;//volatile关键字修饰的,会仔细跟踪void m() {System.out.println("m start");while(running){}System.out.println("m end");}public static void main(String[] args) {TestController t = new TestController();new Thread(t::m,"t1").start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}t.running = false;}}
volatile |
---|
保证线程可见性:从上面例子可以看到,加上关键字后,产生我们想要的结果,原因就是,running变量是在堆内存的一个变量,而主线程main和普通线程t1同时操作这个值,他们操作时不是直接操作堆内存的值,而是各自拷贝一份操作,而什么时候同步堆内存的值,并不好控制,这就是线程的不可见性。上面例子中,main线程将running改为false,由于线程不可见性,t1线程无法及时获取running变量的变化,所以它执行while循环的running一只保持true的状态。但是volatile关键字保证线程可见性,main线程将running改为false,t1线程立即就可以看到变化,从而停止while循环. 依靠的是cpu的MESI协议(缓存一致性协议),因为多个cpu内核之间,一个值改了,如果无法及时同步,后果十分严重,所以cpu之间通过MESI协议同步,而java实现也是依靠cpu的缓存一致性协议,归根结底就是依靠硬件实现线程可见性 |
禁止指令重排序(-DCL单例 - Double Check Lock - Mgr06.java):指令重排序,就是CPU执行指令时,发生重新排序,比如JVM虚拟机中,我们java给一个变量赋值,分为3步,1、申请内存(此时会有一个默认值在内存中),2、将设定的值放在内存中,3、将内存空间指向变量 。那么指令重排序 ,可能刚申请完内存,比如一个整型值,此时它默认值是0,然后直接让内存空间指向了变量,最后再设定值,也就是可能将第二步和第三步换了 。对于变量来说,结果是一样的,但是,如果我们写一个秒杀程序,这个变量是一个单例的,一个线程来执行初始化,此时我们肯定要判断变量有没有值,如果有,直接获取对象,而不初始化,没有进行初始化,就执行初始化代码。可是此时发生指令重排序,当内存空间还是默认值0的时候,没来得及赋值想要的值,下一个线程就来了,发现对象已经有值了,直接拿着这个赋了初刷值的对象走了,如果是订单等操作时,本来需要给1000块钱,但是因为初始值是0,你没给钱,就把东西买了。所以我们要给这个变量加上volatile关键字,让这个变量严格按照顺序1,2,3的步骤赋值,而不进行指令重排序 |
3、Synchronized 锁
锁的细节总结,具体讲解在下面 |
---|
锁的是对象不是代码 |
锁的对象,不能是String常量,Integer, Long ,如果其它类库有相同常量,两个锁锁了同一个常量,因为常量在一个内存空间,此时如果是同一个线程,发生重入,不是同一个线程,发生死锁。两个都是不好的结果 |
锁方法时,如果是非静态方法,锁的对象是this,锁的是static静态方法,则锁class对象 |
锁定方法,与非锁定方法,同时执行,就是两个线程同时请求一个锁,一个请求到执行锁中代码,另一个,如果还要请求非锁定方法,那么它会直接去执行非锁定方法,两个方法的代码会同时执行 |
锁升级,偏向锁(保存线程ID)->如果有2个及以上线程请求锁升级为 ->自旋锁(锁住)->如果自旋了10次(就是外面的线程请求10次)还没有执行完升级为->系统锁(重量级锁,直接请求操作系统) |
自旋锁适用于,加锁代码执行时间短,线程数少的(因为,自旋不是将线程放在等待队列,而是占用cpu资源自旋,争锁)。系统锁适用于线程数多的(因为线程会放在等待队列,不占用cpu资源),操作时间长 |
Synchronized(o ){代码}:锁概述 |
---|
锁,并不是锁一段代码同一时刻只能有一个线程操作 |
而是锁一个对象,比如上面括号里的o |
当拿到锁之后,才可以执行代码,而不是将代码锁住了,只是拿到锁的,才能执行代码 |
而锁,锁了什么,就是Synchronized(o)里面的o |
就是说,某个线程,拿到锁之后,执行代码,此时这个线程拿这把锁,把门锁上,其它线程就进不来了,当它执行完了,把锁打开,此时,线程开始抢锁,谁抢到,谁就进去把门锁上,执行代码,不断循环,直到流程结束 |
锁方法 |
---|
假设有一个T类,我们使用Synchronized修饰一个静态static方法,此时相当于锁了T这个类对象,也就是class这个文件,T.class |
当我们修饰一个普通方法时,相当于锁this,也就是当前类实例对象,注意实例对象可以有多个,但是类对象只有一个 |
也就是修饰普通方法,是为每一个实例对象加锁,而修饰static方法,是对这个类文件加锁 |
可重入锁,为什么?面试题,为什么锁可重入 |
---|
有两个加了锁的方法m1和m2,现在当线程t1执行m1时,m1中调用了m2 |
理论上讲,m1加了锁,此时m1无法进入其它线程,此时m1中调用了m2,m2也加了锁,此时,m1需要线程执行,m2也需要线程执行,两个资源同时请求资源,会发生死锁 |
但是实际上,m1调用m2时,发现m2和m1都是一个线程类的,此时可以重入,不发生死锁,称为可重入锁 |
为什么要有可重入锁:因为必须有,比如,一个子类,使用super关键字调用父类方法,那么此时如果线程不可重入,调用super父类方法,发生死锁,java整个执行就全是死锁了 |
子类锁住的方法,调用使用super调用父类方法,锁的是哪个类 |
---|
子类的方法m1调用了父类方法,如果是锁this,也就是锁一个普通方法,不是static的,那么子类m1锁子类的this,此时因为可重入锁,进入父类m1方法,那么因为可重入,所以锁的对象不变,还是子类的this |
如果锁的是static方法,那么子类m1锁子类这个类的类对象,重入锁重入后,同理,还是锁的东西不变,依然是子类的类对象 |
异常锁 |
---|
默认情况下,程序执行过程中出现异常,锁会被释放 |
如果多个线程在请求同一个锁,第一个线程执行时出现异常,此时你不做处理,会立即释放锁,其它的线程抢锁,进入同步代码块,此时其它线程可能访问到异常的数据,所以锁中处理异常要多加小心 |
如果出现异常你不想让锁释放,可以监听异常是通过catch,让线程继续 |
Synchronized 底层如何实现 |
---|
JDK早期,Synchronized是重量级的,直接去OS(操作系统)申请资源 |
后来引入锁升级概念,因为我们需要轻量级一点,不喜欢太重 |
当线程1,请求锁时,markword 此时仅仅记录这个线程的id,这叫偏向锁 ,也就是默认就这一个线程请求 |
当有其它线程来争用,此时进行锁升级,升级为自旋锁 ,就是这锁,不记录id了,看见其它人来直接自己锁住了,其它线程进不来了,但是这个自旋锁有上限 |
当其它线程来争用,10次以后,发现里面的线程还不释放锁出来,此时锁自己也看不下去了,再次升级,升为重量级锁 ,也就是JDK早期的直接去OS申请资源 |
锁升级,一旦升级,无法降级,也就是说,一旦升级为自旋锁,就回不去偏向锁,一旦升级为重量级锁,就回不去偏向锁 |
执行时间短(加锁的代码,不是整个线程代码),线程数少,用自旋锁,线程多,操作时间长用系统锁,也就是重量级锁 |
自旋锁,线程不断旋转争用锁,销毁cpu,系统锁,线程放在等待队列,不占用cpu资源 |
如何优化 |
---|
细锁:比如一个业务逻辑,有一个方法,里面有一句是需要加锁的,那么,我们只给这一句加锁,此为采用细颗粒度锁,使线程争用时间变短,提高效率 |
锁的对象,不能发生改变:一般我们会将对象设置为Final,不可改变,类对象无需final,因为类始终时这个类,不会变,但是一个变量,一个实例对象,是可能改变的,一旦改变,锁无法释放,其它线程永远拿不到锁,得不到执行 |
不要以字符串常量,等一些基本类型作为锁定对象:比如你锁了字符串常量"hello",此时有个类库也锁了这个常量,你的程序不经意间和类库使用了同一把锁,出现死锁阻塞 |
4、CAS(无锁优化,自旋)
CAS Compare And Set(比较并设定) |
---|
因为某些非常常见的操作,总是加锁,太过于麻烦,所以java提供了一些类,可以自动帮我们加锁 |
这些类不是实现了Synchronized,而是CAS |
java中只要是java.util.concurrent.atomic的类或者Atomic 开头的类,都是用CAS操作保证线程安全的类 |
package com.yzpnb.controller;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;public class TestController {AtomicInteger count = new AtomicInteger(0);//此类是创建一个线程安全的Integer型量,参数表示初始值/*synchronized*/ void m() {//这里无需加锁了,因为我们操作的是count,我们需要保证count安全,而countfor (int i = 0 ;i<10000;i++){count.incrementAndGet();//相当于count++,但这是线程安全的}}public static void main(String[] args) {TestController t = new TestController();List<Thread> threads = new ArrayList<>();for (int i = 0;i < 10;i++){threads.add(new Thread(t::m,"Thread"+i));}threads.forEach((o)->o.start());threads.forEach(o->{try {o.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(t.count);}}
实现原理 |
---|
原来,我们想改变一个值,为了线程安全同步,需要加锁 |
现在需要的是cas算法 |
cas(要改的值,期望的值(我们赋新值的时候,期望的原值),新值)if 要改的值 == 期望的值 //如果要改的值,和期望的值不一样,说明已经被人改了,比如我们要改5这个值,此时期望的值就是5,if时发现要改的值变成60了,与期望的值不对应了,说明被人改了,不可以执行赋值操作要改的值 == 新值否则因为要改的和期望的值不同,说明已经被其它线程改了,回去重新执行算法cas操作是CPU原语支持,也就是说,是CPU指令级别的支持,中间不能被打断,也就是if 要改的值 == 期望的值,判断后,不会有人能突然改值,因为不可打断ABA问题
就是如果你在操作过程中,有一个线程进来把值从A改成B,然后又改回A
此时当前线程操作时两个值是相同的解决办法:如果是基础数据类型,无所谓,因为值没变,非要解决,可以使用下面和处理引用类型一样的方法
如果是引用类型,那么就像你女朋友,分手了,和别的男人在一起,然后又回来和你复合,这肯定不是简单的值没变的问题
所以,给值加一个标识,时间戳也好,版本号也好,以此来判断是不是原值
比如A 版本号为1,改为B 版本号为2 改回A 版本号为3
那么cas(这里就需要加上一个判断,判断这个标识,版本号或者时间戳等等)
5、Unsafe类
Unsafe类:可以直接操作Java虚拟机中的内存,让java具备C C++的指针操作内存能力,可以直接用指针定位堆内存操作值 |
---|
下面是一些类中的方法,1.8版本,此类只能通过反射来用,新版本,改成了单例模式,可以通过getUnsafe获取类实例 |
直接操作内存:allocateMemory putXX freeMemory pageSize |
直接生成类实例:allocateInstance |
直接操作类或实例变量:objectFieldOffset,getInt,getObject |
CAS相关操作:compareAndSetObject Int Long,新版本weakCompareAndSetObject Int Long |
6、LongAdder
LongAdder |
---|
此类和AtomicLong一样,可以算递增等操作,但是它采用了分段锁的概念 |
LongAdder类与AtomicLong类的区别在于高并发时前者将对单一变量的CAS操作分散为对数组cells中多个元素的CAS操作,取值时进行求和;而在并发较低时仅对base变量进行CAS操作,与AtomicLong类原理相同。分布式的设计 |
由此,我们会发现,处理高并发时,LongAdder速度更快 |
package com.yzpnb.controller;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;public class TestController {/*创建3个Long,count2是普通long对象,需要Synchronized加锁,count1是普通CAS,count3是分布式,分段锁CAS*/static long count2=0L;static AtomicLong count1 = new AtomicLong(0L);static LongAdder count3 = new LongAdder();/*使用3个count模拟高并发,看谁的速度快,一个1000个线程,每个线程递增十万次*/public static void main(String[] args) throws Exception {Thread[] threads = new Thread[1000];//创建1000个线程,模拟高并发//count1的高并发for (int i=0;i<threads.length;i++){threads[i] = new Thread(()->{for (int k=0;k<100000;k++)count1.incrementAndGet();//递增十万次});}long start = System.currentTimeMillis();//获取高并发开始前时间for (Thread t:threads) t.start();//依次启动所有线程for (Thread t:threads) t.join();//等待,模拟高并发的等待long end = System.currentTimeMillis();//获取结束时间System.out.println("AtomicLong:"+count1.get()+" time"+(end-start));//输出结果和处理时间/*count2的高并发*/Object lock = new Object();//用来锁的对象for (int i = 0;i<threads.length;i++){threads[i] = new Thread(new Runnable() {@Overridepublic void run() {for (int k = 0;k<100000;k++){synchronized(lock){count2++;//递增}}}});}start = System.currentTimeMillis();for (Thread t:threads) t.start();for (Thread t:threads) t.join();end = System.currentTimeMillis();System.out.println("Synchronized:"+count2+" time"+(end-start));//输出/*count3高并发*/for (int i=0;i<threads.length;i++){threads[i] = new Thread(()->{for (int k=0;k<100000;k++)count3.increment();//递增});}start = System.currentTimeMillis();for (Thread t:threads) t.start();for (Thread t:threads) t.join();end = System.currentTimeMillis();System.out.println("LongAdder:"+count1.longValue()+" time"+(end-start));//输出}}
7、JUC同步锁—重要的新锁,旧的锁是Synchronized
1、ReentrantLock 可重入锁
可重入锁 |
---|
就是一个线程执行一个加锁的代码,此时这个线程,执行另一个加锁的代码,两个锁发现是同一个线程,就让线程可以进入,这就是可重入锁 |
public class T01_ReentrantLock1 {synchronized void m1() {for(int i=0; i<10; i++) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(i);if(i == 2) m2();}}synchronized void m2() {System.out.println("m2 ...");}public static void main(String[] args) {T01_ReentrantLock1 rl = new T01_ReentrantLock1();new Thread(rl::m1).start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}//new Thread(rl::m2).start();}
}
/*** reentrantlock用于替代synchronized* 由于m1锁定this,只有m1执行完毕的时候,m2才能执行* 这里是复习synchronized最原始的语义* * 使用reentrantlock可以完成同样的功能* 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)* 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放* @author mashibing*/
package com.mashibing.juc.c_020;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class T02_ReentrantLock2 {Lock lock = new ReentrantLock();//创建可重入锁void m1() {try {lock.lock(); //synchronized(this) 创建锁,锁下面代码块for (int i = 0; i < 10; i++) {TimeUnit.SECONDS.sleep(1);System.out.println(i);}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();//释放锁}}void m2() {try {lock.lock();//锁下面代码块,因为只有一句,就锁这一句System.out.println("m2 ...");} finally {lock.unlock();//释放}}public static void main(String[] args) {T02_ReentrantLock2 rl = new T02_ReentrantLock2();new Thread(rl::m1).start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}new Thread(rl::m2).start();}
}
尝试锁定,tryLock |
---|
如果无法锁定,或者在指定时间内无法获取锁,可以直接执行代码 |
可以根据tryLock返回值判定是否锁定 |
可以使用tryLock(时间),指定tryLock的时间,超时抛出异常 |
public class T03_ReentrantLock3 {Lock lock = new ReentrantLock();void m1() {try {lock.lock();for (int i = 0; i < 3; i++) {TimeUnit.SECONDS.sleep(1);System.out.println(i);}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}/*** 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行* 可以根据tryLock的返回值来判定是否锁定* 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中*/void m2() {/*boolean locked = lock.tryLock();System.out.println("m2 ..." + locked);if(locked) lock.unlock();*/boolean locked = false;try {locked = lock.tryLock(5, TimeUnit.SECONDS);System.out.println("m2 ..." + locked);} catch (InterruptedException e) {e.printStackTrace();} finally {if(locked) lock.unlock();}}public static void main(String[] args) {T03_ReentrantLock3 rl = new T03_ReentrantLock3();new Thread(rl::m1).start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}new Thread(rl::m2).start();}
}
public class T04_ReentrantLock4 {public static void main(String[] args) {Lock lock = new ReentrantLock();Thread t1 = new Thread(()->{try {lock.lock();System.out.println("t1 start");TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);System.out.println("t1 end");} catch (InterruptedException e) {System.out.println("interrupted!");} finally {lock.unlock();}});t1.start();Thread t2 = new Thread(()->{try {//lock.lock();lock.lockInterruptibly(); //可以对interrupt()方法做出响应System.out.println("t2 start");TimeUnit.SECONDS.sleep(5);System.out.println("t2 end");} catch (InterruptedException e) {System.out.println("interrupted!");} finally {lock.unlock();}});t2.start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}t2.interrupt(); //打断线程2的等待}
}
公平锁 |
---|
如果不是公平锁,一个线程来了,会直接进行抢锁,谁抢到算谁的,而公平锁,一个线程来了,会先看等待队列,如果等待队列没有其它线程,直接去抢锁,如果有,就收敛点。但是并不是说绝对公平,只是相对雨露均沾一点点,不会出现一个线程执行满,另一个线程一次都得不到执行的情况 |
2、ReadWriteLock 读写锁(共享锁+排他锁)
读写锁 |
---|
如果有1000个读线程,2个写线程,如果有一个线程读完,这时写线程写入,那么刚才的读线程就出错了 |
所以,我们必选让读线程先读,等都读完再写 |
那么,如果是普通的锁,必选第一个线程读完,第二个线程才能进去(因为锁里面同时只能有一个线程操作)效率不高 |
所以我们使用读写锁,它分为共享锁和排他锁 |
当第一个读线程读时,其它读线程过来,因为都是做同一个操作,并且不改东西,所以其它读线程也可以进入到锁中执行代码,也就是同一时刻,读线程都可以进入锁内执行代码,而非读线程想进入,就会被排除,无法进入(当第一个线程进入锁,其它和第一个线程相同的线程,比如全部都是读线程,那么这些线程都可以执行代码,而不和第一个线程同类型,便不能执行代码) |
import java.util.Random;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;public class T10_TestReadWriteLock {static Lock lock = new ReentrantLock();//普通锁private static int value;static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();//读写锁static Lock readLock = readWriteLock.readLock();//获取读锁static Lock writeLock = readWriteLock.writeLock();//获取写锁//模拟读操作public static void read(Lock lock) {try {lock.lock();Thread.sleep(1000);System.out.println("read over!");//模拟读取操作} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public static void write(Lock lock, int v) {try {lock.lock();Thread.sleep(1000);value = v;System.out.println("write over!");//模拟写操作} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String[] args) {/*下面注释的是普通锁,你可以将其解除注释,然后注释使用读写锁的代码,试一下效果* 你会发现使用读写锁,读操作几乎是瞬间的在同一时间段完成* 而使用普通锁,第一个读完第二个才能读* *///Runnable readR = ()-> read(lock);Runnable readR = ()-> read(readLock);//Runnable writeR = ()->write(lock, new Random().nextInt());Runnable writeR = ()->write(writeLock, new Random().nextInt());//先读后写for(int i=0; i<18; i++) new Thread(readR).start();for(int i=0; i<2; i++) new Thread(writeR).start();}
}
8、JUC同步锁—常用工具
1、CountDownLatch 倒数门栓
倒数门栓,比如有100个门栓,那么1个门栓,栓一个,就相当于加锁,先从第100个栓,然后倒数一个,第99个门栓再去栓,被拴住的线程,必须等Count为0(门栓没有了)才能继续执行 |
---|
await();相当于join阻塞,count为0继续执行 |
countDown();倒数,倒数一个门栓,找东西栓,count-1 |
以下代码创建100个线程threads和100个门栓latch,每个线程配备一个门栓latch.countDown(), |
启动线程start(),然后latch.await()方法表示阻塞,就像门栓把所有线程都拴住了
下面代码使用join实现
import java.util.concurrent.CountDownLatch;public class T06_TestCountDownLatch {public static void main(String[] args) {usingJoin();//使用JoinusingCountDownLatch();//使用门栓}private static void usingCountDownLatch() {Thread[] threads = new Thread[100];//100个线程CountDownLatch latch = new CountDownLatch(threads.length);//创建与线程数相同的门栓for(int i=0; i<threads.length; i++) {threads[i] = new Thread(()->{int result = 0;for(int j=0; j<10000; j++) result += j;latch.countDown();//为每个线程上一个门栓});}for (int i = 0; i < threads.length; i++) {threads[i].start();}try {latch.await();//阻塞线程} catch (InterruptedException e) {e.printStackTrace();}System.out.println("end latch");}private static void usingJoin() {Thread[] threads = new Thread[100];for(int i=0; i<threads.length; i++) {threads[i] = new Thread(()->{int result = 0;for(int j=0; j<10000; j++) result += j;});}for (int i = 0; i < threads.length; i++) {threads[i].start();}for (int i = 0; i < threads.length; i++) {try {threads[i].join();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("end join");}
}
总结:join实现阻塞,必须等待1其它线程执行完,自己才能继续执行,但是用门栓,可以控制,什么时候想运行了,执行countDown(),什么时候想阻塞,调用await()即可
2、CyclicBarrier 栅栏
相当于有个栅栏,人满了,把栅栏推倒,人过去,然后栅栏再起来,下一波人又满了,再次推到过去,以此类推 |
---|
3、Phaser 阶段器
按不同阶段,执行逻辑,比如有些线程到阶段1就可以断掉了,有些线程要从头走到尾 |
---|
以下代码实现模拟婚礼,人到齐,吃完饭,离开,入洞房4阶段,每个人都齐才能进入下一个阶段,入洞房只能有新郎和新娘在才可以进行 |
import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;public class T09_TestPhaser2 {static Random r = new Random();static MarriagePhaser phaser = new MarriagePhaser();//获取阶段器,此阶段器需要继承Phaser实现方法,代码在下面static void milliSleep(int milli) {//睡眠时间通用方法try {TimeUnit.MILLISECONDS.sleep(milli);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {phaser.bulkRegister(7);//将阶段器的大小设置为7for(int i=0; i<5; i++) {//5个线程new Thread(new Person("p" + i)).start();}new Thread(new Person("新郎")).start();//新娘新郎线程new Thread(new Person("新娘")).start();}static class MarriagePhaser extends Phaser {//继承阶段器@Overrideprotected boolean onAdvance(int phase, int registeredParties) {//此方法决定阶段switch (phase) {case 0:System.out.println("所有人到齐了!" + registeredParties);System.out.println();return false;case 1:System.out.println("所有人吃完了!" + registeredParties);System.out.println();return false;case 2:System.out.println("所有人离开了!" + registeredParties);System.out.println();return false;case 3:System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);return true;default:return true;}}}static class Person implements Runnable {String name;public Person(String name) {this.name = name;}public void arrive() {milliSleep(r.nextInt(1000));System.out.printf("%s 到达现场!\n", name);phaser.arriveAndAwaitAdvance();//进入下一阶段}public void eat() {milliSleep(r.nextInt(1000));System.out.printf("%s 吃完!\n", name);phaser.arriveAndAwaitAdvance();}public void leave() {milliSleep(r.nextInt(1000));System.out.printf("%s 离开!\n", name);phaser.arriveAndAwaitAdvance();}private void hug() {if(name.equals("新郎") || name.equals("新娘")) {milliSleep(r.nextInt(1000));System.out.printf("%s 洞房!\n", name);phaser.arriveAndAwaitAdvance();} else {phaser.arriveAndDeregister();//结束//phaser.register()}}@Overridepublic void run() {arrive();eat();leave();hug();}}
}
4、Semaphore 信号灯
信号灯,灯亮了可以进去,不亮不能进去 |
---|
允许指定数量线程同时执行,当值大于1,灯一直亮,当值等于0时,灯灭。其它线程要等灯重新亮 |
比如我指定允许1个线程同时执行,那么当一个线程看见灯亮着显示着1,可以获取到资源执行,此时灯变为0不亮了。其它线程看见灯不亮,就无法获取资源 |
当线程执行完,将资源还回去,那么灯重新变为1,重新亮 |
起来,其它线程抢资源 |
import java.util.concurrent.Semaphore;public class T11_TestSemaphore {public static void main(String[] args) {//Semaphore s = new Semaphore(2);//允许两个线程同时执行Semaphore s = new Semaphore(2, true);//允许两个线程同时执行,并且第二个参数为true,表示公平//允许一个线程同时执行//Semaphore s = new Semaphore(1);new Thread(()->{try {s.acquire();//线程获取资源,Semaphore的资源少一个System.out.println("T1 running...");Thread.sleep(200);System.out.println("T1 running...");} catch (InterruptedException e) {e.printStackTrace();} finally {s.release();//将获取的资源还回去给Semaphore}}).start();new Thread(()->{try {s.acquire();System.out.println("T2 running...");Thread.sleep(200);System.out.println("T2 running...");s.release();} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}
5、Exchanger 交换
Exchanger |
---|
此对象有两个位置,当一个线程调用exchange(s)时,此线程进入阻塞状态,占用一个位置,知道遇到另一个调用exchange(s)的线程,两个线程交换值,然后继续执行 |
import java.util.concurrent.Exchanger;
public class T12_TestExchanger {static Exchanger<String> exchanger = new Exchanger<>();public static void main(String[] args) {new Thread(()->{String s = "T1";try {s = exchanger.exchange(s);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " " + s);}, "t1").start();new Thread(()->{String s = "T2";try {s = exchanger.exchange(s);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " " + s);}, "t2").start();}
}
6、LockSupport
此类可以让线程随时停止随时启动 |
---|
下面例子让线程5s后停止,8秒后重启 |
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;public class T13_TestLockSupport {public static void main(String[] args) {Thread t = new Thread(()->{for (int i = 0; i < 10; i++) {System.out.println(i);if(i == 5) {LockSupport.park();//让线程停车}try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}});t.start();// LockSupport.unpark(t);//此代码可以先与停车使用try {TimeUnit.SECONDS.sleep(8);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("after 8 senconds!");LockSupport.unpark(t);//让指定线程启动}
}
9、面试题
1、线程间通知类
题目在代码注释里面,方法一 :用到了 上面第一节 基本概念里面的锁中两个方法 wait和notify, |
---|
wait():锁睡眠,让锁里面的线程睡眠等待 ,同时将锁释放 ,其它线程得以抢到锁(锁了同一个对象的锁),执行代码 |
notify():唤醒,通知睡眠线程继续执行,但是不会释放锁 ,只是让锁中线程继续执行(前提是锁中没有其它线程在执行,如果有,等待其他线程执行结束,自己继续执行) |
/*** 曾经的面试题:(淘宝?)* 实现一个容器,提供两个方法,add,size* 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束* * 给lists添加volatile之后,t2能够接到通知,但是,t2线程的死循环很浪费cpu,如果不用死循环,该怎么做呢?* * 这里使用wait和notify做到,wait会释放锁,而notify不会释放锁* 需要注意的是,运用这种方法,必须要保证t2先执行,也就是首先让t2监听才可以* * 阅读下面的程序,并分析输出结果* 可以读到输出结果并不是size=5时t2退出,而是t1结束时t2才接收到通知而退出* 想想这是为什么?* * notify之后,t1必须释放锁,t2退出后,也必须notify,通知t1继续执行* 整个通信过程比较繁琐* @author mashibing*/
package com.mashibing.juc.c_020_01_Interview;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;public class T04_NotifyFreeLock {//添加volatile,使t2能够得到通知volatile List lists = new ArrayList();public void add(Object o) {lists.add(o);}public int size() {return lists.size();}public static void main(String[] args) {T04_NotifyFreeLock c = new T04_NotifyFreeLock();final Object lock = new Object();new Thread(() -> {synchronized(lock) {System.out.println("t2启动");if(c.size() != 5) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("t2 结束");//通知t1继续执行lock.notify();}}, "t2").start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e1) {e1.printStackTrace();}new Thread(() -> {System.out.println("t1启动");synchronized(lock) {for(int i=0; i<10; i++) {c.add(new Object());System.out.println("add " + i);if(c.size() == 5) {lock.notify();//释放锁,让t2得以执行try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}}, "t1").start();}
}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;public class T05_CountDownLatch {// 添加volatile,使t2能够得到通知volatile List lists = new ArrayList();public void add(Object o) {lists.add(o);}public int size() {return lists.size();}public static void main(String[] args) {T05_CountDownLatch c = new T05_CountDownLatch();CountDownLatch latch = new CountDownLatch(1);new Thread(() -> {System.out.println("t2启动");if (c.size() != 5) {try {latch.await();//也可以指定等待时间//latch.await(5000, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("t2 结束");latch.countDown();}, "t2").start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e1) {e1.printStackTrace();}new Thread(() -> {System.out.println("t1启动");for (int i = 0; i < 10; i++) {c.add(new Object());System.out.println("add " + i);if (c.size() == 5) {// 打开门闩,让t2得以执行latch.countDown();try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e1) {e1.printStackTrace();}}/*try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}*/}}, "t1").start();}
}
public class T06_LockSupport {// 添加volatile,使t2能够得到通知volatile List lists = new ArrayList();public void add(Object o) {lists.add(o);}public int size() {return lists.size();}static Thread t1=null,t2=null;//声明为静态,可以作为内部参数public static void main(String[] args) {T06_LockSupport c = new T06_LockSupport();t2 = new Thread(() -> {System.out.println("t2启动");if (c.size() != 5) {LockSupport.park();//如果没有5个元素就停掉线程}System.out.println("t2 结束");LockSupport.unpark(t1);//打印解锁解锁线程t1}, "t2");t2.start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e1) {e1.printStackTrace();}t1 = new Thread(() -> {System.out.println("t1启动");for (int i = 0; i < 10; i++) {c.add(new Object());System.out.println("add " + i);if (c.size() == 5) {//如果为5LockSupport.unpark(t2);//解锁线程t2LockSupport.park();//停掉当前线程,避免size又递增后,t2线程才执行完逻辑,保证t2执行完}/*try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}*/}}, "t1");t1.start();}
}
2、生产消费类
方法一,使用Synchronized,配合wait()和notifyAll()两个方法 |
---|
当仓库满时,通过wait()让生产线程释放锁,进入睡眠。同时通过notifyAll()通知所有消费者进行消费 |
当仓库没有东西时,通过wait()让消费者释放锁,睡眠,同时notifyAll()通知所有生产者生产 |
/*** 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,* 能够支持2个生产者线程以及10个消费者线程的阻塞调用* * 使用wait和notify/notifyAll来实现* * @author mashibing*/
package com.mashibing.juc.c_021_01_interview;import java.util.LinkedList;
import java.util.concurrent.TimeUnit;public class MyContainer1<T> {final private LinkedList<T> lists = new LinkedList<>();final private int MAX = 10; //最多10个元素private int count = 0;public synchronized void put(T t) {while(lists.size() == MAX) { //想想为什么用while而不是用if?try {this.wait(); //effective java} catch (InterruptedException e) {e.printStackTrace();}}//因为一旦线程重启,由于线程太多,有些线程生产是,很可能已经满了,这时要保证它们再做一次判断,看看是否已经满了lists.add(t);++count;System.out.println(t+"生产了商品");this.notifyAll(); //通知消费者线程进行消费}public synchronized T get() {T t = null;while(lists.size() == 0) {try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}t = lists.removeFirst();count --;this.notifyAll(); //通知生产者进行生产return t;}public static void main(String[] args) {MyContainer1<String> c = new MyContainer1<>();//启动消费者线程for(int i=0; i<10; i++) {new Thread(()->{for(int j=0; j<5; j++) System.out.println(c.get()+"的商品被"+Thread.currentThread().getName()+" 消费");}, "c" + i).start();}try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}//启动生产者线程for(int i=0; i<2; i++) {new Thread(()->{for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " ");}, "p" + i).start();}}
}
方法二,使用ReentrantLock |
---|
通过newCondition()方法创建两个等待队列producer和consumer,这样可以将消费者统一放在consumer队列 |
生产者统一放在producer队列中 |
通过队列对象,producer.await()控制生产者释放锁等待 |
producer.signal()或signalAll()来控制生产者队列中线程继续运行 |
consumer消费者队列同理 |
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class MyContainer2<T> {final private LinkedList<T> lists = new LinkedList<>();final private int MAX = 10; //最多10个元素private int count = 0;private Lock lock = new ReentrantLock();//创建锁,此时只有一个等待队列private Condition producer = lock.newCondition();//条件,创建一个等待队列,为producerprivate Condition consumer = lock.newCondition();//再次创建一个等待队列,为consumer//以上,用lock锁创建两个等待队列,这样可以精确的控制,哪些线程进入哪个等待队列等待,比如让producer.await(),此时producer队列中的线程被停止//当consumer.signal()或consumer.signalAll()时,可以让consumer队列中的某个线程或所有线程继续运行public void put(T t) {try {lock.lock();while(lists.size() == MAX) { //想想为什么用while而不是用if?producer.await();//将此线程放在producer队列中等待}lists.add(t);++count;consumer.signalAll(); //通知consumer中所有消费者线程进行消费} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();//记住,此锁必须手动释放,并一定放在finally中,保证无论出错与否,都可以十分锁}}public T get() {T t = null;try {lock.lock();while(lists.size() == 0) {consumer.await();//将当前线程放在consumer队列中等待}t = lists.removeFirst();count --;producer.signalAll(); //通知生产者进行生产} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}return t;}public static void main(String[] args) {MyContainer2<String> c = new MyContainer2<>();//启动消费者线程for(int i=0; i<10; i++) {new Thread(()->{for(int j=0; j<5; j++) System.out.println(c.get());}, "c" + i).start();}try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}//启动生产者线程for(int i=0; i<2; i++) {new Thread(()->{for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);}, "p" + i).start();}}
}
3、线程配合
package com.mashibing.juc.c_026_00_interview.A1B2C3;import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;public class T01_00_Question {//cas时使用enum ReadyToRun {T1, T2}static volatile ReadyToRun r = ReadyToRun.T1; //思考为什么必须volatilestatic Thread t1=null,t2=null;public static void main(String[] args) throws InterruptedException {//要求用线程顺序打印A1B2C3....Z26//方法一、LockSupportAtomicInteger count = new AtomicInteger(0);System.out.println("======================LockSupport==========================");t1 = new Thread(()->{while(!(count.get()==26)){LockSupport.park();System.out.print((char)(64+count.incrementAndGet())+"");LockSupport.unpark(t2);}});t1.start();t2 = new Thread(()->{while(!(count.get()==26)){LockSupport.unpark(t1);LockSupport.park();System.out.println(count.get());}});t2.start();Thread.sleep(1*1000);System.out.println("======================wait_____notify==========================");Object o = new Object();count.set(0);new Thread(()->{while(!(count.get()==26)){synchronized (o){try {o.wait();} catch (InterruptedException e) {e.printStackTrace();}System.out.print((char)(64+count.incrementAndGet())+"");o.notify();}}}).start();new Thread(()->{while(!(count.get()==26)){synchronized (o){o.notify();try {o.wait();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(count.get());}}}).start();Thread.sleep(2*1000);System.out.println("======================cas==========================");count.set(0);new Thread(()->{while(!(count.get()==26)){if(r == ReadyToRun.T1){System.out.print((char)(64+count.incrementAndGet())+"");r=ReadyToRun.T2;}}}).start();new Thread(()->{while(!(count.get()==26)){if(r == ReadyToRun.T2){System.out.println(count.get());r=ReadyToRun.T1;}}}).start();}
}
10、源码
阅读源码非常麻烦,篇幅较大,所以以下ReentrantLock的源码我一步一步走,其它的源码,请大家参照方法自行阅读 |
---|
1、ReentrantLock 源码
此时进入ReentrantLock类
我们发现lock方法调用了Sync类的抽象方法lock(),而因为是抽象方法,所以调用子类的实现,NonfairSync类重写的lock()方法
lock()方法中我们发现直接调用了CAS操作,说明是其父类中的方法,我们进入Sync类看看
Sync类中没有发现CAS的定义,发现此类继承与AQS类,我们进入AQS看看
现在确定了类间的继承关系,ReentrantLock的lock方法调用了NonfairSync类重写的lock()方法,此类继承与Sync,Sync继承了AQS,接下来看看CAS
我们发现,实现和我们当时讲解CAS时一样,通过Unsafe类的compareAndSwapInt方法实现,接下来我们继续向下看看
发现CAS操作判断可以改值时,做了让线程独占的操作,接下来我们看看else的情况
我们发现else调用的是AQS的方法,而此方法又调用了tryAcquire()方法我们点进去看看
我们发现tryAcquire()方法也是AQS的方法,但是此方法由子类实现,我们看看子类的实现
至此,我们可以做出如下图解
2、AQS
从这里开始,不带大家一步一步看代码了,只把重点截取出来,看源码方法就是上面讲解的,照猫画虎即可
AQS源码阅读有以下重点 |
---|
AQS核心是state变量,这个变量是volatile的,它的取值为0和1,表示解锁和加锁 |
AQS中的队列,存储的是线程,有前驱和后缀两个变量,这3个对象都是volatile的。 |
也就是说,锁只有一个,用state变量来控制,当前锁是否可以获取,是否是加锁状态 |
而队列,就是等待队列,没有抢到锁的线程就在队列中等着,如果看到state是0就直接去拿锁,state变为1 |
另外我们发现,公平锁和非公平锁,就是,公平锁,没有抢到锁的线程会放在队列中。而非公平锁不放在队列中,直接去抢锁 |
这张图片erlse if中代码是锁重入的代码,忘了讲了
下面代码没有给整个链表加锁,而是用CAS只关注最后一个节点,实现让新来的线程,到队列最后面,CAS成功既插入成功,然后返回节点对象,没有成功就不断的尝试,直到成功
获取已经插入的线程,一直获取前置节点,如果前置节点是头节点了,尝试竞争锁,如果成功退出死循环拿锁执行代码,否则阻塞队列,当前置节点执行完了,叫醒队列,拿锁
一个重点,非常细节的对象,JDK1.9更新的VarHandle对象
JDK1.9之后有的VarHandle ,指向一个引用 |
---|
此对象可以让普通属性进行原子性操作 ,比反射更快(因为直接操纵2进制码) |
比如一个Object类型对象o变量指向一个实例(内存中一个区域),也就是他们之间是引用的关系 |
假设这样一个画面,变量o 在左边 一个箭头 指向右面一块区域 |
而VarHandler就是他俩之间的引用,也就是内一个箭头 |
所以,变量o可以指向右边的一块区域,而这个箭头一样,也指向内个区域 |
所以,VarHandle也可以指向一块引用区域,一个实例,一个变量值 |
一个例子,VarHandler对象,可以让同一个引用,进行一些其它的操作,比如原子性操作
发布评论