admin管理员组

文章数量:1599543

AQS如何实现等待通知?

在进行线程间的通信时,可以用基于Object对象的wait和notify方法实现等待/通知机制

基于Object实现等待/通知机制的相关方法

方法名称描述
notify()通知一个在对象上等待的线程,使其从wait()方法返回
notifyAll()通知所有等待在该对象上的线程
wait()调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被中断才会返回,需要注意,调用wait()方法后,会释放对象的锁
wait(long)超时等待一段时间,这里的参数是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回
wait(long, int)对于超时时间更细粒度的控制,可以达到纳秒

用例子演示一下用法

public class WaitNotify {

    private static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread waitThread = new Thread(new Wait());
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify());
        notifyThread.start();
    }

    private static class Wait implements Runnable {

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    System.out.println("lock wait start");
                    lock.wait();
                    System.out.println("lock wait end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 条件满足,完成工作
                System.out.println("run finish");
            }
        }
    }

    private static class Notify implements Runnable {

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println("lock notify start");
                lock.notify();
                System.out.println("lock notify end");
            }
        }
    }
}
lock wait start
lock notify start
lock notify end
lock wait end
run finish

wait()方法被执行后,锁被自动释放,但notify()方法被执行后,锁却不自动释放,必须执行完notify()方法所在的同步synchronized代码块后才释放锁

在AQS相关类中怎么实现这种等待/通知机制呢?

答案是Condition,Condition是一个接口,AbstractQueuedSynchronizer中有一个内部类实现了这个接口,一个Condition就代表一个条件队列

基于Condition实现等待/通知机制

和synchronized一样,调用await和signal方法时,必须获得与Condition相关的锁

Condition的主要方法有2个

  1. await,将线程放入等待队列进行等待,当其他线程调用signal或者interrupt方法时会被唤醒
  2. signal,唤醒等待队列中的线程,将其放入同步队列,获得锁后开始执行

Conditon使用例子如下

public class WaitNotifyUseCondition {

    private static ReentrantLock lock = new ReentrantLock();
    private static Condition conditionA  = lock.newCondition();
    private static Condition conditionB = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        Thread waitThreadA = new Thread(new WaitA());
        waitThreadA.start();
        Thread waitThreadB = new Thread(new WaitB());
        waitThreadB.start();
        TimeUnit.SECONDS.sleep(2);
        lock.lock();
        try {
            conditionA.signal();
        } finally {
            lock.unlock();
        }
    }

    private static class WaitA implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println("WaitA begin wait");
                conditionA.await();
                System.out.println("WaitA end wait");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    private static class WaitB implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println("WaitB begin wait");
                conditionB.await();
                System.out.println("WaitB end wait");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}
WaitA begin wait
WaitB begin wait
WaitA end wait

WaitThreadB因为没有被通知,一直阻塞

可以看到Condition比Object的等待通知更强大,因为它可以实现条件通知

其实原因也和简单,基于Object的等待通知只有一个等待队列,而在AQS中有多个等待队列,一个Condition类代表一个条件队列

同步队列和等待队列


在之前的AQS文章中,我们提到了同步队列,本节我们又提到了等待队列,那他们两者是如何协同工作的?

AbstractQueuedSynchronizer内部维护着一个同步队列(双向链表实现),多个条件队列(单向链表实现),条件队列由AbstractQueuedSynchronizer的内部类ConditionObject来维护,new一个ConditonObject ,则多一个条件队列,当一个线程执行await方法是,会把当线程包装成一个Node节点,放到执行await方法的ConditionObject的条件队列中,释放锁并被阻塞,当执行signal方式时,会把条件队列的第一个节点移除,并转移到同步队列中,获取到锁即可继续执行

ConditionObject 是AbstractQueuedSynchronizer的一个内部类,用来实现条件队列,属性如下

public class ConditionObject implements Condition, java.io.Serializable {

	// 条件队列的头节点
	private transient Node firstWaiter;
	
	// 条件队列的尾节点
	private transient Node lastWaiter;

	public ConditionObject() { }

	// 阻塞过程中不响应中断,仅设置标志位,让之后的方法处理
	private static final int REINTERRUPT =  1;
	
	// 阻塞过程中响应中断,并throw InterruptedException
	private static final int THROW_IE    = -1;

}

