admin管理员组文章数量:1599542
文章目录
- 是什么
- 使用场景
- 怎样用
- 使用实例:
- 源码剖析
- await()
- signal
是什么
与Object.wait()\Object.notify()功能很类似。
以AQS非静态内部类的方式实现,因此Condition初始化的前提是先要有Lock实例,并且要先获取到锁。
使用场景
需要进程之间有协作的场景,典型的如等待-通知模型,生产-消费模型,比如有界队列:队列满了需要阻塞插入元素的线程,删除操作线程完成后唤醒被阻塞的插入操作线程。队列空了,需要阻塞删除操作线程,当有插入操作线程完成时唤醒被阻塞的删除操作线程;
怎样用
对比于Object的wait/notify的典型使用方式:
Object wait/notify的使用:
//Object监视器 wait/notify
threadA:
synchronized(theObject){//必须先要获得monitor锁(对象锁)
while(!flag)
theObject.wait();
doSomethings();
}
threadB:
synchronized(theObject){
...
flag=true;
theObject.notify();
}
Lock.Conodition await/signal的使用:
//Conodition await/signal
Condition condition=lock.newConditon();
threadA:
lock.lock()
try{
while(!flag)
condition.await();
doSomeThings();
}catch{
}finally{
Lock.unlock()
}
threadB:
lock.lock()
try{
...
flag=true;
condition.signal();
}catch{
}finally{
Lock.unlock()
}
使用实例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Food {
private boolean isBoiled = false;
private Lock lock = new ReentrantLock();
// 单个Lock产生一个Condition对象,用来管理任务间的通信
private Condition condition = lock.newCondition();
@Override
public String toString() {
return "Food [isBoiled=" + isBoiled + "]";
}
public void cook() {
// 显示锁必须紧跟try-finally,调用 await()\signalAll()都必须获得锁,最后finally中释放
lock.lock();
try {
isBoiled = true;
System.out.println("cook ");
condition.signalAll();
} finally {
lock.unlock();
}
// isBoiled = true;
// notifyAll();
}
public void eat() {
lock.lock();
try {
isBoiled = false;
System.out.println("eat ");
condition.signalAll();
} finally {
lock.unlock();
}
// isBoiled = false;
// notifyAll();
}
public void waitForEat() throws InterruptedException {
lock.lock();
try {
while (isBoiled == true) {
condition.await();
}
} finally {
lock.unlock();
}
// while (isBoiled == true) {
// wait();
//
// }
}
public void waitForCook() throws InterruptedException {
lock.lock();
try {
while (isBoiled == false) {
condition.await();
}
} finally {
lock.unlock();
}
// while (isBoiled == false) {
// wait();
// }
}
}
class Cook implements Runnable {
private Food food;
public Cook(Food food) {
this.food = food;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(200);
food.cook();
// System.out.println("Cook " + food);
food.waitForEat();
}
} catch (Exception e) {
System.err.println("Cook task interrupted ");
}
System.out.println("Cook task off ");
}
}
class Eater implements Runnable {
private Food food;
public Eater(Food food) {
this.food = food;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
food.waitForCook();
food.eat();
// System.out.println("Eater " + food);
}
} catch (Exception e) {
System.err.println("Eater task interrupted ");
}
System.out.println("Eater task off ");
}
}
public class CookAndEat {
public static void main(String[] args) throws InterruptedException {
Food food = new Food();
ExecutorService ex = Executors.newFixedThreadPool(2);
ex.execute(new Eater(food));
ex.execute(new Cook(food));
ex.execute(new Eater(food));
ex.execute(new Cook(food));
TimeUnit.SECONDS.sleep(1);
ex.shutdownNow();
}
}
源码剖析
前提:lock获取到锁(lock.lcok()成功返回)
await()
public final void await() throws InterruptedException {
if (Thread.interrupted())// 中断响应
throw new InterruptedException();
// addConditionWaiter:
// 1.尾结点Cancelled状态的结点,那么遍历队Condition列中的所有结点,提出Cancelled的结点。
// 2.将当前线程构造为一个Condition结点,并加入队列尾部。
Node node = addConditionWaiter();
// 释放锁并且唤醒等待这个锁的后继结点,并返回这个结点进入await方法时锁的状态(正常情况下应该是>=1的)
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果node在SyncQueue上,那么说明这个结点已经从Conditon队列中摘除,并放入了Sync队列。
// 也就是说对这个结点执行了Signal.
while (!isOnSyncQueue(node)) {
LockSupport.park(this);// 如果当前结点在conditon中不在sync队列中,阻塞,因为这个结点并没有其他线程对它执行signal
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 当收到另外一个线程的signal信号后,继续执行下列逻辑,重新开始正式竞争锁。同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)// 当前线程再次获取锁&&不需要抛出异常
interruptMode = REINTERRUPT;// 如果获取锁acquireQueued时返回true表示该线程被中断过
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();// 清理下条件队列中的不是在等待条件的节点
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();// 应该>=1
// AQS.release模板方法:CAS获取锁-修改state并唤醒后继结点.
// 注意!!!:
// 这里的逻辑是在Sync队列上操作的,当前线程执行Condition方法前提是获取了锁,那么当前线程应该是Sync队列的head结点。
// 这里release释放了当前结点(Sync.head)的锁,并在需要的情况下唤醒Sync.head的后继结点
if (release(savedState)) {
failed = false;
return savedState;
} else {// 调用condition之前没有获取锁
throw new IllegalMonitorStateException();
}
} finally {
if (failed)// 如果失败,则将当前结点置为Cancelled。Condition中结点的状态只有CONDITION与CANCELLED以及再唤醒逻辑中置为0
node.waitStatus = Node.CANCELLED;// 对应addConditionWaiter中的unlinkCancelledWaiters
}
}
总结
1.将当前线程构造为一个Condition结点,并加入Conditon队列尾部(lastWaiter.nextWaiter)。
Condition队列由三个Node引用构成: Node firstWaiter;Node lastWaiter;Node nextWaiter;
2.释放当前线程对应结点(当前线程结点在Sync队列上)的同步状态:释放锁
3.判断是否在Sync队列中,不在则阻塞。除次调用await构造的Conditon队列结点肯定不在Sync,每次调用一个对应的notify或者notifyAll才会把Conditon队列结点移动到Sync队列尾部。
那么换句话说:这里的阻塞,其实是在等待其他线程执行对应的notify方法。
4.如果其他线程F执行了notify方法,则这个结点从Conditon队列移入Sync队列。
然后其他线程F释放了锁(释放锁逻辑中会唤醒后继结点,当然Conditon结点在Sync上也是要排队的),当前线程从park处被唤醒,再次判断,isOnSyncQueue==true,执行AQS.acquireQueued方法(当前线程已经在Sync队列上了),在Sync同步队里尝试获取锁(不赘述,详见AQS解析)**
也就是说,一个线程调用conditon.await()并返回的过程:这个线程会从Conditon队列移动到Sync队列,并且两次获取锁。
signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//从Conditon队列头部开始唤醒结点(notifyAll会从头部依次唤醒所有Condition结点)
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// 从conditon队列中移除这个结点(Node first).即将放入Sync队列尾部
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 加入到sync队列尾部失败 &&结点移除失败
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
/**
* 将Condition队列的头结点从Condition队列转移到同步队列上,为了保证Condition队列的结点再次同步获取锁
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or attempt
* to set waitStatus fails, wake up to resync (in which case the
* waitStatus can be transiently and harmlessly wrong).
* 拼接到队列中并且尝试设置前驱结点的waitStatus状态,用来表示线程在等待状态。
* 如果结点取消了或者尝试设置waitStatus失败,唤醒以再次同步
*/
Node p = enq(node);// 结点插入Sync队列尾部,并返回这个结点的前驱结点(也就是之前的Tail)
int ws = p.waitStatus;
// 是取消状态||设置为Signal态没有成功(表示node前驱结点的状态异常:需要将node结点在sync队列中唤醒,重新进行同步操作)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
总结:
1.从conditon队列头部移除结点(Node firstWaiter).即将放入Sync队列尾部:
2.从Conditon队列头部开始唤醒结点(firstWaiter):
3.将这个结点状态从CONDITION修改为0;放入Sync队列尾部,并将这个结点的前驱结点设为SIGNAL态。
4.执行完Conditon.signal,并且附带在finally中释放锁。释放锁这个逻辑里会唤醒后继结点,后继结点也会唤醒后继结点直到 后继是在Conditon上等待的线程,被唤醒。也就是说Condition从Condition队列移动到Sync队列,排队等待自己被唤醒
signalAll对每个结点都做了一次唤醒、放入同步队列的操作;而signal只对**condition队列的firstwaiter(也就是等待时间最长的结点)**做了同步处理。
版权声明:本文标题:java并发编程——Condition(waitsignalnotify的等待-通知模式 ) 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/xitong/1728323076a1154080.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论