admin管理员组文章数量:1599782
导言:
大家应该都或多或少学习过Reentranlock的源码,说实话AQS相关的阻塞队列、调价队列、排它锁、共享锁的阅读还是需要花时间细心看的,本文希望能梳理思路,方便你真正读代码时不至于绕迷糊。
本文基于我的思维导图来讲解,这个思维导图还是有些价值的:https://www.processon/view/link/5ede205d5653bb6963dc6d56
一、从Reentranlock说起
0、名词解释
代码中有些函数的命名规则
lock()
代表是最高层的加锁API,他调用acquire
acquire
是第二高的加锁API,它里面包含了tryAcquire
这种尝试加锁的API,也包含了入队阻塞等APItryAcquire
:尝试加锁,不影响阻塞队列(这点很重要),也就是说它不会操作阻塞队列的,代码里跟head和tail毫无关系tryRelease
:尝试解锁,他同样跟队列毫无关系- CAS:这个不用我多说了吧,拿着旧值去改新值(如果感兴趣的可以百度AtomicStampReference)
- 阻塞:java中使用
LockSupport.park(this);
来释放CPU - 同步器:管理公平性,不涉及队列
- AQS:围绕队列来管理锁
1、AQS涉及那些数据结构
- 阻塞队列:AQS最常用的是一个双向队列,持有
head
和tail
指针,并且每个Node
有prev
和next
指针。 - 条件队列:condition里条件队列持有的是单向队列,它是作用是先在条件队列里等到
资源
,然后将Node结点转移到阻塞队列里去正式等待锁
2、什么是排它锁和共享锁
排它锁是只有一个线程能占用锁,而共享锁可以多个线程占有锁。
ReentrantLock明显是排它锁(或者叫独占锁)
3、AQS和Reentranlock的关系
Reentranlock只写了最基本的接口,而接口的内部全部是通过同步器转发的,即Reentranlock持有同步器,这个同步器有2个实现类。
在构造Reentranlock对象时就会指定是公平锁还是非公平锁(默认)。
private final Sync sync;
而同步器的作用是提供基本的try加锁写锁
的包装逻辑(为了区分公平),而加锁入队等逻辑是AQS实现的
4、公平性和非公平
Reentranlock借助于同步器来管理公平性问题,而前面我介绍了同步器只实现了try加锁解锁的逻辑,管理队列等内容全靠AQS来做。
try加锁解锁指的是
tryAcquire
、tryRelease
我在AQS的思维导图中有用绿色线标注发生公平性问题的地方https://www.processon/view/link/5ede205d5653bb6963dc6d56
但我前面所述只是为了快速理解公平性问题,实际上同步器中除了try方法中还有1个地方发生了公平性问题,那就是lock到acquire强制获取锁的之间。
从上面两张图我们可以了解到非公平锁确实很欠,在lock里还尝试了一次CAS。
但不得不提的是公平性问题在同步器里是不影响队列的,它只操作AQS里的state
变量和exclusiveOwnerThread
备注,
exclusiveOwnerThread
实际上不在AQS中,而是在AQS的上层AbstractOwnableSynchronizer
中
5、为什么同步器不操作队列
不知道你学过synchronized的锁升级没有,可以简单阅读下我的synchronized锁升级博客:https://blog.csdn/hancoder/article/details/120421993
据我了解,AQS里锁升级的思想甚至是synchronized的前辈,因为最开始JVM版本中synchronized也是无论如何都入队,这显然在很多不发生锁冲突的场景下非常消耗性能。
synchronized的思想是依赖对象头里的mark_word来管理锁升级,它一般由64bit,它在大多数情况下只需要CAS改变后几位,如果成功用CAS从初始无锁0状态改为1状态
同步器的思想和synchronized很类似,我只用一个变量state或几bit就能管理大多数情况下的锁了,何必要用你AQS队列
或者synchronized重量级锁里的monitor
呢?白白浪费性能
6、什么时候只操作state不操作队列
前面或多或少地提过,当他发现state变量为0时他会尝试只操作state变量,而不操作队列。
当然这里有公平性问题,公平性还会以阻塞队列无元素为前提
7、head、tail和阻塞队列
不知道你见过没见过有人说head不包含在阻塞队列里?
确实应该这么说
因为看到state为0后会判断有没有阻塞队列前辈,此时head是算前辈的,所以公平性下不能CAS改变state。
但是第二个线程(当前线程)入队后会判断一下head.next是不是自己了(或者说node.prev是不是head),是的话就会用tryAcquire尝试设置state,设置成功后再设置head(此时不设置tail,以后再解释?)
8、入队流程
tryAcquire失败后就会到入队的流程
先把线程包装为Node(该node类型是.EXCLUSIVE),同样这里可以参考下我的思维导图
- addWaiter负责添加到队列
- acquireQueued负责阻塞释放cpu和醒来
addWaiter有2个分支,
(1)如果CAS设置tail成功,那么就成功入队了;
(2)第一次CAS-tail没成功,说明和其他线程竞争失败了,交给enq()
函数入队
这里有个细节是为什么tail==null时不使用CAS?而直接交给
enq()
?md我也开始疑惑,我最开始的预测是tail!=null可以判断有队列,而tail==null不可以推测出有没有队列;
看了enq里面我才发现答案,因为tail==null时还要
补充结点
enq会不断地使用for来循环,
1)发现tail==null就判断为没有队列,所以补充head和tail
2)tail!=null,已经有队列了,不是偏向锁,已经是重量级锁了,正常入队
9、补充结点
我们把synchronized的偏向锁和重量级锁(队列)概念迁移到AQS上,它们是可以对应上的。
我们用A线程代表当前持有锁的结点,B线程代表第二个来获取锁的线程
我们之前提过偏向锁时没有涉及阻塞队列,那么涉及阻塞队列时得补充一个结点让它代表当前占用state变量的线程。
补充的该结点不涉及线程信息,等前线程unlock释放锁时,它先把state
和排它线程标记
置空,然后判断到head!=null
,则代表他需要唤醒后面的结点,不能只处理state;
我们可以了解到在清空state和唤醒后续结点的过程中如果是非公平的是可能被其他线程抢了state的,没有关系,唤醒的线程发现head.next是自己,于是CAS-head自旋
这部分的流程图见
10、阻塞流程
在8里我们提了入队后该阻塞了,即acquireQueued
的逻辑
它首先把整体代码放到for循环里,不断询问当前结点的prev指针
1)for里的一个分支是说如果prev是head,那么可以尝试获取下锁,获取成功后设置head后就可以结束acquire
了
2)另一个分支是说还轮不到自己呢,别插队,还是乖乖去做点别的事情或者去睡觉吧
2.1)安心睡觉吧,有人叫醒你(prev.ws=SIGNAL
),即调用parkAndCheckInterrupt
,即调用LockSupport.park(this)
2.2)你前面有人取消了,你把它们踢走
2.3)前面没人叫醒你呢,通知它们让他们叫醒你。即把前面的prev.ws=SIGNAL,这样前面的结点就知道释放锁时要通知人了。
11、叫醒人
我们在这里知道前面结点入队时waitState(ws)是0,即不知道要唤醒别人,所以后面有人排队时要通知前面的人呢
12、被叫醒后
这个逻辑提过了,被叫醒后判断head.next是不是自己,是的话就循环CAS-state,即tryAcquire
发现不是自己后又continue到for里,接着走下面的分支去睡觉了
13、解锁
加锁时操作state等时是需要CAS(0,1)的,而释放锁时操作state无需使用CAS
因为解锁时会判断AQS的持锁线程是否是当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
只要通过上述判断,就可以对state进行减
操作
减到0时同时清除排它线程标记
14、要点
h、t指针是transient volatile
的
state是volatile
的
15、结点的状态
用waitStatus标识结点的状态
- CANCELLED = 1;
- SIGNAL = -1;
- CONDITION = -2;
- PROPAGATE = -3;
用thread表示结点包装的线程
二、条件队列
1、需要先lock才能await
这是因为源码中思想是拿到锁后执行线程安全的内容,调用await后放到条件队列后就可以释放CPU和锁了,等其他线程唤醒它时就重新尝试获取锁。
发现已经在阻塞队列后,它尝试获取锁,尝试获取锁的逻辑就是acquireQueue()
的逻辑,该函数的逻辑是阻塞和被唤醒
2、条件队列数据结构
ConditionObject是AQS的内部类,表示条件队列,该队列有两个属性,即头尾指针
private transient Node firstWaiter;
private transient Node lastWaiter;
条件队列的组成元素也是原来AQS中的Node
,但是不再使用Node里的prev
和next
指针,而是使用nextWaiter
指针,即带头尾指针的单向链表。
/**
指向条件队列中的next结点,或者特殊值SHARED。
因为条件队列仅在排它模式中才可访问,所以无需使用双向队列。
因为条件队列只能是排它模式,所以保留了一个属性来推断共享模式
*/
Node nextWaiter;
每一个condition对应一个条件队列。如线程 1 调用 condition1.await()
方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行。
调用condition1.signal()
触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的 firstWaiter(队头) 移到阻塞队列的队尾,等待获取锁,获取锁后 await 方法才能返回,继续往下执行。
3、acquireQueue()
和acquire
的区别
后者是包括给他构造结点,即addWaiter后调用acquireQueue()
而acquireQueue()
是不管Node,已经有Node,他的任务就是死等
4、添加到条件队列
添加此结点前删除条件队列中被取消的结点,这里不多谈了。
构造一个结点,此时不再是默认的排它类型了,而是new Node(Thread.currentThread(), Node.CONDITION);
因为条件队列中只有有锁才操作,所以无需CAS,直接赋值尾结点,还会进行一些连接操作,如没有头时设置头
5、释放条件队列的锁
在fullyRelease(Node node)
代码中会将state减为0。
他调用了release(savedState)
,即会唤醒其他结点获取锁的,执行完后自己已经没有了锁
然后在没锁的情况下检查是否被其他线程放到阻塞队列中了
int interruptMode = 0;
// 如果不在阻塞队列中,注意了,是阻塞队列
while (!isOnSyncQueue(node)) {
// 线程挂起
LockSupport.park(this);
// 这里可以先不用看了,等看到它什么时候被 unpark 再说
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
6、判断是否还在条件队列
isOnSyncQueue()
的判断还算简单
1)waitStatus==Node.CONDITION,也就是 -2。则可以判断为还是条件队列
2) node.prev==null;判断为还在条件队列。为什么可以判断?我们知道只有head才满足prev为空,所以需要考虑的是极端情况,会不会已经到了head了从而这里判断错误?不会,因为signal是有锁的线程,signal和await线程不可能同时占有锁
3)node.next!=null;这个好解释,因为这个指针是阻塞队列才用的
4)从尾到头遍历阻塞队列,看是否在阻塞队列中
7、检查到在阻塞队列中了
调用acquireQueued()
来获取锁,它是阻塞式等待,即轮不到自己就接着睡,直到等到锁
8、条件队列跳转到阻塞队列时机
前面提了,原来结点已经没有锁了,所以是其他线程将它放到阻塞队列中的,这也符合我们对signal
的理解
9、signal
我们预测signal要干嘛:首先自己有锁,他要转移一个条件队列的结点,转移后自己释放锁,同时让阻塞队列结点竞争锁
到这我们终于想明白了,signal占有锁呢,await怎么可能是head呢?所以6.2)的问题解决了,它不可能是head。
同时我们引入一个新的问题:有没有可能自己拿条件队列时里面没有结点,浪费一次信号量?
我觉得会,因为
signal()
中有一句if (first != null) doSignal(first);
,所以写代码时还是需要注意一下
转移结点的时候还要处理prev和next指针,去掉nextWaiter指针。
开始转移时将node的ws从CONDITION转为0
然后enq(node)就是强制入队阻塞
入队后返回node.prev。判断node.prev.waitStatus别是被取消状态,不然就没人唤醒它了。(我想不到这个情况发生的极端情况,线程中断还有unlock保证啊,为什么需要这个?)
10、await醒来的地方和lock醒来的区别
signal线程unlock时可能唤醒当前线程,此时他判断为在阻塞队列中后acquireQueued强制获取锁
说明下条件队列的唤醒和阻塞队列的唤醒不在一个地方,条件队列唤醒后还在await代码中呢
这是await醒来的地方
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 线程挂起 // 记住条件队列是从这里醒来的
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
流程图中紫色的部分
而阻塞队列唤醒的地方:
所以我们知道一个是await里醒来的,一个是acquireQueued里醒来的
而await醒来的最终还能还会是acquired里醒来的,因为从await醒来后又去调用acquireQueued,所以又可能在阻塞队列中阻塞了
11、await醒来后检查wait中断
这里有个判断
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 线程挂起 // 记住条件队列是从这里醒来的
LockSupport.park(this);
// 检查await中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
先解释下 interruptMode。
- REINTERRUPT(1): 代表 await 返回的时候,需要重新设置中断状态
- THROW_IE(-1): 代表 await 返回的时候,需要抛出 InterruptedException 异常
- 0 :说明在 await 期间,没有发生中断
有以下三种情况会让 LockSupport.park(this); 这句返回继续往下执行:
- 常规路径。signal -> 转移await节点到阻塞队列 -> await获取了锁(unpark)
- 线程中断。在 park 的时候,另外一个线程对这个线程进行了中断
- signal 的时候我们说过,转移以后的前驱节点取消了,或者对前驱节点的CAS操作失败了
- 假唤醒。这个也是存在的,和 Object.wait() 类似,都有这个问题
12、await中断发生在signal前还是后?
这个问题解决思路是根据node.ws来判断,因为signal前的话是CONDITION,而signal后的话因为signal会帮助设置ws为0。所以根据这个规则就可以判断await中断发生在signal前还是后了
上述逻辑的实现函数是transferAfterCancelledWait()
- 1)它先自己尝试CAS将node.ws从CONDITION改为0,改成功说明是signal前的中断,将enq(node)入队后,return true代表是signal前
- 2)上面CAS失败,说明是SIGNAL后,返回false
- 当然这个过程中还有些细节点,比如signal会将该node入队,1)中也会帮助入队。1)是自己的线程肯定没问题,但是如果signal线程正在迁移呢?这时候就需要自旋等待一会,等不到的话先yield让步cpu(和park不一样)
上面的逻辑代码:
final boolean transferAfterCancelledWait(Node node) {
// 用 CAS 将节点状态设置为 0
// 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 将节点放入阻塞队列
// 这里我们看到,即使中断了,依然会转移到阻塞队列
enq(node);
return true;
}
// 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
// signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
// 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
总之我们记住,出来的时候不管被中断没有,都进去阻塞队列了
13、根据中断类型返回
// 1. 如果在 signal 之前已经中断,返回 THROW_IE
// 2. 如果是 signal 之后中断,返回 REINTERRUPT
// 3. 没有发生中断,返回 0
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
14、await中断玩意有什么用呢?
前面提过多次了,条件队列时我们从这里醒来
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 线程挂起 // 记住条件队列是从这里醒来的
LockSupport.park(this);
// 检查await中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
然后在await中又调用acquireQueued
而acquireQueued的作用是获取锁或者继续睡眠。这里也有可能继续睡去,但总的来说acquireQueued返回值基本可以通过Thread.interrupted()
判断。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
从acquireQueued跳出来后去哪了:(下面代码看不懂就看下面的说明)
public final void await() throws InterruptedException {
。。。;
// 第一个acquireQueued只有在被中断过才返回true
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 在await后报告中断
if (interruptMode != 0)
// 可能是报错IE,也可能只会报告RE
reportInterruptAfterWait(interruptMode);
}
acquireQueued如果反映中断过,并且中断不是SIGNAL前发生的异常,那么就标记为REINTERRUPT
然后用reportInterruptAfterWait()
函数报告一下,我们也能猜到了SIGNAL会再制造一个异常跑出去,
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
// SIGNAL之后的,设置线程中断位,不做其他任何事情,不影响线程运行// 更具体的放到三中讲,所以我建议你看完三再来看15、
selfInterrupt();
}
15、await超时机制
什么抛异常或者仅仅置中断位了
- 在SIGNAL之后发生的中断不算超时,因为已经转移到阻塞队列了
- 在SIGNAL之前发生的中断算超时
而我们知道await是可以有时间限制的
public final boolean await(long time, TimeUnit unit)
throws InterruptedException
public final long awaitNanos(long nanosTimeout)
throws InterruptedException
await是有中断机制是,但是如果我们不需要那种SIGNAL之前中断报异常的话,可以使用下面这个函数
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
好了,condition条件队列不多说了,源码可以结合我的思维导图和大佬的博客看https://javadoop/post/AbstractQueuedSynchronizer-2
三、线程中断
java的中断不是 kill -9 pid
杀死线程,而只是标记一下中断状态而已,让线程在合适的时机中进行处理。中断代表线程状态,每个线程都关联了一个中断状态,是一个 true 或 false 的 boolean 值,初始值为 false。
处理中断
一旦中断发生,我们接收到了这个信息,然后怎么去处理中断呢?本小节将简单分析这个问题。
我们经常会这么写代码:
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// ignore
}
// go on
当 sleep 结束继续往下执行的时候,我们往往都不知道这块代码是真的 sleep 了 10 秒,还是只休眠了 1 秒就被中断了。这个代码的问题在于,我们将这个异常信息吞掉了。(对于 sleep 方法,我相信大部分情况下,我们都不在意是否是中断了,这里是举例)
四、CountDownLatch
0、CountDownLatch和CyclicBarrier和Semaphore的思路
- CountDownLatch是初始化一个
state=n
,然后每countdown()一次就是state–。当state变为0后,再使用await时,他判断tryAcquire时,如果发现state==0,就直接判断为获取到了锁,不再操作state了 - CyclicBarrier是用一个条件队列,比如构造器传入4,即代表
count
=4,前3个await来的时候就放到条件队列,第四个await来的时候就先执行command命令,然后把前个唤醒从跳进队列放到阻塞队列 - Semaphore是使用state来操作的,如果state==0代表没有信号量了,就去阻塞队列,如果state>0,就state–后执行程序,代表信号量还够
1、CountDownLatch和CyclicBarrier区别
- CountDownLatch的实现原理是共享模式,CyclicBarrier实现原理是等待队列
- 以倒计时触发火箭发射为例:CountDownLatch的发射逻辑不在CountDownLatch中,而在await之后;而CyclicBarrier的发射逻辑一般在CyclicBarrier构造函数中
static final class Node {
// 标识节点当前在共享模式下
static final Node SHARED = new Node();
// 标识节点当前在独占模式下
static final Node EXCLUSIVE = null;
2、CountDownLatch构造器
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 老套路了,内部封装一个 Sync 类继承自 AQS
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
// 这样就 state == count 了
setState(count);
}
...
}
我们经历之前的说明,发现CountDownLatch很简单,就是先占有state变量,那么await线程当然获取不到锁了,它一直在条件队列中。
注意只是设置了state,没有head相关操作
3、countdown的await
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 和之前的doAcquire差不多,如果发送head.next不是自己,就去阻塞;如果是自己,就尝试获取锁
doAcquireSharedInterruptibly(arg);
}
// 只有当 state == 0 的时候,这个方法才会返回 1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
上面的和tryAcquireShared
传统的 tryAcquire
不同,他不改变state的值,只是判断是不是0。是0的话直接跳过,不是0的话就去入队等待唤醒
4、countdown的入队
我们知道构造函数里只设置了state,没有head呢,那么我们要入队时得补充head吧,这个做法我们在AQS就提过了,用addWaiter(Node.SHARED);
来做,他会补充head并入队当前node
5、countdown的阻塞
阻塞是通过shouldParkAfterFailedAcquire来进行的,如果head.next不是自己,就阻塞,在这里阻塞,从这里唤醒
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 1. 入队 // 这个也很关键,因为之前只设置了state,所以这里补充一下
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 同上,只要 state 不等于 0,那么这个方法返回 -1
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 2
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
6、countdown的唤醒
private void doAcquireSharedInterruptibly(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 被唤醒后来到这里,正常进入if
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
setHeadAndPropagate这个方法判断node.next;为共享结点后,会传播doReleaseShared();继续唤醒后续结点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// h.waitStatus < 0 ,从而调用doReleaseShared();
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
7、countdown的持续唤醒doReleaseShared
上面第一个结点会持续调用doReleaseShared();来唤醒后续
// 调用这个方法的时候,state == 0
private void doReleaseShared() {
for (;;) {
Node h = head;
// 1. h == null: 说明阻塞队列为空
// 2. h == tail: 说明头结点可能是刚刚初始化的头节点,
// 或者是普通线程节点,但是此节点既然是头节点了,那么代表已经被唤醒了,阻塞队列没有其他节点了
// 所以这两种情况不需要进行唤醒后继节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// t4 将头节点(此时是 t3)的 waitStatus 设置为 Node.SIGNAL(-1) 了
if (ws == Node.SIGNAL) {
// 这里 CAS 失败的场景请看下面的解读
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
// 在这里,也就是唤醒 t4
unparkSuccessor(h);
}
else if (ws == 0 &&
// 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环
// 否则,就是 head 没变,那么退出循环,
// 退出循环是不是意味着阻塞队列中的其他节点就不唤醒了?当然不是,唤醒的线程之后还是会调用这个方法的
if (h == head) // loop if head changed
break;
}
}
总之他会调用unparkSuccessor继续唤醒,然后继续把自己提升为head,继续唤醒后续的
8、countdown的信号量
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true
// 否则只是简单的 state = state - 1 那么 countDown() 方法就结束了
// 将 state 减到 0 的那个操作才是最复杂的,继续往下吧
if (tryReleaseShared(arg)) {
// 唤醒 await 的线程
doReleaseShared();
return true;
}
// 可以通过该值判断是否是触发出发那个countdown
return false;
}
// 这个方法很简单,用自旋的方法实现 state 减 1
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
五、CyclicBarrier
1、CountDownLatch 初识
CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现。
也就是说,如果CountDownLatch 的构造函数是4,那么调用8次await,将触发2次火箭发射程序
public CyclicBarrier(int parties, Runnable barrierAction) {
this.parties = parties;// 参与的线程数
this.count = parties;// 还没有到栅栏的线程数
this.barrierCommand = barrierAction;
}
private final Condition trip = lock.newCondition();
// 当前所处的“代”
private Generation generation = new Generation();
trip是每个condition队列
凑过了之后调用trip.signalAll();唤醒所有条件队列的,转移到阻塞队列,转移到阻塞队列之后有人唤醒吗?
唤醒所有是用transferForSignal函数依次转移到阻塞队列
CyclicBarrier是论代的,每新生一代,count 会重新置为parties
2、CountDownLatch 的await
// 不带超时机制
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
// 带超时机制,如果超时抛出 TimeoutException 异常
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
dowait里会先获取锁
private int dowait(boolean timed, long nanos) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
。。。
} finally {
lock.unlock();
}
}
所以对count–是线程安全的
3、新的一代
CyclicBarrier是论代的,每新生一代,count 会重新置为parties
private void nextGeneration() {
// 首先,需要唤醒所有的在栅栏上等待的线程
trip.signalAll();
// 更新 count 的值
count = parties;
// 重新生成“新一代”
generation = new Generation();
}
4、代的重置
private int dowait(boolean timed, long nanos) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
。。。;
int index = --count;
// 如果等于 0,说明所有的线程都到栅栏上了,准备通过
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果在初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况
ranAction = true;
// 唤醒等待的线程,然后开启新的一代
nextGeneration();
return 0;
}
}
// 如果是最后一个线程调用 await,那么上面就返回了
// 下面的操作是给那些不是最后一个到达栅栏的线程执行的
for (;;) {
trip.await();
也就是说count为0后先执行命令,然后开启新的一代
5、–count>0
这里是在for里调用trip.await();
6、线程
count==0那个线程不进阻塞队列,也不进入dowait的for,直接结束后执行后面的代码
count!=0的线程进入for,并且trip.await();
入条件队列
不要担心唤醒多余的await线程,因为dowait是由锁的,第5个调用await的线程一直没有获取到锁
六、Semaphore
1、Semaphore是共享锁
套路解读:创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
由于 tryAcquireShared(arg) 返回小于 0 的时候,说明 state 已经小于 0 了(没资源了),此时 acquire 不能立马拿到资源,需要进入到阻塞队列等待,虽然贴了很多代码,不在乎多这点了:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
七、读写锁
1、方法层次
lock、不带try的一般在AQS中
带try的一般在各个子类中
// ReadLock
public void lock() {
sync.acquireShared(1);
}
// AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// ReentrantReadWriteLock.Sync
protected final int tryAcquireShared(int unused) {
2、尝试获取读锁思路
- 发现有写锁直接返回-1
- 没有写锁则CAS增加读锁数量compareAndSetState(
3、读写锁的bit设计
读写状态的设计
同步状态在重入锁的实现中是表示被同一个线程重复获取的次数,即一个整形变量来维护,但是之前的那个表示仅仅表示是否锁定,而不用区分是读锁还是写锁。而读写锁需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态。
读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读,低16位表示写。
4、第一个读锁
有一个属性持有第一个读锁
- firstReader:线程
- firstReaderHoldCount:线程重入次数
4、读锁实体类
每个线程有个HoldCounter
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
// 计数器
static final class HoldCounter {
// 计数
int count = 0;
// Use id, not reference, to avoid garbage retention
// 获取当前线程的TID属性的值
final long tid = getThreadId(Thread.currentThread());
}
死锁释放
state–
tryAcquire
写锁读锁的try都是一样的,用CAS-state++
tryAcquire
读锁写锁升级
不支持读锁的升级,主要是避免死锁,比如AB线程都要升级写锁,A升级要求B释放读锁,B升级要求释放读锁,形成相互等待
本文标签: 源码AQSconditionCyclicBarrierCountDownLatch
版权声明:本文标题:【AQS源码】condition、CountDownLatch、CyclicBarrier、中断、共享等 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/dongtai/1728323433a1154126.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论