假如在阻塞过程中发生了中断,REINTERRUPT标志了中断发生在 signalled之后,
THROW_IE标志了中断发生在 signalled之前,从而决定采用那种方式响应中断

await阻塞线程

将线程放入等待队列,并且阻塞线程。当调用signal的时候,唤醒线程,进入同步队列获取锁

// ConditionObject
public final void await() throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	// 当前线程加入等待队列
	Node node = addConditionWaiter();
	// 挂起线程前,必须释放当前锁,这里调用ReentrantLock释放锁的逻辑
	int savedState = fullyRelease(node);
	// 标志位
	int interruptMode = 0;
	// 判断当前线程是否在同步队列中,如果不在同步队列,则直接挂起线程
	while (!isOnSyncQueue(node)) {
		// 阻塞
		LockSupport.park(this);
		// 线程被唤醒,线程节点从条件队列移除,并放到放到同步队列,或被中断
		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
			break;
	}
	// 唤醒之后进入同步队列,竞争获取锁
	// 获取锁的过程中有中断,并且标志位不是(响应中断)
	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
		interruptMode = REINTERRUPT;
	if (node.nextWaiter != null) // clean up if cancelled
		// 清除等待队列中不是等待状态的节点
		unlinkCancelledWaiters();
	// 阻塞中被中断过,则处理中断
	if (interruptMode != 0)
		// 根据标志位,决定对中断的处理方式
		reportInterruptAfterWait(interruptMode);
}

增加等待节点的逻辑,每次增加节点都会剔除非condition的节点

// ConditionObject
private Node addConditionWaiter() {
	Node t = lastWaiter;
	// If lastWaiter is cancelled, clean out.
	if (t != null && t.waitStatus != Node.CONDITION) {
		// 清除等待队列中取消状态的节点
		unlinkCancelledWaiters();
		t = lastWaiter;
	}
	Node node = new Node(Thread.currentThread(), Node.CONDITION);
	// 链表还没有初始化
	if (t == null)
		firstWaiter = node;
	else
		t.nextWaiter = node;
	lastWaiter = node;
	return node;
}

释放锁

// AbstractQueuedSynchronizer
final int fullyRelease(Node node) {
	boolean failed = true;
	try {
		int savedState = getState();
		if (release(savedState)) {
			failed = false;
			return savedState;
		} else {
			// 释放锁失败
			throw new IllegalMonitorStateException();
		}
	} finally {
		// 释放锁失败后,将当前节点状态设置为CANCELLED
		// 后序会被清理出条件队列
		if (failed)
			node.waitStatus = Node.CANCELLED;
	}
}

判断节点是否在同步队列

// AbstractQueuedSynchronizer
final boolean isOnSyncQueue(Node node) {
	// 节点在条件队列
	// 同步队列中节点的状态 只能为0、SIGNAL、PROPAGATE 和 CANCELLED 其中之一
	if (node.waitStatus == Node.CONDITION || node.prev == null)
		return false;
	// 如果后继节点不为null,则表明节点在同步队列上
	// 因为条件队列使用的是nextWaiter指向后继节点的
	// 条件队列上节点的next均为null
	if (node.next != null) // If has successor, it must be on queue
		return true;
	
	// 走到这一步,说明node.prev!=null && node.next=null
	// 但这并不能说明node在同步队列中,因为节点在入队过程中
	// 是先设置node.prev后设置node.next的(详见addWaiter方法)
	// 有可能CAS设置尾节点失败,导致没有加入队列
	// 所以从尾到头遍历一遍
	return findNodeFromTail(node);
}
// AbstractQueuedSynchronizer
private boolean findNodeFromTail(Node node) {
	Node t = tail;
	for (;;) {
		if (t == node)
			return true;
		if (t == null)
			return false;
		t = t.prev;
	}
}

检测线程在等待期间是否发生中断

