admin管理员组文章数量:1565357
1. 基础篇
1.1. 多线程的作用
-
提高程序的响应速度:通过多线程,程序可以同时处理多个任务或事件,避免阻塞等待某个耗时操作完成而导致整个程序暂停响应的情况,从而提高程序的响应速度。
-
充分利用多核处理器:现代计算机通常都是多核处理器,多线程可以充分利用多核处理器的优势,将任务分配给不同的核心并行处理,提高程序的运行效率。
-
提高系统资源利用率:通过多线程,可以更好地利用计算机的硬件资源,如CPU、内存等,使系统资源得到更合理的分配和利用,提高系统整体的资源利用率。
-
实现异步操作:多线程可以实现异步操作,即程序可以同时执行多个任务,而不需要等待上一个任务完成才能继续执行下一个任务,这样可以提高程序的性能和效率。
-
实现并发编程:多线程是实现并发编程的基础,可以让程序同时处理多个任务,实现数据共享和通信,从而更好地满足复杂程序的需求。
1.2. 实现多线程
1.2.1. 继承Thread类实现多线程
通过继承 Thread
类,并重写 run()
方法实现多线程。
public class TestThread extends Thread {
@Override
public void run() {
System.out.println("执行");
}
}
public class Main {
public static void main(String[] args) {
TestThread thread = new TestThread();
thread.start();
}
}
有上面的代码我们可以发现,基于 Thread 实现多线程是一件很简单的事情,启动线程我们只需要将我们的对象进行实例化,然后调用 start 方法即可。
下面我们来讨论一下,为什么一个类继承了 Thread 类并重写 run 方法后就可以实现多线程?
我们需要进行 start 方法,看一下里面都做了什么。
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
}
}
}
首先我们可以发现 start 方法是一个线程安全的方法,避免了 start 方法中的逻辑混乱。start 方法上来就判断了 threadStatus 的值是否为 0,不为 0 则抛出异常(如果不为 0,大概率说明 start 方法被重复调用了),值得注意的是从 Thread 类中无法得到 threadStatus 被修改的过程,threadStatus 的修改在 JDK 层面是不可见的,,其修改是 Java 虚拟机(JVM)和操作系统通过底层的线程管理和调度机制来改变 threadStatus 的值。
下面我们可以看到,接着就将当前线程添加到了线程组中 group.add(this)
, ThreadGroup 便于对线程进行统一管理和控制、设置线程组的特性、异常处理、层级结构等。
最后我们可以看到,start0 被调用,start0 是一个 native 类型的方法,start0 方法会将该线程的状态设置为“运行中”,然后调用该线程对象的 run() 方法,最后将该线程的状态设置为“终止”。因为我们实例化的是 Thread 的子类(自定义类)并且重写了 run 方法,这样我们就实现了开辟一个线程去执行我们自定义代码的目的。
那么问题来了,如果我们不重写 run 方法会发生什么?自然是调用 Thread 中的 run 方法了,最终会尝试去调用 Runnable 的 run 方法
@Override
public void run() {
// target为Thread的成员属性private Runnable target
if (target != null) {
target.run();
}
}
1.2.2. 实现Runnable接口实现多线程
通过实现 Runnable
接口,并重写 run()
方法实现多线程。
public class TestThread implements Runnable{
@Override
public void run() {
System.out.println("执行");
}
}
public class Main {
public static void main(String[] args) {
TestThread thread = new TestThread();
new Thread(thread).start();
}
}
下面我们就来讨论一下,为什么实现了 Runnable 接口的类,通过实例化对象作为 Thread 构造方法的参数,就可以实现多线程?
我们可以发现,是启动多线程时调用了 Thread 的一个有参构造 public Thread(Runnable target),通过深入 Thread 的源码,最终发现,target 被赋值给了 Thread 的成员属性 private Runnable target
。
最后我们还是需要调用 Thread 的 start 方法,在上面我们讲了,Thread 中的 start 方法会调用 start0方法,start0 最终会调用当前对象的 run 方法,而我们实例化的是 Thread 类,这就意味着会调用 Thread 的 run 方法,Thread 的 run 方法如下:
@Override
public void run() {
if (target != null) {
target.run();
}
}
我们发现,Thread 的 run 方法会尝试调用 target 的 run 方法,非常碰巧的是 Thread 中的 target 已经被我们通过使用 Thread 的有参构造方法给初始化了,这就意味着 target.run();
必然会被调用。
这样我们就通过实现 Runnable 接口开辟了一个线程并执行了我们的自动以代码。
1.2.3. 实现Callable接口实现多线程
通过实现 Callable
接口,并重写 call()
方法实现多线程。
public class TestTread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 0;
}
}
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
TestTread thread = new TestTread();
FutureTask<Integer> futureTask = new FutureTask<>(thread);
new Thread(futureTask).start();
System.out.println(futureTask.get());
}
}
下面我们来讨论一下,为什么我们通过实现 Callable 接口并重写其 call 方法就可以实现多线程并能拿到返回值?
通过上面的 Main 中启动多线程的过程,我们可以发现要比通过 继承 Thread 和实现 Runnable 接口的方式要复杂,多了一个 FutureTask。
我们先进入 Future 的有参构造 public FutureTask(Callable<V> callable)
,看一下 new FutureTask<>(thread)
都干了什么?
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
我们可以看到核心是将 thread(实现了Callable)对象传递给了 FutureTask 的成员属性 callable。
下面我们看一下 Main 中的 new Thread(futureTask)
都干了什么?点进去发现和 实现 Runnable 接口后启动线程所使用的的 Thread 的构造方法一样
,都是 public Thread(Runnable target)
,那么就意味着 start 方法会调用 start0 方法,start0 会调用 Thread 的 run 方法,run 方法会调用 Runnable 的 run 方法。
其实 FutureTask 是 Runnable 的实现类,上一段所说的调用 Runnable 的 run 方法,其实就是调用 FutureTask 的 run 方法,我们进入 FutureTask 的 run 看一下:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
我们可以看到第 7 行 Callable<V> c = callable;
将 callable 赋值给了局部变量 c,第 12 行 result = c.call();
调用了 call 方法,这也就意味着被我们重写的 call 方法被调用。而且 result(执行结果)被保存起来了,那么我们就可以拿到线程的执行结果。
1.3. 多线程常用操作
1.3.1. 线程命名
线程是不确定的运行状态,线程名称就是线程的主要标记。因此,需要注意的是,对于线程的名称一定是在线程启动之前进行设置,不建议在线程运行中进行设置、修改线程名称。
public class MyThread implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
public class Main {
public static void main(String[] args) {
MyThread myThread0 = new MyThread();
MyThread myThread1 = new MyThread();
new Thread(myThread0, "线程1").start();
new Thread(myThread1, "线程2").start();
myThread0.run();
myThread1.run();
}
}
运行结果:
main
main
线程1
线程2
有上面可知,我们可以在实例化线程的时候,通过构造方法参数初始化线程名称,当然我们也可以在线程实例在被 start()
之前调用 setName()
实现线程名称的设置。如果我们不设置线程名称,我们在Thread初始化话的过程中会使用 Thread-[索引]
来命名同一个线程组中的名称。
如果我们直接执行 MyThread 中的 run 方法,那么线程的名称是 main ,这是因为直接运行 run 方法,JVM 不会开辟新的线程,代码依然运行在主线程中。
1.3.2. 线程休眠
sleep()
方式定义在 java.lang.Thread
中,由 Thread.sleep() 调用,其作用是需要暂停线程的执行,即当前线程从 “运行状态” 进入 “阻塞状态” ,在此期间线程并不会释放相关的线程锁,并导致当前当前线程阻塞。当睡眠实现达到预设时间后,线程会被唤醒,此时该线程会从 “阻塞状态” 变为 “就绪状态” ,等待 CUP 的调度。
其主要方法如下:
// 普通函数:设置休眠时间的毫秒数
public static void sleep(long millis) throws InterruptedException
// 普通函数:设置休眠毫秒数和纳秒数
public static void sleep(long millis,int nanos) throws InterruptedException
有人可能会问 wait 呢? sleep 和 wait 是不同的, wait 和 sleep 的区别如下:
wait属于Object的方法,而sleep属于Thread的方法
wait会释放锁,但是sleep不会
wait需要依赖synchronized,而sleep不需要
wait需要被被动唤醒,而sleep不需要
1.3.3. 线程中断
interrupt()
方法的定义在 java.lang.Thread
中,由 Thread.interrupt() 调用实现,该方法将会设置该线程的中断标记位,即设置为 true 。调用 interrupt() 并不能直接停止线程,它只是类似于发起了一个对该线程的中断请求,至于线程是否被中断,是由程序本身决定的。
例如下面的程序中,线程自己调用的 interrupt() 方法,尝试在第 10 次调用后终止程序,是没有效果的:
public class MyThread implements Runnable{
public MyThread() {
new Thread(this).start();
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
System.out.printf("%s:正在执行第%d次\n",Thread.currentThread().getName(), i+1);
// 尝试在第10次执行之后中断请求
if ((i+1) >= 10) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
new MyThread();
}
}
执行结果:
Thread-0:正在执行第1次
Thread-0:正在执行第2次
Thread-0:正在执行第3次
Thread-0:正在执行第4次
Thread-0:正在执行第5次
Thread-0:正在执行第6次
Thread-0:正在执行第7次
Thread-0:正在执行第8次
Thread-0:正在执行第9次
Thread-0:正在执行第10次
Thread-0:正在执行第11次
Thread-0:正在执行第12次
Thread-0:正在执行第13次
Thread-0:正在执行第14次
Thread-0:正在执行第15次
Thread-0:正在执行第16次
Thread-0:正在执行第17次
Thread-0:正在执行第18次
Thread-0:正在执行第19次
Thread-0:正在执行第20次
线程是否中断是由程序本身决定的,例如将上面的代码进行稍微的调整:
public class MyThread implements Runnable{
public MyThread() {
new Thread(this).start();
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
// 判断程序是否被发起了中断请求
if (Thread.currentThread().isInterrupted()) {
return;
}
System.out.printf("%s:正在执行第%d次\n",Thread.currentThread().getName(), i+1);
// 尝试在第10次执行之后中断请求
if ((i+1) >= 10) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
new MyThread();
}
}
执行结果:程序被成功中断
Thread-0:正在执行第1次
Thread-0:正在执行第2次
Thread-0:正在执行第3次
Thread-0:正在执行第4次
Thread-0:正在执行第5次
Thread-0:正在执行第6次
Thread-0:正在执行第7次
Thread-0:正在执行第8次
Thread-0:正在执行第9次
Thread-0:正在执行第10次
注意
关于线程中断,此处需要做出必要的说明
1. 当前线程调用自己的 interrupt() 总是被允许的,其他线程调用当前线程的 interrupt() 方法是需要通过 checkAccess() 检查的,不通过则会抛出异常,一般情况下没有问题。
2. 如果该线程在 Object 类的 wait()、wait(long)、wait(long, int) 方法,或者该类的 join()、join(long)、join(long, int)、sleep(long)、sleep(long, int) 方法的调用中被阻塞,那么它的中断状态将被清除,并且它将接收到一个 InterruptedException 异常。
3. 如果该线程在对可中断通道的 I/O 操作中被阻塞,则通道将被关闭,线程的中断状态将被设置,并且线程将接收到一个 java.nio.channels.ClosedByInterruptException 异常。
下面我将显示一下在 Thread.sleep() 调用中被阻塞的情况
public class MyThread implements Runnable{
@Override
public void run() {
for (;;) {
System.out.printf("%s:正在执行\n",Thread.currentThread().getName());
try {
Thread.sleep(400);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() +"被中断");
break;
}
}
}
}
public class Main {
public static void main(String[] args) {
MyThread myThread = new MyThread();
Thread thread = new Thread(myThread, "myThread");
thread.start();
try {
Thread.sleep(4000);
thread.interrupt();
} catch (InterruptedException e) {throw new RuntimeException(e);}
}
}
执行结果:
myThread:正在执行
myThread:正在执行
myThread:正在执行
myThread:正在执行
myThread:正在执行
myThread:正在执行
myThread:正在执行
myThread:正在执行
myThread:正在执行
myThread:正在执行
myThread被中断
1.3.4. 线程插队
多线程在执行后,会交替的资源进行执行,线程(非守护线程)不会因为扣个线程结束而结束,也不会因某个线程线程执行而等待。但是在实际开发中,我们往往需要实现对某个线程的阻塞,让其等待其他线程执行完成后,再次继续执行。
对于上面的需求(让某个线程( A 线程)因另一个线程( B 线程)的执行而等待,而 A 线程可以实时监控 B 线程的执行情况,当 B 线程结束, A 线程可以立即继续执行
),我们可以通过 Thread 中的 join()
方法实现。
线面我来举一个例子,A 线程中执行循环 20 次,当 A 线程中的循环执行到 10 次,插入 B 线程(阻塞A线程),当 B 线程执行完成后,继续执行 A 线程剩下的 10 次循环。
public class BThread implements Runnable{
@Override
public void run() {
System.out.println("--------------------");
for (int i = 0; i < 3; i++) {
System.out.printf("BThread 执行第%d次循环\n", i+1);
}
System.out.println("--------------------");
}
}
public class AThread implements Runnable{
Thread bThread;
public AThread() {
bThread = new Thread(new BThread());
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.printf("AThread 执行第%d次循环", i+1);
if (i == 4) {
// 启动B线程
bThread.start();
try {
// 插入B线程,阻塞当前线程(A线程)
bThread.join();
} catch (InterruptedException e) {throw new RuntimeException(e);}
}
}
}
}
public class Main {
public static void main(String[] args) {
new Thread(new AThread()).start();
}
}
执行结果:
AThread 执行第1次循环
AThread 执行第2次循环
AThread 执行第3次循环
AThread 执行第4次循环
AThread 执行第5次循环
--------------------
BThread 执行第1次循环
BThread 执行第2次循环
BThread 执行第3次循环
--------------------
AThread 执行第6次循环
AThread 执行第7次循环
AThread 执行第8次循环
AThread 执行第9次循环
AThread 执行第10次循环
在上面,我们看到,join() 确实阻塞了 A 线程,那是不是 join() 会阻塞所有的线程呢?答案时否定的, join() 阻塞的是 'join() 代码片段' 所属的线程
,例如在 A 线程中通过 bThread.join()
调用了 join() ,但是 bThread.join()
这个代码片段是属于 A线程的,所以 join() 阻塞的是 A 线程。如下图所示:
下面我们来探索一下,bThread.join() 是如何实现阻塞 A 线程的,A 线程又是如何实现自动唤醒的,进入 join() 方法,发现其是引用的 join() 的一个重载方法 join(0),join(0) 的代码如下:
public final synchronized void join(long millis) throws InterruptedException {
// 略
long base = System.currentTimeMillis();
long now = 0;
// 略
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
// 条件成立:方法参数millis为0时(就是本次的探索)
if (millis == 0) {
// 成立条件:当B线程存活时(会形成一个自旋)
while (isAlive()) {
// 阻塞调用线程(A线程)
wait(0);
}
}
// 略
else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
上面的源码中,核心部分就是我标记的 “条件成立” 的那块代码片段,下面我就来解释一下这个代码片段,消除大家心中的疑虑:
if (millis == 0) {
while (isAlive()) {
wait(0);
}
}
第 2 行中的 isAlive() 被执行,获取的是 B 线程的存活状态,因为上面的代码被调用来自于 bThread.join(),所以 isAlive() 相当于 bThread.isAlive(),而 isAlive() 属于 Thread 的原始方法,自然获取的是 B 线程的执行存活状态,但是!!!,上面的代码片段并不是在 B 线程中执行的,而是在 A线程中执行的,bThread.isAlive() 只是从 A 线程中获取 B 线程的状态而已~,这点很重要!!!
第 3 行中的 wait(0) 被执行,阻塞的是 A 线程,如果你对 wait 比较了解,那么就会知道 wait阻塞的是当前执行线程
,因为我们上面讲到了上面的代码片段的执行并不是由 B 线程执行的,而是由 A 线程执行的,所以 wait(0) 阻塞的必然是执行线程,即 A 线程。
上面的第 2、3 行代码会使得 A 线程进行自旋:判断 B 状态,等待 0 秒接着判断 B 状态,再等待 2 秒,循环往复直到 B 线程结束。
总结:当 A 线程中通过 B 线程对象调用 join() 方法,join() 的执行是在 A 线程中执行的,其中join() 源码中的 isAlive() 获取的是 B 线程的存活状态,wait(0) 阻塞的是 A 线程。
1.3.5. 线程让步
多线程在彼此彼此交替执行的过程中往往需要进行资源的轮流抢占,如果某些不是很重要的线程抢到了资源但是又不急于去执行,就可以把资源让步出去,供其他资源执行。值得注意的是线程让步(yield)是将线程状态由“运行状态”转为就绪状态,所以不能保证当当将线程让步出去其他线程就一定能获得资源,反之,抢到资源的可能是刚刚执行 yield 的资源,所以最终到底由谁去执行还是要靠CPU调度。
现在的电脑都是多核的,Thread.yield() 不好效果,使用方式入下:
public class Main {
public static void main(String[] args) {
Runnable runnable = () -> {
for (int i = 0; i <= 100; i++) {
if (i % 2 == 0) {
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + "-执行" + i);
}
};
new Thread(runnable, "线程1").start();
new Thread(runnable, "线程2").start();
}
}
我们在什么时候使用 yield() 方法呢?我能想到的一个案例是,当我们有多个线程同时执行时,其中有一个线程A时长任务(且是 cup 紧密型),那么当线程 A 每执行完一个阶段就可以使用 yield() 让出一下 cup ,让其他等待的线程有执行的机会,这样线程 A 最执行完成,其他线程也无需等待。
1.3.6. 线程优先级
我们通过编码创建的线程都属于子线程,所有的子线程在启动的时候都会保持相同的优先级权限,当我们希望某些线程的优先级权限可以更高时(即优先抢占资源并执行),那么我们可以设置线程的优先级来实现。
需要知道的是,线程的优先级权限是从 1 到 10 的,10 为最高优先级(Thread.MAX_PRIORITY), 1 为最低优先级(Thread.MIN_PRIORITY),5为普通优先级(NORM_PRIORITY),普通优先级即为默认优先级。
我们需要知道的是,对线程设置优先级只是尝试告诉操作系统,哪个线程的优先级高,该线程希望被优先执行,但是也仅限希望,最终被执行的顺序还是依赖于操作系统的调度算法(策略)。
优先级越高的线程并不是真的就一定会被优先执行,例如在 Windows 操作系统中,存在一种优先级推进器,当一个线程被执行的次数过多的话,操作系统会跳过线程优先级,先为该线程分配资源让其执行。
设置优先级的让发如下:
public class Main {
public static void main(String[] args) {
Thread thread1 = new Thread(new ThreadOne());
Thread thread2 = new Thread(new ThreadTwo());
Thread thread3 = new Thread(new ThreadThree());
// 设置线程优先级
thread1.setPriority(Thread.MIN_PRIORITY); // 最小(1)
thread2.setPriority(Thread.MAX_PRIORITY); // 最大(10)
thread3.setPriority(Thread.NORM_PRIORITY); // 默认(5)
thread1.start();
thread2.start();
thread3.start();
}
}
1.4. 线程同步与死锁
1.4.1. 线程同步
多线程如果共享每一个资源,且对资源具有写操作,那么必须在对公共资源进行写操作之前实现线程同步,即需要先获取同步锁,然后再执行写操作,当完成对资源的写操作之后,需要释放同步锁,给其他线程进行写曹组的机会,同时也避免了死锁的操作。
下面我将使用 synchronized 和 ReentrantLock 两种方式的锁实现对一个售票系统的线程同步。
1.4.1.1. 基于 synchronized 的方式
public class Ticketing implements Runnable {
private int ticket = 10;
private final Object locker = new Object();
@Override
public void run() {
while (true) {
synchronized (locker.getClass()) {
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + ":售出第" + (10 - (ticket--) + 1) + "张票");
try {Thread.sleep(20);} catch (InterruptedException ignored) {}
} else {
System.out.println(Thread.currentThread().getName() + ":票已售罄");
return;
}
}
}
}
public static void main(String[] args) {
Ticketing ticketing = new Ticketing();
new Thread(ticketing,"售票员01号").start();
new Thread(ticketing, "售票员02号").start();
}
}
输出结果
售票员01号:售出第1张票
售票员01号:售出第2张票
售票员01号:售出第3张票
售票员02号:售出第4张票
售票员01号:售出第5张票
售票员01号:售出第6张票
售票员01号:售出第7张票
售票员01号:售出第8张票
售票员02号:售出第9张票
售票员01号:售出第10张票
售票员02号:票已售罄
售票员01号:票已售罄
由上面的结果我们可以发现,售票员 1 和售票员 2 是在交替售票,票的总数是固定的,售票的总数和票的总数是匹配的,符合我们的预期。
大家可以看到,我在 synchronized 中使用的锁是 locker.getClass() ,而 locker 就是一个普通的 Object 对象,我这样做的目的是想要告诉大家,synchronized 实现的同步锁其实锁的是一个对象,这个对象是任意的。
大家可能会问是如何锁对象的?其实就是在该对象的二进制文件中的头部添加了标记位来实现的,当一个对象指定位置(约定)的被填充了信息,则意味着该对象已经被上锁了,当该对象指定位置的信息被擦除后,则意味着锁已放开。
当多个线程在竞争执行同一块代码时,因为有 synchronized 的存在,所以所有的线程都会去查看 locker 的指定位置是否已经被填充了信息,如果没有被填充信息,那么线程就会尝试填充自己的信息,填充自己信息的线程就是获取锁成功的线程,其他线程会不断的检查并竞争 locker 对象,知道将自己的信息填充到 locker 的指定位置,自己采拿回了代码的执行权。
我们使用 synchronized 实现同步锁是不需要在代码中实现释放锁的,锁的释放是由 JVM 决定的,当同步方法或同步代码块执行完成后会自动的释放锁。另外 synchronized 是支持可重入锁的(就是一个线程在未释放锁的时候是可以采集获取锁的)
1.4.1.2. 基于 ReentrantLock 的方式
public class Ticketing implements Runnable {
private int ticket = 10;
ReentrantLock locker = new ReentrantLock();
@Override
public void run() {
while (true) {
locker.lock();
try {
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + ":售出第" + (10 - (ticket--) + 1) + "张票");
} else {
System.out.println(Thread.currentThread().getName() + ":票已售罄");
return;
}
} finally {
locker.unlock();
try {Thread.sleep(5);} catch (InterruptedException ignored) {}
}
}
}
public static void main(String[] args) {
Ticketing ticketing = new Ticketing();
new Thread(ticketing,"售票员01号").start();
new Thread(ticketing, "售票员02号").start();
}
}
执行结果:
售票员01号:售出第1张票
售票员02号:售出第2张票
售票员02号:售出第3张票
售票员01号:售出第4张票
售票员02号:售出第5张票
售票员01号:售出第6张票
售票员02号:售出第7张票
售票员01号:售出第8张票
售票员02号:售出第9张票
售票员01号:售出第10张票
售票员02号:票已售罄
售票员01号:票已售罄
由上面的结果我们可以发现,售票员 1 和售票员 2 是在交替售票,票的总数是固定的,售票的总数和票的总数是匹配的,符合我们的预期。
从 ReentrantLock 的名字上我们就可以得知,ReentrantLock 是一种可重入锁,顾名思义,当一个线程获取了锁但是没有释放锁,该线程时可以重复获取该锁的。但是值得注意的是 ReentrantLock 的 lock() 和 unlock() 必须成对出现。
除此以为 ReentrantLock 是支持公平锁和不公平锁的。所谓的公平锁就是多个线程进行抢锁的时候是需要排队(FIFO)的,保证多个线程都可以获取到锁,对于线程层面的执行时公平的;所谓的不公平锁就是多个线程抢锁是一拥而上的,谁抢到是谁,这样会造成某个线程可能会一直抢锁成功,某个线程会一直抢不到锁,对于线程层面的执行时不公平的。我们上面讲过的 synchronized 锁就是一种公平锁。在实际的开发中大多数使用的非公平锁(相对公平锁更高效),除非有特定的场景需要线程按照先来后到的规则进行执行的需求我们才会使用公平锁(使用 ReentrantLock fairLocker = new ReentrantLock(true) 创建公平锁)。
ReentrantLock 是如何是实现上锁和解锁的呢?源码我就不在这里给大家粘贴了,下面我通过文字的方式讲述一下:
在 ReentrantLock 中实时有是哪个内部类的,分别是 abstract Sync
、 NonfairSync
、FairSync
,当我们通过 ReentrantLock 的无参构造进入后发现,其实例化的是 NonfairSync ,当我们在调用 locker.lock() 后实际进入的的 NonfairSync 的 lock() 方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
当线程 A 是第一次获取锁且获取到时,那么在进行 CAS(给 status 设置为 1 )的时候是成功的,因为期望值是 0 ,设置值是 1 。所以当线程 A 第一次获取到锁之后,状态值就变成了 1 。
当线程 A 在未释放锁的时候进行而二次上锁时,那么上面的 CAS compareAndSetState(0, 1)
必然不会通过,因为状态已经是 1 了,不是期望值 0,所以会进入 acquire(1)
,代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们可以发现只要 tryAcquire(1)
为 true,整个 if
不成立,程序结束,NonfairSync 的 tryAcquire() 中直接调用了nonfairTryAcquire(1),代码如下:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
很明显,因为线程 A 是第二次上锁,且上一次未释放锁,所以走的就是 else if (current == getExclusiveOwnerThread())
,接着往下读发现就是给状态值 status 加 1 ,并返回 true ,所以第二次上锁成功。
ReentrantLock 实现公平锁其实就是在对 status 进行 CAS 前判断自己需不需排队。如果需要排队,也就意味着在当前线程之前已经有其他线程尝试进行加锁且失败配,并且正在排队;如果不需要排队也就意味着在当前线程前没有其他线程进行上锁操作(上锁失败),当前线程可以直接尝试加锁不需要排队。
在 ReentrantLock 中,实现释放锁是不分公平锁和非公平锁的。 没有上锁成功的线程在进行排队期间(通过双向链表实现的队列)是被 park 的。不论是否是公平锁,都是只会解锁队列中的一个排队线程。至于解锁队列中的哪个等待线程是确定的,ReentrantLock 会选择双向链表中的头节点对应的线程来唤醒,即链表中的第一个等待线程。这意味着等待时间最长的线程将被优先唤醒,以便它有机会再次尝试获取锁。
1.4.2. 线程死锁
所谓死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去,死锁的操作一般是在程序运行时候才有可能出现,死锁是在多线程开发中较为常见的一种问题,过多的同步就有可能出现死锁。
如下就是一个经典的死锁案例:
class First {
public synchronized void tell(Second second) {
System.out.println("First start now...");
second.doSubMethod();
}
public synchronized void doSubMethod() {
System.out.println("First sub method do");
}
}
class Second {
public synchronized void tell(First first) {
System.out.println("Second start now...");
first.doSubMethod();
}
public synchronized void doSubMethod() {
System.out.println("Second sub method do");
}
}
public class DeadLock implements Runnable{
private final First first = new First();
private final Second second = new Second();
public DeadLock() {
new Thread(this).start();
first.tell(second);
}
@Override
public void run() {
second.tell(first);
}
public static void main(String[] args) {
new DeadLock();
}
}
上面的代码通过主线程实例化了 DeadLock 对象,顺势启动了一个子线程,子线程中通过 second.tell(first);
获取到了 Second 的对象锁,Second 的 tell 方法由获取到了 First 的对象锁,同时主线程中,由通过 first.tell(second)
获取到了 First 的对象锁,First 中的 tell 方法由获取 Second 的对象锁,这样子线程获取了 Second 的对象锁,在未释放锁的时候又要获取 First 的对象锁,主线程获取了 First 的对象锁,在未释放锁的时候又要获取 Second 的对象锁,两个线程就卡死了,造成了死锁。
1.5. 守护线程
守护线程是 Java 多线程编程中的特殊类型的线程,它的生命周期取决于是否存在前台线程,不会阻止 JVM 的退出。这种线程通常用于执行支持性任务、定时任务、垃圾回收等工作。然而,在使用它时需要注意生命周期不可控、不要进行 I/O 操作以及不要执行长时间任务等问题。合理使用守护线程可以提高程序的性能和可维护性,但需要根据具体需求谨慎选择。
下面将会实现一个守护线程,我会启动一个主线程,并在主线程中启动一个普通线程和一个守护检查,最终的目的是使得守护线程在主线程和子线程执行结束后自动结束。
public class UserThread implements Runnable{
private String threadName;
public UserThread(String threadName) {
this.threadName = threadName;
new Thread(this, threadName).start();
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+":正在运行");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(threadName + "结束...");
}
}
public class DaemonThread implements Runnable{
public DaemonThread(String threadName) {
Thread thread = new Thread(this, threadName);
// 设置该线程为守护线程,当用户(包括主线程)线程结束后,守护线程也结束
thread.setDaemon(true);
thread.start();
}
@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + ":正在运行");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
public class Main {
public static void main(String[] args) {
new DaemonThread("守护线程");
new UserThread("用户线程");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("主线程结束...");
}
}
执行结果:
守护线程:正在运行
用户线程:正在运行
用户线程:正在运行
守护线程:正在运行
用户线程:正在运行
守护线程:正在运行
守护线程:正在运行
用户线程:正在运行
守护线程:正在运行
用户线程:正在运行
守护线程:正在运行
用户线程:正在运行
主线程结束...
用户线程:正在运行
守护线程:正在运行
用户线程:正在运行
守护线程:正在运行
用户线程:正在运行
守护线程:正在运行
守护线程:正在运行
用户线程:正在运行
守护线程:正在运行
用户线程结束...
从上面的执行结果我们可以看出,当主线程和用户线程执行完成之后,守护线程会自动终结,和我们的预期相符。
1.6. 线程池
1.6.1. 线程池的优点
当我们需要执行大量的多线程任务时,如果对所有的任务都创建线程那么容易造成线程阻塞,导致其他更为重要的任何无法被执行,其次线程的创建是很消耗资源的,如何在一个任务中需要同时开辟多个线程,倘若线程是被临时创建的,会对整个任务的耗时造成很大的影响。
通过上面的描述,我们可以总结出线程池存在如下的优势:
- 降低资源消耗,通过对线程的重复利用,避免了线程的反复创建于销毁,降低了资源的消耗。
- 提高响应速度,线程池中是存在被提前创建好的线程的,那么对于新任务而言,就可以直接执行,省去了创建线程的时间,提高了响应速度。
- 提升系统稳定,线程是稀缺的,如果任由线程无限创建,不但会消耗系统资源,而且还会造成系统崩溃(如OOM),通过使用线程池可以有效的避免上述问题的发生,提升了系统的稳定性。
1.6.2. 线程池的使用
1.6.2.1. 构造方法说明
在 Java 中,我们可以使用 JUC 提供的 ThreadPoolExecutor 来创建线程池,ThreadPoolExecutor 有 4 个构造方法。
public ThreadPoolExecutor( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue )
{
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory )
{
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler )
{
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
{
...
}
如上的 4 个构造方法涉及到了 7 个构造参数,下面我们就来一一介绍一下这哥几个构造参数。
corePoolSize
: 线程池的核心线程数,默认情况下线程池中的核心线程都会一致存活,我们可以将其理解为线程池的最小核心线程数。但是,如果我们将线程池的 allowCoreThreadTimeout 设置为 true ,核心线程超时也会被回收。
maximumPoolSize
: 最大线程数,当线程池中创建的线程数达到该值后,后序加进来的任务将会被阻塞。
keepAliveTime
: 线程空闲超时时间,当非核心线程空闲时间超过了 eepAliveTime ,那么将会被回收,如何 allowCoreThreadTimeout 设置为 true ,那么核心线程超时也会被回收。
unit
: 是 keepAliveTime 超时时间的单位,通常有 TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
workQueue
: 任务队列,通过 execute() 方法提交的 Runnable 对象将存储在该参数中,使用阻塞队列实现。
threadFactory
: 线程工厂,通过实现 ThreadFactory 接口,并重写其中的 newThread(Runnable r)方法,我们就可以自定义线程池中创建的线程。一般来讲,如果我们需要设置线程名称,或者设置创建的线程为守护线程,那么通常我们会自定义该参数。
handler
: 拒接策略,由于达到线程界限和队列容量而阻止执行时使用的处理逻辑。
1.6.2.2. 使用案例演示
下面我们将演示一个例子:创建一个核心线程数为 2 且最大线程数为2的线程池,设置线程的阻塞队列的大小为 2 ,使用自定义线程工厂自定义创建的线程池的名称,并自定义拒绝策略。
最终我们会创建 5 个任务添加到线程池中去执行,且每个任务的执行时间不小于 1 秒,按照我们线程池的设定,最终会有一个任务会被拒绝,并执行我们自定义的拒绝策略。
public class Main {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
2,
2,
6,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadFactory() {
final String name = "自定义-线程-";
final AtomicInteger num = new AtomicInteger(1);
@Override
public Thread newThread(@NotNull Runnable r) {
return new Thread(r, name + num.incrementAndGet());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("抱歉," + r.getClass().getName() + "被拒绝执行");
}
}
);
try {
poolExecutor.execute(() -> {
try {Thread.sleep(1000);} catch (Exception ignored) {}
System.out.println(Thread.currentThread().getName() + "被执行");
});
poolExecutor.execute(() -> {
try {Thread.sleep(1050);} catch (Exception ignored) {}
System.out.println(Thread.currentThread().getName() + "被执行");
});
poolExecutor.execute(() -> {
try {Thread.sleep(2000);} catch (Exception ignored) {}
System.out.println(Thread.currentThread().getName() + "被执行");
});
poolExecutor.execute(() -> {
try {Thread.sleep(2050);} catch (Exception ignored) {}
System.out.println(Thread.currentThread().getName() + "被执行");
});
poolExecutor.execute(() -> {
try {Thread.sleep(3000);} catch (Exception ignored) {}
System.out.println(Thread.currentThread().getName() + "被执行");
});
} finally {
poolExecutor.shutdown();
}
}
}
执行结果:
抱歉,com.example.juc.threadPool.Main$$Lambda$5/1915503092被拒绝执行
自定义-线程-2被执行
自定义-线程-3被执行
自定义-线程-2被执行
自定义-线程-3被执行
通过执行结果,我们发现 5 个任务有 4 个任务被执行,有一个任务被拒绝,且执行了我们自定义的拒绝策略,符合预期。
1.6.2.3. 阻塞队列说明
线程池构造方法中的 workQueue 是基于阻塞队列实现的,即采用生产者消费者模式,在 Java 中需要实现 BlockingQueue 接口。但 Java 已经为我们提供了 7 种阻塞队列的实现:
ArrayBlockingQueue
:一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue
:一个由链表结构组成的有界阻塞队列,在未指明容量时,容量默认为 Integer.MAX_VALUE。
PriorityBlockingQueue
:一个支持优先级排序的无界阻塞队列,对元素没有要求,可以实现 Comparable 接口也可以提供 Comparator 来对队列中的元素进行比较。跟时间没有任何关系,仅仅是按照优先级取任务。
DelayQueue
:类似于PriorityBlockingQueue,是二叉堆实现的无界优先级阻塞队列。要求元素都实现 Delayed 接口,通过执行时延从队列中提取任务,时间没到任务取不出来。
SynchronousQueue
:一个不存储元素的阻塞队列,消费者线程调用 take() 方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用 put() 方法的时候也会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。
LinkedBlockingDeque
:使用双向队列实现的有界双端阻塞队列。双端意味着可以像普通队列一样 FIFO(先进先出),也可以像栈一样 FILO(先进后出)。
LinkedTransferQueue
:它是 ConcurrentLinkedQueue 、 LinkedBlockingQueue 和 SynchronousQueue 的结合体,但是把它用在 ThreadPoolExecutor 中,和 LinkedBlockingQueue 行为一致,但是是无界的阻塞队列。
在 ThreadPoolExecutor 中一般使用 ArrayBlockingQueue 、 LinkedBlockingQueue 、 PriorityBlockingQueue 的情况较多。
1.6.3. 禁用Executors
我们可以看到很多文章都在说在生产环境的代码中是禁止使用 Executors 去构建线程池的。在探讨 Executors 的问题之前,我们先来看看 Executors 给我们几种线程池。
1.6.3.1. Executors中的线程池
第一种:定长线程池(FixedThreadPool)
- 特点:只有核心线程,线程数量固定,执行完立即回收,任务队列为链表结构的有界队列。
- 场景:控制最大并发数。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
第二种:定时线程池(ScheduledThreadPool )
- 特点:核心线程数量固定,非核心线程数量无限,执行完闲置 10ms 后回收,任务队列为延时阻塞队列。
- 场景:执行定时或周期性的任务。
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
第三种:可缓存线程池(CachedThreadPool)
- 特点:无核心线程,非核心线程数量无限,执行完闲置 60s 后回收,任务队列为不存储元素的阻塞队列。
- 场景:执行大量、耗时少的任务。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
第四种:单线程化线程池(SingleThreadExecutor)
- 特点:只有 1 个核心线程,无非核心线程,执行完立即回收,任务队列为链表结构的有界队列。
- 场景:不适合并发但可能引起 IO 阻塞性及影响 UI 线程响应的操作,如数据库操作、文件操作等。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
1.6.3.2. Executors中的线程池的弊端
使用 Executors 中提供的 4 种线程池的是很容易造成系统出现 OOM(内存溢出)的风险的。
对于 FixedThreadPool 和 SingleThreadExecutor 而言,其主要问题是请求堆积造成的 OOM ,之所以会造成 OOM 的原因是因为阻塞队列均采用 LinkedBlockingQueue ,但是没有设置长度,从而造成内存泄漏。
对于 CachedThreadPool 和 ScheduledThreadPool 而言,其主要原因是线程数最大是 Integer.MAX_VALUE,可能会创建数量非常多的线程,直至系统 OOM。
2. 进阶篇
2.1. volatile关键字
2.1.1. 并发编程的三个概念
原子性
:原子性指的是一个操作或者多个操作在执行的过程中要么全部执行成功,要不全部不执行。常见的原子操作有基础类型的 读取
和 赋值
操作(i++ 并不是原子操作,其分为读取、加、赋值);java.util.concurrent.atomic.*
中的类的所有操作。
可见性
:可见性值的是当多个线程同时访问一个变量时,其中一个线程修改了该变量的值,其他的线程可以立即读取最新的值。在 Java 中提供的多种方式支持并发操作下的原子性,例如 volatile 关键字,被 volatile 关键字修饰的变量,表示线程的本地内存失效,当一个线程修改了共享变量以后,其会被立即刷新到主内存中,并且其他线程中的共享变量全部失效,需要从主内存中重新加载共享变量。当然 synchronized 和 Lock 也可以保证变量的可见性。
有序性
:有序性指的是程序的执行顺序是按照代码的执行顺序执行的。在JVM的内存模型中,为了保证程序的执行效率是允许编译器和处理器对指令进行重排序的,这种指令重排对单线程而言是没有任何的影响的,但是在多线程情况下可能会存在问题,因此 Java 提供了 volatile 关键字用来禁止指令重排。
从上面的描述我们可以知道,volatile 可以保证有序性和可见性,但是无法做到原子性。
2.1.2. Java的内存模型JMM
Java 线程之间的通信是由 Java 的内存模型(JMM)决定的,JMM决定一个线程对共享变量的写入何时对两一个线程可见。从抽象的角度来看,JMM 定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有自己的本地内存(工作内存),线程在使用共享变量时是先将共享变量从主内存中加载到本地内存(工作内存)中,然后完成对变量的读取的赋值。本地内存(工作内存)是JMM的一个抽象概念,并不是真实存在的。
如上图的 JMM 模型所示,倘若线程 A 和线程 B 进行通信,那么需要线程 A 修改本地内存中的共享变量,然后 JMM 会将线程A修改的共享变量刷新到主内存中,然后通他线程 B 重新刷新本地缓存,获取到线程 A 修改后的共享变量,从而完成线程 A 和线程 B 之间的通信。
2.1.3. volatile的特性
特性一:保证可见性,不保证原子性
当一个线程对一个被 volatile 修饰的变量进行写操作时,JMM 会线程的本地内存(工作内存)中的变量重新刷新到主主内存中,并且会导致其他线程中的 volatile 变量缓存失效。
特性二:禁止指令重排
指令重排是编译器和执行器为了优化执行性能而对指令序列进行排序的一种手段。指令排序需要遵循以下规则:
- 指令重排不会对存在数据依赖关系的操作记性排序。例如对于
a = 1; b = a
的代码而言并不会出现指令重排的情况,因为 a 和 b 存在依赖关系。 - 指令重排后,对于单线程的执行结果不能被改变。例如
a = 1; b = 2; c = a + b;
代码经过指令重排后可以是这样的b = 2; a = 1; c = a + b;
对于单线程而言质心结果没有发生改变。
指令重排在单线程执行中是没有任何问题的,但是在多线程中却不能保证结果的正确性,例如如下的情况:
public class Demo {
private int a = 0;
boolean status = false;
public void setValue() {
a = 1;
status = true;
}
public void printValue() {
if (status) {
System.out.println("a = " + a);
}
}
}
在如上的代码中,当 printValue() 被调用时,如果 status 为 true 时,那么就打印 a 的值。现在假设有两个线程 A、B,线程A调用 setValue() 方法,线程B调用 printValue() 方法。现在的问题是当线程B打印出了 a 的值,是不是意味着打印出的一定是 a = 1
?
答案是否定的,因为可能会存在指令重排的情况,例如 a = 1; status = true;
被重排成 status = true; a = 1
是合法的,在单线程情况下是没有问题的,但是在多线程中就会存在问题,例如线程A在执行时被指令重排了,那么会先执行 status = true;
,此时线程B判断status是成立的,会打印出 a,但是线程A还没来得及执行 a = 1;
,那么线程B打印的是 a = 0
而不是 a = 1
。
2.1.4. volatile不适用的场景
上面也说明 volatile 不能保证原子性,所以对于多线程的非原子操作是存在问题的,例如:a++
操作:
我们将会开辟 3 个线程,每个线程对共享变量执行 100 次自增,最终话的共享变量结果不等于 3000
public class Demo {
private volatile int a = 0;
public void increase() {
a++;
}
public static void main(String[] args) {
Demo demo = new Demo();
for (int i = 0; i < 3; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
demo.increase();
}
}).start();
}
// 保证前面的线程执行结束
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println("a = "+demo.a);
}
}
执行结果:
a = 2845
执行结果符合预期,要想得到 3000 的结果,需要给自增操作加锁,例如我们可以使用 synchronized 实现同步锁:
...
public synchronized void increase() {
a++;
}
...
执行结果:
a = 3000
2.1.5. 单例模式的双重锁
public class TestInstance{
private volatile static TestInstance instance;
public static TestInstance getInstance(){ //1
if(instance == null){ //2
synchronized(TestInstance.class){ //3
if(instance == null){ //4
instance = new TestInstance(); //5
}
}
}
return instance; //6
}
}
我们知道 JVM 的编辑器和执行器会对执行指令进行重排,以便提升执行效率,对单例模式而言,我们需要对实例对象使用 volatile 关键字进行修饰,问题出现在第 5 行的 instance = new TestInstance();
,其可以分为三行为代码:
a. memory = allocate() //分配内存
b. ctorInstanc(memory) //初始化对象
c. instance = memory //设置instance指向刚分配的地址
上面的代码在运行时,可能会被重排:由 a-b-c
的执行顺序重排重 a-c-b
。在多线程的情况下,当A线程执行到第 5 行,B 线程执行到第 2 行,A 线程已经完成了 a 和 c ,但是在未来得及完成 b 步时, B 线程执行了第 2 行,判断 instance == null
很明显是不成立的,因为 c. instance = memory
已经使得 instance != null
,最终话 B 线程会返回一个空对象。
有人可能会问,为什么不将 if(instance == null)
包裹在 synchronized
中,包裹在其中不就不担心指令重排的问题了吗?
是的,将 if(instance == null)
包裹在 synchronized
中确实可以解决指令重排的问题,但是会带来极大的性能开销。使用 synchronized 是很慢的,对于单例模式而言,对象只需要创建一次即可,其他的 N 次请求只需要 return 已创建的对象即可。倘若将 if(instance == null)
包裹在 synchronized
中,意味着所有的请求都需要被上锁,并发性能无疑是被降低的,但是将 volatile 和 synchronized 配合使用就能完美的解决这个问题。
2.2 ThreadLocal
2.2.1. 强、软、弱、虚引用
在介绍 Java 中线程的 ThreadLocal 之前,我们先来了解一下 Java 中的四大引用:强、软、弱、虚。
2.2.1.1. 强引用
我们在开发中使用的大部分的应用都是引用,通常我们所说的 “new 一个对象” ,这个对象与指向内存中的一块地址,对象引用内存地址之间就是强引用,例如:
Person p = new Person();
p
与 new Person()
之间的应用关系就是强引用,强引用是不会被 JVM 进行 GC (垃圾回收)的,当过多的强引用占满了 JVM 的内存,那么就会报 OOM (OutOfMemory),也就是我们常说的内存溢出。
对于强引用而言,我们可以通过手动的 p = null;
帮助 GC
进行垃圾回收。
2.2.1.2. 软引用
软引用是一种相对强引用弱化了一些的引用,当内存充足时,软引用的对象是不会被回收的,当内存不足时,软引用的内存将会被回收。
下面我们来感受一下软引用
创建一个10M的软引用对象,然后进行GC,查看软引用对象是否被GC回收
public class SoftReferenceClient {
public static void main(String[] args) {
// 10M
SoftReference<byte[]> softReference = new SoftReference<>(new byte[1024 * 1024 * 10]);
// 打印软引用对象
System.out.println(softReference.get());
// 尝试GC
System.gc();
try {Thread.sleep(2000);} catch (Exception ignored) {}
// 打印软引用对象
System.out.println(softReference.get());
}
}
执行结果:符合预期没有被回收
[B@1a6c5a9e
[B@1a6c5a9e
下面我将JVM的最大运行内存设置为20M,创建一个软引用的10M对象和创建一个12M的字节数组,总的内存大小必然大于20M,应该不会出现OOM,并且在创建完12M的字节数组后再次获取软引用中的10M的对象必然是null。
public class SoftReferenceClient {
public static void main(String[] args) {
// 创建一个10M的软引用
SoftReference<byte[]> softReference = new SoftReference<>(new byte[1024 * 1024 * 10]);
System.out.println(softReference.get());
// 创建一个12M的字节数组
byte[] other = new byte[1024 * 1024 * 12];
System.out.println(softReference.get());
}
}
设置JVM最大运行内存为20M
执行结果:符合预期
[B@1a6c5a9e
null
2.2.1.3. 弱引用
弱引用的使用和软引用类似,只是关键字变成了 WeakReference,JVM的CG遇到弱引用将会直接对其进行回收。
下面我们将创建一个弱引用,并且获取一次其中的对象,此时其中的对象可以被获取;紧接着我们强制 GC ,然后获取若引用中的对象,应该为null。
public class WeakReferenceClient {
public static void main(String[] args) {
// 创建一个弱引用对象
WeakReference<Object> weakReference = new WeakReference<>(new Object());
System.out.println(weakReference.get());
// 执行GC
System.gc();
try {Thread.sleep(2000);} catch (Exception ignored) {}
System.out.println(weakReference.get());
}
}
执行结果:符合预期
java.lang.Object@1a6c5a9e
null
倘若一个弱应用的对象同时被强引用,那么GC将不会回收弱引用中对象,我们将上面的代码进行略微的调整,将 new WeakReference<>(new Object())
改为 Object o = new Object();
new WeakReference<>(o)
,这样一个对象同时被o进行强引用、被 weakReference 进行若引用,整体上为强引用,不会被 GC。
public class WeakReferenceClient {
public static void main(String[] args) {
Object o = new Object();
WeakReference<Object> weakReference = new WeakReference<>(o);
System.out.println(weakReference.get());
System.gc();
try {Thread.sleep(2000);} catch (Exception ignored) {}
System.out.println(weakReference.get());
}
}
执行结果:符合预期
java.lang.Object@1a6c5a9e
java.lang.Object@1a6c5a9e
2.2.1.4. 虚引用
虚引用”顾名思义,就是形同虚设,与其他几种引用都不同,虚引用并不会决定对象的生命周期。如果一个对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收。
虚引用也称为“幽灵引用”或者“幻影引用”,它是最弱的一种引用关系。
A a = new A();
ReferenceQueue<A> rq = new ReferenceQueue<A>();
PhantomReference<A> prA = new PhantomReference<A>(a, rq);
虚引用主要用来跟踪对象被垃圾回收器回收的活动。
虚引用与软引用和弱引用的一个区别在于:虚引用必须和引用队列 (ReferenceQueue)联合使用,当垃圾回收器准备回收一个对象时,如果发现它还有虚引用,就会在回收对象的内存之前,把这个虚引用加入到与之 关联的引用队列中。
2.2.1.5. 总结
引用类型 | 被GC的场景 | 用途 | 生存时间 |
---|---|---|---|
强引用 | 不会被GC | 对象的一般状态 | JVM停止运行 |
软引用 | JVM内存不足时 | 缓存 | 内存不足时 |
弱引用 | GC时即被回收 | 缓存(如Threadlocal) | GC结束后 |
虚引用 | 不知 | 不知 | 不知 |
2.2.2. ThreadLocal是什么
ThreadLocal 是 Java 的线程本地变量,线程与线程之间的 ThreadLocal 是隔离的,一个线程的 ThreadLocal 只能在这个线程中被使用。ThreadLocal 一般用作线程的状态记录器,例如在 spring 的 @Transactional
、动态数据源等均使用了 ThreadLocal 用于存储线程 connection,从而保证同一个线程中的不同方法栈中使用的 connection 是同一个。
2.2.2. ThreadLocal如何使用
ThreadLocal 就是一个线程独享变量,我们通过 new ThreadLocal<>()
实例化这个变量,然后可以在不同的线程中去使用这个变量,线程与线程之间对于这个变量时隔离的,是不会相互污染的。
ThreadLocal 中主要有三个方法:
set(Object o):void
:用于向当前线程的 ThreadLocal 存储信息。
get():Object
:用于从当前线程的 ThreadLocal 获取存储信息。
remove():void
:用于删除当前线程的 ThreadLocal 的存储信息。当一个线程的 ThreadLocal 不再被使用后,我们强力推荐调用 ThreadLocal 的 remove() 方法清除存储信息,避免内存溢出。
public class Demo {
public static void main(String[] args) {
ThreadLocal<String> threadLocal = new ThreadLocal<String>() {
@Override
protected String initialValue() {
return "hello";
}
};
new Thread(() -> {
threadLocal.set("你好");
System.out.println(Thread.currentThread().getName() + ":" + threadLocal.get());
threadLocal.remove();
}, "线程1").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + ":" + threadLocal.get());
threadLocal.remove();
}, "线程2").start();
}
}
执行结果:
线程1:你好
线程2:hello
我上面演示的是使用一个 ThreadLocal 变量,其实一个线程中可以使用多个 ThreadLocal 变量,我们只需要提前 new
出来即可,记得使用完成之后调用 remove()
方法清除存储信息。
2.2.3. ThreadLocal的实现原理
2.2.3.1 ThreadLocal.set(T value)
接下来我们来探索一下ThreadLocal中的set() 方法的是实现原理
[ThreadLocal.class]
public void set(T value) {
// 1.获取当前线程
Thread t = Thread.currentThread();
// 2.获取当前线程中的成员属性 threadLocals:ThreadLocalMap
ThreadLocalMap map = getMap(t);
// 3.成立条件:map 不为空,设置value值到ThreadLocalMap
if (map != null) {
map.set(this, value);
}
// 4.成立条件:map值为空,初始化ThreadLocalMap
else {
createMap(t, value);
}
}
我们可以看到,通过 2 中的 getMap(t) 我们就能获取到 t(当前线程)的成员属性threadLocals:ThreadLocalMap,然后后面将value设置到了当前线程的threadLocals中,从这里我们也就能看出,为什么ThreadLocal是线程隔离的了,因为它就是将值设置到当前线程的成员变量中的。
我们看一下getMap():就是获取当前线程的成员变量
[Thread.class]
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
我们进入 map.set(k,v) 看一下:
[ThreadLocal.class]
private void set(ThreadLocal<?> key, Object value) {
// 1.table是一个Entry数组
Entry[] tab = table;
int len = tab.length;
// 2.通过 ThreadLocal 对象的hascode与table长度取模,获取到期应该存储的目标下标
int i = key.threadLocalHashCode & (len-1);
// 3.条件不成立:某一次的循环监测到 table[i] == null,
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// 4.成立条件:Entry中的get()返回值等于key(方法参数)
if (k == key) {
// 覆盖原本Entry中的value
e.value = value;
return;
}
// 5.成立条件:Entry中的get()返回为空,进行替换赋值
if (k == null) {
// 置换
replaceStaleEntry(key, value, i);
return;
}
}
// 6.程序到此必然tab[i] == null,所以new一个Entry赋值给tab[i]
tab[i] = new Entry(key, value);
// 7.map中存储的元素加1
int sz = ++size;
// 8.成立条件:table中Entry的get() 方法返回值为没有null && table的长度大于预设的threshold的大小
if (!cleanSomeSlots(i, sz) && sz >= threshold)
// 9.刷新table(将table数组的长度扩展成原来的2倍)
rehash();
}
在 2 中,我们看到其使用的是 key.threadLocalHashCode & (len-1)
相当于是使用key(也就是ThreadLocal的对象)的hashCode与 table的长度-1
取模,保证得到的 i 必然是合法的。
如果 3 中的不成立,也就意味着 4、5都没有被执行,也就是说还没有进行赋值,程序必然会跳到 6 ,3 成立相等于 tab[i] == null,若依 6 必然不会已经存在的元素。
如果 4 成立,也就意味着 Entry 中的 get() 方法的返回值等于当前的 ThreadLocal,直接覆盖原本的 value 并结束方法。(后面我会详细介绍Entry
)
如果 5 成立,也就意味着 Entry 中的 get() 方法的返回值等于null,所以就使用目标 key(当期的ThreadLocal) 和 value (当前的ThreadLocal 要存储的值)置换 tab[i]。
上面的 8、9 的目的是为了对 table 进行扩容,扩容的前提是 8 成立,既 table中Entry的get() 方法返回值为没有null(即!cleanSomeSlots(i, sz) 为true
) && table的长度大于预设的 threshold 的大小。
上述的代码中频繁的出现了 Entry
,下面我们就来看一下 Entry
是个什么东西:
public class ThreadLocal<T> {
...
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
...
}
...
}
原来 Entry
是 ThreadLocal
的内部类 ThreadLocalMap
的内部类,并且继承了 WeakReference
若引用,且弱引用的 <T>
为ThreadLocal,在上刚开始进行赋值时使用的 map.set(this, value);
其中 map 就是 ThreadLocalMap,set(k,v) 中的 k 就是ThreadLocal,所以 ThreadLocal 属于弱引用。
除此以外,我们可以看到这个 Entry 的初始化方式只能通过构造器去初始化,将 k 存储到 WeekReference 中, v 存储到 Entry 的成员属性 Object value
中。对于 Entry 而言,我可以以知道的是,我们可以直接调用其 entry.get()
和 entry.set()
方法实现对弱引用的存取。
接下来,我们进入 createMap(t, value)
中看一下 createMap 方法中是如何实现的:
[ThreadLocal.class]
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
我们可以发现,createMap 中是直接对 ThreadLocalMap 进行了实例化,并传入了两个参数:this 表示 ThreadLocal
、firstValue 表示要存储的值
。下面我们直接进入 ThreadLocalMap 的有参构造中:
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 1.初始化了一个大小为INITIAL_CAPACITY(16)的Entry数组
table = new Entry[INITIAL_CAPACITY];
// 2.计算 ThreadLocal 的hashCode,并与Entry数组的 `长度-1` 进行取模
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 3.实例化了一个Entry对象,并存储到了table(Entry数组)中
table[i] = new Entry(firstKey, firstValue);
// 4.设置大小为1
size = 1;
// 5.设置扩容阈值:threshold = len * 2 / 3;
setThreshold(INITIAL_CAPACITY);
}
下面我们只要介绍一下上面 3 的意义,对于 3 我们可以看到,其就是初始化了一个 Entry 并将 Entry 存入了 table(Entry数组),如果你有印象的话,上面我们介绍过 Entry 是实现了 WeekReference 的对象,Entry 本身有一个成员属性 value,且 Entry 是通过有参构造方法进行初始化 弱引用 和 成员属性 value 的。对于上面的 3 new Entry(firstKey, firstValue)
其目的就是将 firstKey(当前ThreadLocal) 进行弱引用存储,将 firstValue(存储在ThreadLocal中的值) 进行 value 赋值。
整个set方法的调用过程大概如下图所示:
实例化一个ThreadLocal tr,tr 强引用 ThreadLocal对象,tr 调用 set 方法设置值 value ,最终将 tr 作为 k(而 Entry 中的 k 又是弱引用) value 作为 v 存储到 Entry 中,Entry 是 table 数组的元素, table 数组是 当前线程
内部属性 threadLocals的成员属性,最终将 tr 存储到了当前线程中,所以 ThreadLocal 具有线程隔离。
2.2.3.2 ThreadLocal.get()
[ThreadLocal.class]
public T get() {
// 1.获取当前线程
Thread t = Thread.currentThread();
// 2.获取当前线程中的ThreadLocalMap
ThreadLocalMap map = getMap(t);
// 3.成立条件:map不为空,即在调用get前已经调用了set
if (map != null) {
// 4.将ThreadLocal作为key获取Map中的Entry
ThreadLocalMap.Entry e = map.getEntry(this);
// 5.成立条件:在调用get前已经调用了set且没有调用remove
if (e != null) {
@SuppressWarnings("unchecked")
// 6.强转并返回
T result = (T)e.value;
return result;
}
}
// 7.成立条件:ThreadLocal未进行初始化(为调用set)
return setInitialValue();
}
下面我们进入 4 ,看一下是如何从map中获取目标值的,其参数 this 指的是当前 ThreadLocal
[ThreadLocal.class]
private Entry getEntry(ThreadLocal<?> key) {
// 1.获取ThreadLocal在table数组中的索引
int i = key.threadLocalHashCode & (table.length - 1);
// 2.尝试获取目标Entry
Entry e = table[i];
// 3.成立条件:Entry不为null 且 Entry中的key(弱引用)等于当前ThreadLocal
if (e != null && e.get() == key)
return e;
// 4.尝试获取失败
else
// 5.再次尝试获取
return getEntryAfterMiss(key, i, e);
}
从上面我们可以看到,经过计算后的索引 i
存储的不一定是目标 Entry ,因为 i
是根据 hascode 和数组长度取模得出的,是容易重复的,当意识到取到了 Entry 不会想要的,会调用 getEntryAfterMiss 进行再次获取,getEntryAfterMiss 的三个参数分别是 当前ThreadLocal
、计算得出的索引
、按照索引所所取出的Entry
,下面我们进入 getEntryAfterMiss(key, i, e)
:
[ThreadLocal.class]
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
// 这个循环的目的是在尝试寻找 k 等于当前 ThreadLocal 的 Entry
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
// 清除陈旧条目,当key为null,也就意味着value取不出了,所以要清除key为null的Entry
expungeStaleEntry(i);
else
// 按照规则计算下一个索引
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
到此为止 get() 方法中的 getEntry() 就结束了,下面我们来看一下 get() 方法中的 setInitialValue() 方法:
[ThreadLocal.class]
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
if (this instanceof TerminatingThreadLocal) {
TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
}
return value;
}
看到 setInitialValue 方法,真的是非常熟悉,基本上和 set(T value)
一样,二者的区别在于:在`set(T value) 中 value 是作为参数传递的,在 setInitialValue 方法中 value 是调用 initialValue 方法获取的。
所以我们在实例化 ThreadLocal 的时候可以重写其 initialValue 方法,从而实现设置默认值的效果。
2.2.3.3 ThreadLocal.remove()
下面我们进入 ThreadLocal 的 remove 方法,探索一下它的工作原理:
[ThreadLocal.class]
public void remove() {
// 获取当前线程中的成员属性ThreadLocalMap
ThreadLocalMap m = getMap(Thread.currentThread());
// 成立条件:成员属性不为空,说明当前线程中存在ThreadLocal
if (m != null) {
// 将当前ThreadLocal作为参数调用ThreadLocalMap的remove
m.remove(this);
}
}
下面我们进入remove方法:
[ThreadLocal.class]
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
// 1.计算当前ThreadLocal所在的索引
int i = key.threadLocalHashCode & (len-1);
// 2.结束条件:获取到的Entry为null 或者 获取到的Entry的弱引用等于当前ThreadLocal
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
// 3.找到目标Entry
if (e.get() == key) {
// 4.将若引用置为null,之后e.get()返回值为null
e.clear();
// 5.清除掉e.get()返回值为null的Entry
expungeStaleEntry(i);
return;
}
}
}
上面的方法是在尝试清除掉当前线程中存储的ThreadLocal,进入 4 表示找到了目标Entry,随即调用了 clear() 方法。5 中是在清除 Entry 的 get() 方法的返回值为 null 的 Entry。
2.2.3.4总结
从 ThreadLocal 的 get() 、set() 、remove() 方法中我们可以看到,作者都在有意的尝试清除 Entry 的 get() 方法的返回值为 null 的 Entry,作者这样做的原因是为了尽可能避免 ThreadLocal 导致的 内存泄漏。
我们知道 ThreadLocal 的存储在线程中的,对于现在的开发中,线程池是被广泛使用的,线程池中的线程有一个特点是会被重复利用且存活时间较长,这就意味着同一个线程被多个重复使用后可能会造成大量的 ThreadLocal,如果在Entry中不使用弱引用指向 ThreadLocal 对象,而是使用强引用指向 ThreadLocal 对象,那么必然会造成内存移除,下面我将解释为什么?
线程终结后,ThreadLocal 对象还会存在吗?
必然不会存在,如果线程终结,那么如下的引用链将不在存在:
如果我使用完 ThreadLocal,将其置为 null ,还有没有内存溢出的风险?
我们的答案时分两种情况,一种情况:当你没有使用线程池,执行完业务逻辑线程消亡了,那么自然不用担心内存溢出的问题;另一种情况:使用线程池,在使用完 ThreadLocal 后将其置为 null, 那么ThreadLocal 的强引用结束了,此时只有一个 Entry 对 ThreadLocal 存在弱引用,等到下次 GC 该 ThreadLocal 自然被回收了,ThreadLocal 对象不会再次内存泄漏,但是 Entry 里的 value 是一直存在的,且 Entry 里的 value 一般是远大于 Entry 里的 key 的,当 key 为 null , 那么其对应的 value 就取不出了,形成了内存孤岛,必然导致内存泄漏。
我们在线程池中使用 ThreadLocal ,如何避免内存泄漏?
很简单,当我们使用完 ThreadLocal 后,记得调用一次其 remove() 方法即可。在 remove() 方法中会清除 Entry 中的若引用对象,即将 key 置为 null,并会清除掉 Entry 中 key 为 null 的 Entry,避免内存泄漏。所以在使用完 ThreadLocal 后调用其 remove() 方法是一个好习惯!
倘若如下的代码中使用的死线程池执行,那么还是会有内存泄漏的问题:
ThreadLocal<String> tr= new ThreadLocal<>();
tr.set("你好");
tr.remove();
即使调用了 remove() 方法,只是清除了 ThreadLocal 对象中的值和 Entry,但并没有影响到 tr 这个引用本身。因此,tr 仍然保持对 ThreadLocal 对象的强引用。又因为使用的是线程池,线程长久存活,那么 tr 也是一种存活的,即 tr 一直保持 ThreadLocal 对象的强引用。
综上所述,要想真正避免因使用 ThreadLocal 导致的内存泄漏,那么我们就要在使用完 ThreadLocal 之后,先调用其 remove() 方法,然后将 ThreadLocal 对象置为 null(helpe GC
)。
版权声明:本文标题:【Java】4万字解读Java多线程,这一篇文章就够了 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/dongtai/1725925245a1049292.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论