admin管理员组文章数量:1599408
重要入口方法
======
Condition的实现主要包括:条件队列、等待和通知。其中条件队列放的是AQS里的Node数据结构,使用nextWaiter来维护条件队列。等待和通知共有7个方法。
-
signal():唤醒该条件队列的头节点。
-
signalAll():唤醒该条件队列的所有节点。
-
awaitUninterruptibly():等待,此方法无法被中断,必须通过唤醒才能解除阻塞。
-
await():当前线程进入等待。
-
awaitNanos(long):当前线程进入等待,有超时时间,入参的单位为纳秒。
-
awaitUntil(Date):当先线程进入等待,直到当前时间超过入参的时间。
-
await(long, TimeUnit):当前线程进入等待,有超时时间,入参可以自己设置时间单位。
这些方法其实大同小异,因此本文只对常用的signal()、signalAll()和await()方法展开详解。搞懂了这3个方法,搞懂其他几个方法也基本没什么阻碍。
基础属性
====
Condition的实现是ConditionObject,而ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为条件队列),该队列是Condition对象实现等待/通知功能的关键。
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter; // 条件队列的头节点
/** Last node of condition queue. */
private transient Node lastWaiter; // 条件队列的尾节点
/**
- Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
通过源码可知,条件队列的节点使用的是AQS的Node数据结构。(Node的数据结构见:Node的数据结构)
另外,由于ConditionObject是AQS的内部类,因此必然和AQS是有很多关联的,因此看本文之前必须先了解AQS的实现原理。(如果你对AQS不熟悉,可以参考我的另一篇文章:Java并发:AbstractQueuedSynchronizer详解(独占模式))
条件队列的基本数据结构如下图中的“条件队列”:
await方法
=======
public final void await() throws InterruptedException { // 阻塞当前线程,直接被唤醒或被中断
if (Thread.interrupted()) // 如果当前线程被中断过,则抛出中断异常
throw new InterruptedException();
Node node = addConditionWaiter(); // 添加一个waitStatus为CONDITION的节点到条件队列尾部
int savedState = fullyRelease(node); // 释放操作。我们知道只有在拥有锁(acquire成功)的时候才能调用await()方法,因此,调用await()方法的线程的节点必然是同步队列的头节点。所以,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的条件队列中。
int interruptMode = 0; // 0为正常,被中断值为THROW_IE或REINTERRUPT
while (!isOnSyncQueue(node)) { // isOnSyncQueue:判断node是否在同步队列(注意和条件队列区分。调用signal方法会将节点从条件队列移动到同步队列,因此这边就可以跳出while循环)
LockSupport.park(this); // node如果不在同步队列则进行park(阻塞当前线程)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查线程被唤醒是否是因为被中断,如果是则跳出循环,否则会进行下一次循环,因为被唤醒前提是进入同步队列,所以下一次循环也必然会跳出循环
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // acquireQueued返回true代表被中断过,如果中断模式不是THROW_IE,则必然为REINTERRUPT(见上面的checkInterruptWhileWaiting方法)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); // 移除waitStatus为CANCELLED的节点
if (interruptMode != 0) // 如果跳出while循环是因为被中断
reportInterruptAfterWait(interruptMode); // 则根据interruptMode,选择抛出InterruptedException 或 重新中断当前线程
}
-
如果当前线程被中断过,则抛出中断异常。
-
调用addConditionWaiter方法(详解见下文addConditionWaiter方法)添加一个waitStatus为CONDITION的节点到条件队列尾部。
-
调用fullyRelease方法(详解见下文fullyRelease方法)释放锁。
-
调用isOnSyncQueue方法(详解见下文isOnSyncQueue方法)来阻塞线程,直到被唤醒或被中断。
-
调用acquireQueued方法(详解见acquireQueued方法详解)来尝试获取锁,并判断线程跳出while循环是被唤醒还是被中断。
-
如果跳出while循环是因为被中断,则根据interruptMode,选择抛出InterruptedException 或 重新中断当前线程。
addConditionWaiter方法
private Node addConditionWaiter() { // 添加一个waitStatus为CONDITION的节点到条件队列尾部
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); // 移除waitStatus不为CONDITION的节点(条件队列里的节点waitStatus都为CONDITION)
t = lastWaiter; // 将t赋值为移除了waitStatus不为CONDITION后的尾节点(上面进行了移除操作,因此尾节点可能会发生变化)
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); // 以当前线程新建一个waitStatus为CONDITION的节点
if (t == null) // t为空,代表条件队列为空
firstWaiter = node; // 将头节点赋值为node
else
t.nextWaiter = node; // 否则,队列不为空。将t(原尾节点)的后继节点赋值为node
lastWaiter = node; // 将node赋值给尾节点,即将node放到条件队列的尾部。这里没有用CAS来保证原子性,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的
return node;
}
-
如果条件队列的尾节点不为null并且waitStatus不为CONDITION,则调用unlinkCancelledWaiters方法(详解见下文unlinkCancelledWaiters方法)移除waitStatus不为CONDITION的节点(条件队列里的节点waitStatus都为CONDITION),并将t赋值为移除了waitStatus不为CONDITION后的尾节点(上面进行了移除操作,因此尾节点可能会发生变化)
-
以当前线程新建一个waitStatus为CONDITION的节点。
-
如果t为空,代表条件队列为空,将头节点赋值为node;否则,队列不为空。将t(原尾节点)的后继节点赋值为node。
-
最后将node赋值给尾节点,即将node放到条件队列的尾部。这里没有用CAS来保证原子性,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
unlinkCancelledWaiters方法
private void unlinkCancelledWaiters() { // 从条件队列移除所有waitStatus不为CONDITION的节点
Node t = firstWaiter; // t赋值为条件队列的尾节点
Node trail = null;
while (t != null) {
Node next = t.nextWaiter; // 向下遍历
if (t.waitStatus != Node.CONDITION) { // 如果t的waitStatus不为CONDITION
t.nextWaiter = null; // 断开t与t后继节点的关联
if (trail == null) // 如果trail为null,则将firstWaiter赋值为next节点,此时还没有遍历到waitStatus为CONDITION的节点,因此直接移动firstWaiter的指针即可移除前面的节点
firstWaiter = next;
else
trail.nextWaiter = next; // 否则将trail的后继节点设为next节点。此时,trail节点到next节点中的所有节点被移除(包括t节点,但可能不止t节点。因为,trail始终指向遍历过的最后一个waitStatus为CONDITION,因此只需要将trail的后继节点设置为next,即可将trail之后到next之前的所有节点移除)
if (next == null)
lastWaiter = trail;
}
else
trail = t; // 如果t的waitStatus为CONDITION,则将trail赋值为t,trail始终指向遍历过的最后一个waitStatus为CONDITION
t = next; // t指向下一个节点
}
}
-
将t赋值为条件队列的尾节点 。
-
从t开始遍历整个条件队列。
-
如果t的waitStatus不为CONDITION,则断开t与t后继节点的关联。
-
如果trail为null,则将firstWaiter赋值为next节点,此时还没有遍历到waitStatus为CONDITION的节点,因此直接移动firstWaiter的指针即可移除前面的节点。
-
如果trail不为null,则将trail的后继节点设为next节点。此时,trail节点到next节点中的所有节点被移除(包括t节点,但可能不止t节点。因为,trail始终指向遍历过的最后一个waitStatus为CONDITION,因此只需要将trail的后继节点设置为next,即可将trail之后到next之前的所有节点移除)
-
如果t的waitStatus为CONDITION,则将trail赋值为t,trail始终指向遍历过的最后一个waitStatus为CONDITION。
-
最后将 t指向下一个节点,准备开始下一次循环。
例子图解过程:
fullyRelease方法
final int fullyRelease(Node node) { // 释放锁
boolean failed = true;
try {
int savedState = getState(); // 当前的同步状态
if (release(savedState)) { // 独占模式下release(一般指释放锁)
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED; // 如果release失败则将该节点的waitStatus设置为CANCELLED
}
}
调用release方法(详解见release方法详解)释放锁,如果释放失败,则将该节点的waitStatus设置为CANCELLED。
isOnSyncQueue方法
final boolean isOnSyncQueue(Node node) { // 判断node是否再同步队列中
if (node.waitStatus == Node.CONDITION || node.prev == null) // 如果waitStatus为CONDITION 或 node没有前驱节点,则必然不在同步队列,直接返回false
return false;
if (node.next != null) // If has successor, it must be on queue 如果有后继节点,必然是在同步队列中,返回true
return true;
/*
-
node.prev can be non-null, but not yet on queue because
-
the CAS to place it on queue can fail. So we have to
-
traverse from tail to make sure it actually made it. It
-
will always be near the tail in calls to this method, and
-
unless the CAS failed (which is unlikely), it will be
-
there, so we hardly ever traverse much.
*/
return findNodeFromTail(node); // 返回node是否为同步队列节点,如果是返回true,否则返回false
}
返回node是否为同步队列节点,如果是返回true,否则返回false。因为只有该节点的线程被唤醒(signal())才会从条件队列移到同步队列。
findNodeFromTail方法
private boolean findNodeFromTail(Node node) { // 从同步队列的尾节点开始向前遍历,如果node为同步队列节点则返回true,否则返回false
Node t = tail;
for (;😉 {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
从同步队列的尾节点开始向前遍历,如果node为同步队列节点则返回true,否则返回false。
signal方法
========
public final void signal() {
if (!isHeldExclusively()) // 检查当前线程是否为独占模式同步器的所有者
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 唤醒条件队列的头节点
}
-
检查当前线程是否为独占模式同步器的所有者,在ReentrantLock中即检查当前线程是否为拥有锁的线程。如果不是,则抛IllegalMonitorStateException。
-
拿到条件队列的头节点,如果不为null,则调用doSignal方法(详解见下文doSignal方法)唤醒头节点。
doSignal方法
private void doSignal(Node first) { // 将条件队列的头节点移到同步队列
do {
if ( (firstWaiter = first.nextWaiter) == null) // 将first节点赋值为first节点的后继节点(相当于移除first节点),如果first节点的后继节点为空,则将lastWaiter赋值为null
lastWaiter = null;
first.nextWaiter = null; // 断开first节点对first节点后继节点的关联
} while (!transferForSignal(first) && // transferForSignal:将first节点从条件队列移动到同步队列
(first = firstWaiter) != null); // 如果transferForSignal失败,并且first节点不为null,则向下遍历条件队列的节点,直到节点成功移动到同步队列 或者 firstWaiter为null
}
-
将first节点赋值为first节点的后继节点(相当于移除first节点),如果first节点的后继节点为空,则将lastWaiter赋值为null。
-
断开first节点与first节点后继节点的关联。
-
调用transferForSignal方法(详解见下文transferForSignal方法)将first节点从条件队列移动到同步队列。
-
如果transferForSignal失败,并且first节点的后继节点(firstWaiter)不为null,则向下遍历条件队列的节点,直到节点成功移动到同步队列 或者 first节点的后继节点为null。
transferForSignal方法
final boolean transferForSignal(Node node) { // 将node节点从条件队列移动到同步队列,如果成功则返回true。
/*
- If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 如果不能更改节点的waitStatus,则表示该节点已被取消,返回false
return false;
/*
-
Splice onto queue and try to set waitStatus of predecessor to
-
indicate that thread is (probably) waiting. If cancelled or
-
attempt to set waitStatus fails, wake up to resync (in which
-
case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); // 否则,调用enq方法将node添加到同步队列,注意:enq方法返回的节点是node的前驱节点
int ws = p.waitStatus; // 将ws赋值为node前驱节点的等待状态
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果node前驱节点的状态为CANCELLED(ws>0) 或 使用CAS将waitStatus修改成SIGNAL失败,则代表node的前驱节点无法来唤醒node节点,因此直接调用LockSupport.unpark方法唤醒node节点
LockSupport.unpark(node.thread);
return true;
}
-
如果不能更改节点的waitStatus,则表示该节点已被取消,返回false。
-
调用enq方法(详解见enq方法详解)将node添加到同步队列,注意:enq方法返回的节点是node的前驱节点。因此,此时p节点为node的前驱节点。
-
将ws赋值为node前驱节点(p节点)的waitStatus。
-
如果p节点的waitStatus为CANCELLED(ws>0) 或 使用CAS将p节点的waitStatus修改成SIGNAL失败,则代表p节点无法来唤醒node节点,因此直接调用LockSupport.unpark方法唤醒node节点。
signalAll方法
===========
public final void signalAll() {
if (!isHeldExclusively()) // 检查当前线程是否为独占模式同步器的所有者
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first); // 唤醒条件队列的所有节点
}
-
检查当前线程是否为独占模式同步器的所有者,在ReentrantLock中即检查当前线程是否为拥有锁的线程。如果不是,则抛IllegalMonitorStateException。
-
拿到条件队列的头节点,如果不为null,则调用doSignalAll方法(详解见下文doSignalAll方法)唤醒条件队列的所有节点。
doSignalAll方法
private void doSignalAll(Node first) { // 将条件队列的所有节点移到同步队列
lastWaiter = firstWaiter = null; // 因为要移除条件队列的所有节点到同步队列,因此这边直接将firstWaiter和lastWaiter赋值为null
do {
Node next = first.nextWaiter; // next赋值为first节点的后继节点
first.nextWaiter = null; // 断开first节点对first节点后继节点的关联
transferForSignal(first); // transferForSignal:将first节点从条件队列移动到同步队列
first = next; // first赋值为next节点
} while (first != null); // 循环遍历,将条件队列的所有节点移动到同步队列
}
-
因为要移除条件队列的所有节点到同步队列,因此这边直接将firstWaiter和lastWaiter赋值为null。
-
next赋值为first节点的后继节点 。
-
断开first节点对first节点后继节点的关联
-
调用transferForSignal方法(详解见上文transferForSignal方法)将first节点从条件队列移动到同步队列。
-
first赋值为next节点,准备下一次循环。
-
如果first不为null,则进入下一次循环。
总结
==
- 调用await和signal方法都需要先获得锁,否则会抛异常。
最后
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长,自己不成体系的自学效果低效漫长且无助。
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,不论你是刚入门Android开发的新手,还是希望在技术上不断提升的资深开发者,这些资料都将为你打开新的学习之门!
如果你觉得这些内容对你有帮助,需要这份全套学习资料的朋友可以戳我获取!!
由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!
得锁,否则会抛异常。
最后
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长,自己不成体系的自学效果低效漫长且无助。
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
[外链图片转存中…(img-bF7YIq6k-1715431578111)]
[外链图片转存中…(img-ZxvtQnCj-1715431578112)]
[外链图片转存中…(img-gQeLmtcV-1715431578112)]
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,不论你是刚入门Android开发的新手,还是希望在技术上不断提升的资深开发者,这些资料都将为你打开新的学习之门!
如果你觉得这些内容对你有帮助,需要这份全套学习资料的朋友可以戳我获取!!
由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!
版权声明:本文标题:Java并发:Condition详解 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/xitong/1728321347a1153862.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论