// ConditionObject
// Checks for interrupt, returning THROW_IE if interrupted
// before signalled, REINTERRUPT if after signalled, or
// 0 if not interrupted.
private int checkInterruptWhileWaiting(Node node) {
	return Thread.interrupted() ?
		(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
		0;
}
// AbstractQueuedSynchronizer
final boolean transferAfterCancelledWait(Node node) {
	// signalled之前发生中断,因为signalled之后会将会将节点状态从CONDITION 设置为0
	if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
		enq(node);
		return true;
	}

	// signalled之后发生中断
	// 如果节点还没有被放入同步队列,则放弃当前CPU资源
	// 让其他任务执行
	while (!isOnSyncQueue(node))
		Thread.yield();
	return false;
}

清除等待队列中取消状态的节点

// ConditionObject
private void unlinkCancelledWaiters() {
	Node t = firstWaiter;
	// 指向尾节点
	Node trail = null;
	while (t != null) {
		Node next = t.nextWaiter;
		if (t.waitStatus != Node.CONDITION) {
			t.nextWaiter = null;
			// 只有头节点的状态不是CONDITION才会执行到这一步
			if (trail == null)
				firstWaiter = next;
			else
				trail.nextWaiter = next;
			// 遍历完链表,设置尾节点
			if (next == null)
				lastWaiter = trail;
		}
		else
			trail = t;
		t = next;
	}
}

响应中断的方式
interruptMode=THROW_IE(-1):await退出时,直接抛出InterruptedException
interruptMode=REINTERRUPT(1):await退出时,重置中断标记位

// ConditionObject
private void reportInterruptAfterWait(int interruptMode)
	throws InterruptedException {
	if (interruptMode == THROW_IE)
		// 直接响应中断
		throw new InterruptedException();
	else if (interruptMode == REINTERRUPT)
		// 重置中断标记位
		selfInterrupt();
}
// AbstractQueuedSynchronizer
static void selfInterrupt() {
	Thread.currentThread().interrupt();
}

signal,唤醒等待时间最长的线程

// ConditionObject
public final void signal() {
	// 当前线程没有获取到锁
	if (!isHeldExclusively())
		throw new IllegalMonitorStateException();
	// 唤醒等待队列中的头结点
	Node first = firstWaiter;
	if (first != null)
		doSignal(first);
}
// ConditionObject
private void doSignal(Node first) {
	do {
		// 将同步队列的头结点,设置为目前头结点的下一个节点
		// 如果头节点的下一个节点为null,则设置尾节点为null
		if ( (firstWaiter = first.nextWaiter) == null)
			lastWaiter = null;
		// 将first节点从条件队列中移除
		first.nextWaiter = null;
		// 通知第一个非CANCELLED节点被唤醒,或者遍历完,退出
	} while (!transferForSignal(first) &&
			 (first = firstWaiter) != null);
}

唤醒第一个非CANCELLED节点

// 将节点从条件队列放入同步队列,true为成功 
// AbstractQueuedSynchronizer
final boolean transferForSignal(Node node) {
	 
	// 通过CAS将节点的状态从CONDITION设置为0
	// 如果设置失败,说明这个节点状态为CANCELLED
	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
		return false;

	// 加入同步队列,并返回前继节点
	Node p = enq(node);
	int ws = p.waitStatus;
	// 前继节点为CANCELLED状态,或者设置SIGNAL状态失败
	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
		// 唤醒线程
		LockSupport.unpark(node.thread);
	return true;
}

signalAll和signal实现类似,区别如下,signal将等待队列中的一个非CANCELLED节点放到同步队列,而signalAll是将等待队列中的所有非CANCELLED节点放到同步队列中

参考博客

队列示意图
[0]https://juejin/post/6877821298110627854
[1]http://www.importnew/9281.html
[2]https://www.jianshu/p/6b5aa7b7684c
[3]http://www.importnew/9281.html
[4]https://blog.csdn/prestigeding/article/details/53158246
[5]https://blog.csdn/javazejian/article/details/75043422
[6]https://segmentfault/a/1190000014751308
[7]https://blog.csdn/woshilijiuyi/article/details/79313247
好文章
[8]https://blog.csdn/lsgqjh/article/details/103552392?utm_medium=distribute.pc_relevant_t0.none-task-blog-BlogCommendFromMachineLearnPai2-1.control&dist_request_id=1328602.10554.16149251689465853&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-BlogCommendFromMachineLearnPai2-1.control
[9]https://blog.csdn/fd135/article/details/104449912?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-0&spm=1001.2101.3001.4242

本文标签: 如何实现条件通知工具condition