java并发编程——从源码手把手带你理解AQS源码

编程入门 行业动态 更新时间:2024-10-08 18:37:37

java并发编程——从<a href=https://www.elefans.com/category/jswz/34/1770099.html style=源码手把手带你理解AQS源码"/>

java并发编程——从源码手把手带你理解AQS源码

揭开AQS神秘面纱

  • 一、AQS简介
  • 二、AQS主要方法简介
  • 三、AQS实现原理
    • AQS独占功能
      • ReentrantLock公平策略
      • ReentrantLock非公平策略
      • AQS对中断的支持
      • AQS对限时等待的支持
    • AQS的condition等待
    • AQS共享功能
    • AQS读写锁

一、AQS简介

AbstractQueuedSynchronizer抽象类(以下简称AQS)是整个java.util.concurrent包的核心。在J.U.C包中,该包中的大多数同步器都是基于AQS来建立的。AQS框架提供了一套通用的机制来管理同步状态、阻塞/唤醒线程、管理等待队列。

AQS框架,采用了模板方法设计模式,定义好骨架和内部操作细节,具体规则让子类去实现。AQS框架将剩下一个问题留给用户:

什么是资源?如何定义资源是否可以被访问?

如图在AQS框架中对于核心方法tryAcquire方法并未作具体实现。真正实现是由其子类Sync、Worker去实现。通常Sync、Worker为内部类。

由下图可知ReentrantLock、CountDownLatch、Semaphore、ReentrantReadWriteLock 均存在Sync内部类,由此可见这个同步器底层都是通过AQS实现的。

我们来看看几个常见的同步器对 “什么是资源” 这一问题的定义:

同步器资源的定义
ReentrantLock资源表示独占锁。State为0表示锁可用;为1表示被占用;为N表示重入的次数
CountDownLatch资源表示倒数计数器。State为0表示计数器归零,所有线程都可以访问资源;为N表示计数器未归零,所有线程都需要阻塞。
Semaphore资源表示信号量或者令牌。State≤0表示没有令牌可用,所有线程都需要阻塞;大于0表示由令牌可用,线程每获取一个令牌,State减1,线程没释放一个令牌,State加1。
ReentrantReadWriteLock资源表示共享的读锁和独占的写锁。state逻辑上被分成两个16位的unsigned short,分别记录读锁被多少线程使用和写锁被重入的次数。

AQS提供的模板方法有以下几种,通过暴露以下方法来让用户自己来定义是否可以被访问。

方法描述
tryAcquire排它获取(资源数)
tryRelease排它释放(资源数)
tryAcquireShared共享获取(资源数)
tryReleaseShared共享获取(资源数)
isHeldExclusively是否排它状态

二、AQS主要方法简介

  1. CAS操作
    CAS,即CompareAndSet,在java中CAS操作的实现都委托给一个名Unsafe类,Unsafe中很多方法都是用native修饰的本地方法。常用的compareAndSwapInt、compareAndSetState、compareAndSetWaitStatus等方法都是Unsafe类中的本地方法。
  2. 等待队列的核心方法
方法名描述
enq入队操作
addWaiter入队操作
setHead设置头结点
unparkSuccessor唤醒后继结点
doReleaseShared释放共享结点
setHeadAndPropagate设置头结点并传播唤醒
  1. 资源获取
方法名描述
acquireQueued尝试获取资源,获取失败尝试阻塞线程
doAcquireInterruptibly独占地获取资源(响应中断)
doAcquireNanos独占地获取资源(限时等待)
doAcquireShared共享地获取资源
doAcquireSharedInterruptibly共享地获取资源(响应中断)
doAcquireSharedNanos共享地获取资源(限时等待)
  1. 资源释放操作
方法名描述
release释放独占资源
releaseShared释放共享资源

三、AQS实现原理

要想知道AQS的实现原理,首先要知道三个问题:

  1. 同步状态的管理
    AQS使用getState、setState以及compareAndSetState操作来读取和更新这个状态。
  2. 阻塞/唤醒线程的操作
    在JDK1.5之前,除了内置的监视器机制外,没有其它方法可以安全且便捷得阻塞和唤醒当前线程。
    JDK1.5以后,java.util.concurrent.locks包提供了LockSupport类来作为线程阻塞和唤醒的工具。
  3. 线程等待队列的管理
    等待队列,是AQS框架的核心,整个框架的关键其实就是如何在并发状态下管理被阻塞的线程。
    等待队列是严格的FIFO队列,是Craig,Landin和Hagersten锁(CLH锁)的一种变种,采用双向链表实现,因此也叫CLH队列。
    CLH队列中的结点是对线程的包装,结点一共有两种类型:独占(EXCLUSIVE)和共享(SHARED)。
    每种类型的结点都有一些状态,其中独占结点使用其中的CANCELLED(1)、SIGNAL(-1)、CONDITION(-2),共享结点使用其中的CANCELLED(1)、SIGNAL(-1)、PROPAGATE(-3)。
结点状态描述
CANCELLED1取消。表示后驱结点被中断或超时,需要移出队列
SIGNAL-1发信号。表示后驱结点被阻塞了(当前结点在入队后、阻塞前,应确保将其prev结点类型改为SIGNAL,以便prev结点取消或释放时将当前结点唤醒。)
CONDITION-2Condition专用。表示当前结点在Condition队列中,因为等待某个条件而被阻塞了
PROPAGATE-3传播。适用于共享模式(比如连续的读操作结点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。)
INITIAL0默认。新结点会处于这种状态

对于CLH队列,当线程请求资源时,如果请求不到,会将线程包装成结点,以自旋的方式不断尝试插入结点至队列尾部。

private Node enq(final Node node) {for (; ; ) {Node t = tail;if (t == null) { // 如果队列为空,则创建一个空的head结点if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}

AQS独占功能

这里以ReentrantLock为例,说明AQS提供的独占功能。分别从公平策略、非公平策略、以及AQS的锁中断、限时等待等功能来介绍。

ReentrantLock公平策略

本节ReentrantLock公平锁的分析基于一下示例:

假设现在有3个线程:ThreadA、ThreadB、ThreadC,一个公平的独占锁,3个线程会依次尝试去获取锁:
ReentrantLock lock=new ReentrantLock(true);
线程的操作时序如下:
//Thread A    lock
//Thread B    lock
//Thread C    lock
//Thread A    release
//Thread B    release
//Thread C    release
  1. Thread A 首先获取到锁
	//ReentrantLock的lock方法  public void lock() {//调用的是sync的同步方法 如果是公平锁则调用的是FairSync的lock方法,非公平锁调用的是NonfairSync的lock方法sync.lock();   }

最终调用的是FairSync的lock方法。

    static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}

acquire方法来自AQS

   public final void acquire(int arg) {//先尝试获取锁,如果获取失败则将线程添加到等到队列当中if (!tryAcquire(arg) &&         //acquireQueued  获取队列(也就是判断等待队列中是否有结点)// addWaiter方法,将线程封装成结点,并将结点插入到队列当红acquireQueued(addWaiter(Node.EXCLUSIVE), arg))      selfInterrupt();  //中断当前线程}

ReentrantLock重写了AQS的tryAcquire方法

 protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();     //获取当前线程int c = getState();   //获取当前线程状态  0表示锁可用,1表示锁被占用,大于1表示锁重入的次数 if (c == 0) {     //说明锁是可以用的if (!hasQueuedPredecessors() &&     	//判断该节点是否有前驱结点compareAndSetState(0, acquires)) {  //若没有前驱结点,尝试去将锁状态从0改为1setExclusiveOwnerThread(current);   //锁状态修改成功,则将当前线程设置为独占锁的占用者return true;     }//如果当前结点有前驱结点,说明前面有人等的时间比当前线程要长,则返回失败}else if (current == getExclusiveOwnerThread()) {  //锁已经被占有了,判断占有者是不是当前线程//如果是当前线程,即锁重入了int nextc = c + acquires;   // 将当前重入次数加1if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);   //CAS操作更新锁的状态return true;}return false;     }

ThreadA是首个获取锁的线程,所以上述方法会返回true,第一阶段结束。(ThreadA一直保持占有锁的状态)
此时,AQS中的等待队列还是空:

  1. Thread B 开始获取锁
    ThreadB先去调用lock方法,最终调用AQS的acquire方法,上述tryAcquire方法肯定是返回false(因为此时锁被线程A占有)。接下来会调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode); //将线程包装成结点// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {   //说明原先等待队列中有线程在等待node.prev = pred;   if (compareAndSetTail(pred, node)) {    //判断当前结点的前一个结点是不是尾结点pred.next = node;  //如果是,直接将接在后面。 (这里算是一个优化)return node;}}enq(node);  //如果有并发插入,此时将结点自旋插入队列。return node;}
private Node enq(final Node node) {for (;;) {   //自旋操作Node t = tail;   if (t == null) { //  如果没有尾结点,也是说等待队列没有初始化if (compareAndSetHead(new Node()))  //创建一个空的结点tail = head;     } else {node.prev = t;if (compareAndSetTail(t, node)) {   让当前节点的插入到队列这种t.next = node;return t;}}}}

大概流程如下图所示:

(1)当队列为null时,创建一个一个dummy头结点

(2)进入下一次循环,插入对尾结点。

将ThreadB结点插入队尾了,接下来回调用acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {    //自旋操作final Node p = node.predecessor();  //p为node的前驱结点if (p == head && tryAcquire(arg)) {  //如果p结点为dummy结点,说明node是首节点,则尝试获取锁setHead(node);  //获取锁成功,则将node结点设置为头结点(也就是之前的dummy结点)p.next = null; // help GCfailed = false;return interrupted;   }if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

上述方法,首先判断node结点是否为首结点(这里的首节点表示的是 head结点的下一个结点),发现确实是首结点(node.predecessor == head),就尝试去获取锁,如果获取失败(因为ThreadA占有者锁),就要判断是否需要阻塞当前线程。shouldParkAfterFailedAcquire。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;    //前驱结点的状态if (ws == Node.SIGNAL)      //SIGNAL:后续接地点需要被唤醒(这个状态表示当前结点的前驱结点将会唤醒自己,我今天安心阻塞)return true;if (ws > 0) {    //CANCELED:取消(说明当前结点因意外被中断或取消,需要将其从等待队列中移除)do {node.prev = pred = pred.prev;  //找到状态不为CANCELED的结点} while (pred.waitStatus > 0);pred.next = node;   } else {//对于独占功能来说,这里表示结点的初始状态为0,CAS操作将前驱结点状态标志设置为SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL); }return false;}

注意:对于独占功能,只是用了3中结点状态:CANCELLED:1 SIGNAL -1 COMDITION -2 ,所以上述代码中w>0就表示的CANCELLED状态。

结点状态描述
CANCELLED1取消。表示后驱结点被中断或超时,需要移出队列
SIGNAL-1发信号。表示后驱结点被阻塞了(当前结点在入队后、阻塞前,应确保将其prev结点类型改为SIGNAL,以便prev结点取消或释放时将当前结点唤醒。)
CONDITION-2Condition专用。表示当前结点在Condition队列中,因为等待某个条件而被阻塞了

对于在等待队列中的线程,如果要阻塞它,需要确保将来有线程可以唤醒它,AQS中通过将前驱结点的状态置为SIGNAL:-1,来表示将来会唤醒当前线程,当前线程可以安心阻塞。

线程阻塞流程图如下:
(1)插入完ThreadB后,队列的初始状态如下:

(2)虽然ThreadB是队首结点,但是它拿不到锁(被ThreadA占有着),所以ThreadB会阻塞,但在阻塞前需要设置下前驱的状态,以便将来可以唤醒我:

至此,ThreadB的执行也暂告一段落了(安心得在等待队列中睡觉)。
注意:补充一点,如果ThreadB在阻塞过程中被中断,其实是不会抛出异常的,只会在acquireQueued方法返回时,告诉调用者在阻塞器件有没被中断过,具体如果处理,要不要抛出异常,取决于调用者,这其实是一种延时中断机制。

	//shouldParkAfterFailedAcquire 判断线程是否应该阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())   //判断当前线程在阻塞阶段是否中断interrupted = true;private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

(3)Thread C 开始获取锁
ThreadC的调用过程和ThreadB完全一样,同样拿不到锁,然后加入到等待队列队尾:

	![在这里插入图片描述](.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L21rZmth,size_16,color_FFFFFF,t_70#pic_center)

然后,ThreadC在阻塞前需要把前驱结点的状态置为SIGNAL:-1,以确保将来可以被唤醒:

至此,ThreadC的执行也暂告一段落了,等着C放弃锁。
(4) Thread A 释放锁
ThreadA终于使用完了临界资源,要释放锁了,来看下ReentrantLock的unlock方法:

	public void unlock() {sync.release(1);}

unlock内部调用了AQS的release方法:

    public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);    //唤醒节点的后继者(如果存在)。return true;}return false;}

尝试释放锁的操作tryRelease:

	protected final boolean tryRelease(int releases) {int c = getState() - releases;    //锁的状态减一if (Thread.currentThread() != getExclusiveOwnerThread()) //当前线程如不是持有锁的线程throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {  //锁释放成功free = true;setExclusiveOwnerThread(null);   //设置锁的有者为null}setState(c);   //说明锁有重入,更新状态值return free;  }

释放成功后,调用unparkSuccessor方法,唤醒队列中的首结点:

	//唤醒当前结点的后继结点private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)   //SIGNAL : -1 compareAndSetWaitStatus(node, ws, 0);  //预置当前结点的状态为0,表示后续结点即将被唤醒Node s = node.next;  //正常情况下,会直接唤醒后继结点//但是如果后继结点处于:CANCELLED状态时(说明被取消了),会从队尾开始,向前找到第一个未被CANCELLED的结点if (s == null || s.waitStatus > 0) {   s = null;for (Node t = tail; t != null && t != node; t = t.prev) //从tail向前开始查询是为了考虑并发入队(enq)的情况。if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);  //此时s表示的就是当前结点的下一个结点。}

此时队列的状态为:

(5)Thread B 被唤醒后继续执行
队首结点(ThreadB)被唤醒了。ThreadB会继续从以下位置开始执行,先返回一个中断标识,用于表示ThreadB在阻塞期间有没被中断过:

	private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

然后ThreadB又开始了自旋操作,被唤醒的是队首结点,所以可以尝试tryAcquire获取锁,此时获取成功(ThreadA已经释放了锁)。获取成功后会调用setHead方法,将头结点置为当前结点,并清除线程信息:

 final boolean acquireQueued(final Node node, int arg) {//省略部分源码for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);   //获取锁成功则将当前结点设置为头结点。p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}//省略部分源码}

最终的队列状态如下:

(6) Thread B 释放锁
ThreadB也终于使用完了临界资源,要释放锁了,过程和ThreadA释放时一样,释放成功后,会调用unparkSuccessor方法,唤醒队列中的首结点:

	![在这里插入图片描述](.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L21rZmth,size_16,color_FFFFFF,t_70#pic_center)

队首结点(ThreadC)被唤醒后,继续从原来的阻塞处向下执行,并尝试获取锁,获取成功,最终队列状态如下:

(7) Thread C 释放锁
ThreadC也终于使用完了临界资源,要释放锁了。释放成功后,调用unparkSuccessor方法,唤醒队列中的首结点:
此时队列中只剩下一个头结点(dummy),所以这个方法其实什么都不做。最终队列的状态就是只有一个dummy头结点。

ReentrantLock非公平策略

ReentrantLock非公平锁策略的内部实现和公平锁策略的最主要的区别在于:
1、公平锁获取锁时,会判断等待队列中是否有线程排在当前线程前面,只有在没有的情况下,才去获取锁,这才是公平的意义。

protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&   //只有当前线程没有前驱结点时,才对锁状态进行修改compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//此处省略部分源码}

2、非公平锁获取锁时,会立即尝试修改同步状态,失败后再调用AQS的acquire方法。

	final void lock() {if (compareAndSetState(0, 1))   //先尝试更改锁状态,如果正好此时有线程释放锁,当前线程正好抢到了,使得在等待队列前面的线程没有获得锁,这就是非公平的意义setExclusiveOwnerThread(Thread.currentThread());  //如果抢到了锁,就将锁的持有者更改为自己elseacquire(1);     //如果尝试获取锁失败,则}

acquire方法调用tryAcquire方法,其实调用的是nonfairTryAcquire方法,该方法相对于公平锁,只是少了“队列中是否有其他线程排在当前线程前面”这一判断。

 final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {   //没有判断当前结点是否有前驱结点setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

AQS对中断的支持

ReentrantLock的lockInterruptibly方法是会响应中断的。(线程如果在阻塞中被中断,会跑出InterruptedException异常)
该方法实际调用的是acquireInterruptibly方法,

	public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}
public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))   //尝试获取锁doAcquireInterruptibly(arg);  }

上述代码显示,如果线程中断标志位为true,直接抛异常;然后尝试去获取锁,如果获取失败,则调用doAcquireInterruptibly方法,方法源码如下:

private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();   //这里直接抛出异常}} finally {if (failed)cancelAcquire(node);}}

这个方法跟acquireQueued方法唯一不同就是:当线程获取锁失败,进入阻塞之后,如果中途被打断,acquireQueued方法只是用一个标识记录线程被中断,而doAcquireInterruptibly则是直接抛出异常。

AQS对限时等待的支持

Lock接口中有tryLock方法,tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。tryLock()有一个重载方法,这个方法就是tryLock(long timeout, TimeUnit unit).
用于在指定的时间内尝试获取锁,获取不到就返回。

    public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}
	public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);}

关键是doAcquireNano方法,和acquireQuqued方法类似,又是一个自旋操作,在超时前不断尝试获取锁,获取不到则阻塞(加上了等待时间的判断)。该方法内部,调用了LockSupport.parkNanos来超时阻塞线程:

private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return true;}nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);   //最终还会执行该方法}}

如果当前线程在指定时间内获取不到锁,除了返回false外,最终还会执行cancelAcquire方法:

   private void cancelAcquire(Node node) {if (node == null)return;node.thread = null;Node pred = node.prev;while (pred.waitStatus > 0)  node.prev = pred = pred.prev;  //指向第一个CANCELLED线程Node predNext = pred.next;node.waitStatus = Node.CANCELLED;   //将当前结点标记为CANCELLED:1if (node == tail && compareAndSetTail(node, pred)) {  //如果当前结点为尾结点,则尝试直接移除compareAndSetNext(pred, predNext, null);  } else {    //当前结点不是尾结点或尝试移除失败(存在尾部并发操作)int ws;if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;      if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);  //这一段代码的意思是,前驱结点.next = 当前结点的后继结点。} else {unparkSuccessor(node);}node.next = node; // help GC}}

AQS的condition等待

以ReentrantLock的调用为例,说明AbstractQueuedSynchronizer提供的Conditon等待功能。Condition接口的实现类其实是在AQS中——ConditionObject,ReentranLock的newConditon方法其实是创建了一个AbstractQueuedSynchronizer.ConditionObject对象:

	public Condition newCondition() {return sync.newCondition();}final ConditionObject newCondition() {return new ConditionObject();}

用一个例子来讲解await()方法与signal方法:

假设现在有3个线程:ThreadA、ThreadB、ThreadC,一个condition实现对象。
ReentrantLock lock = new ReentrantLock();
Conditon con = lock.newConditon();
线程调用顺序:
//ThreadA先调用lock方法获取到锁,然后调用con.await()
//ThreadB获取锁,调用con.signal()唤醒ThreadA
//ThreadB释放锁
  1. ThreadA获取到锁后,首先调用await方法
		public final void await() throws InterruptedException {if (Thread.interrupted())   //响应中断throw new InterruptedException();Node node = addConditionWaiter();   //插入条件队列int savedState = fullyRelease(node);  //释放锁,返回释放前的同步状态int interruptMode = 0;             while (!isOnSyncQueue(node)) {    //判断当前结点在等待队列中LockSupport.park(this);      //将当前线程阻塞if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  //检验线程在等待的时候是否中断。break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}

awai执行步骤如下:
(1)释放锁
(2)如果当前线程不在同步队列中,则执行(3)步;否则执行(4)步;
(3)先将当前线程阻塞,如果锁中断模式与中断状态均不为0,执行第(4)步。
(4)执行acquireQueued方法,自旋方式尝试去获取锁。

  1. ThreadB获取到锁后,首先调用signal方法

由于Condition的signal方法要求线程必须获得与此Condition对象相关联的锁,所以这里有个中断判断:

 public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();   //如果线程未持有锁,抛锁状态异常Node first = firstWaiter;  if (first != null)  //条件队列不为空doSignal(first);  }
  /*** 尝试转换当前结点,并插入等待队列/private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}

//将节点从条件队列转移到等待队列。 如果成功,则返回true。final boolean transferForSignal(Node node) {if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}

此时条件队列为null,而ThreadA被包装成新结点之后会添加到等待队列。

  1. ThreadB释放锁
    终于ThreadB释放了锁,释放成功后,会调用unparkSuccessor方法(参考AQS独占功能的讲解),唤醒队列中的首结点。

  2. ThreadA从唤醒处继续执行

因为调用acquireQueued方法再次尝试获取锁,ThreadA被唤醒后,,获取成功后从await方法的阻塞处开始继续往下执行。

AQS共享功能

前面独占功能采用的是ReentrantLock给大家讲解的源码,针对于AQS的共享功能,我们将以CountDownLatch为例,来分享AQS的共享功能。CountDownLatch,是J.U.C中的一个同步器类,可作为倒数计数器使用。
直接说案例:
CountDownLatch示例

假设现在有3个线程,ThreadA、ThreadB、mainThread,CountDownLatch初始计数为1:
CountDownLatch switcher = new CountDownLatch(1);
//ThreadA调用await()方法等待
//ThreadB调用await()方法等待
//主线程main调用countDown()放行
	public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}//调用Sync 构造函数 Sync(int count) {setState(count);   //count为计数器的初始化大小}
  1. ThreadA调用await()方法等待
	public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
	public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}

tryAcquireShared方法尝试获取锁。

	protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}

当锁状态标志不为0时,tryAcquireShared方法返回-1,此时会执行doAcquireSharedInterruptibly方法。

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);  //包装成共享结点,插入等待队列boolean failed = true;try {for (;;) {     //自旋操作final Node p = node.predecessor();  if (p == head) {int r = tryAcquireShared(arg);   //尝试获取锁if (r >= 0) {               //大于0表示获取成功setHeadAndPropagate(node, r); //这里将当前结点设置为头结点,唤醒其后面的结点。p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&   //是否需要阻塞当前结点parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

大概流程如下
(1)通过addWaiter方法将ThreadA包装成共享结点,插入等待队列,插入后队列结构如下:

(2)然后会进入自旋操作,先尝试获取一次锁,显然此时是获取失败的(主线程main还未调用countDown,同步状态State还是1)。
(3)然后判断是否要进入阻塞(shouldParkAfterFailedAcquire),shouldParkAfterFailedAcquire函数在前面独占锁已经有提过,这里不过多赘述,ThreadA获取锁失败后,进入阻塞。最终队列结构如下:

  1. ThreadB调用await()方法等待
    流程和ThreadA完全相同,调用后ThreadB也被加入到等待队列中 :

  1. 主线程main调用countDown()放行
    ThreadA和ThreadB调用了await()方法后都在等待了,现在主线程main开始调用countDown()方法,当锁状态state变为0时,ThreadA和ThreadB都会被唤醒,并继续往下执行,达到类似门栓的作用。
public void countDown() {sync.releaseShared(1);
}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {   //尝试释放锁doReleaseShared();return true;}return false;}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
private void doReleaseShared() {for (;;) {          //CAS操作Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  //将线程状态从signal设置为0,表示将唤醒后继结点。continue;           unparkSuccessor(h);   //唤醒后继结点}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;               }if (h == head)                  break;}}

先调用compareAndSetWaitStatus将头结点的等待状态置为0,表示将唤醒后续结点(ThreadA),成功后的等待队列结构如下:

然后调用unparkSuccessor唤醒后继结点:此时,等待队列的结构如下:

  1. ThreadA从原阻塞处继续向下执行
    ThreadA被唤醒后,会从原来的阻塞处继续向下执行:
    由于是一个自旋操作,ThreadA会再次尝试获取锁,由于此时State同步状态值为0(无锁状态),所以获取成功。然后调用setHeadAndPropagate方法:
    private void doAcquireSharedInterruptibly(int arg)//此处省略部分源码for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//此处省略部分源码}
   private void setHeadAndPropagate(Node node, int propagate) {Node h = head; //记录旧的头结点setHead(node);    //将node设置为新的头结点//判断是否需要唤醒后继结点if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();  //释放并唤醒后继结点}}

执行步骤如下
(1)将ThreadA变成头结点

		![在这里插入图片描述](.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L21rZmth,size_16,color_FFFFFF,t_70#pic_center)

(2)调用doReleaseShared方法,释放并唤醒ThreadB结点

  1. ThreadB从原阻塞处继续向下执行

    ThreadB被唤醒后,从原阻塞处继续向下执行,这个过程和步骤5(ThreadA唤醒后继续执行)完全一样。setHeadAndPropagate方法把ThreadB结点变为头结点,并根据传播状态判断是否要唤醒并释放后继结点:

    (1)将ThreadB变成头结点

    (2)调用doReleaseShared方法,释放并唤醒后继结点(此时没有后继结点了,则直接break),最终队列状态如下:

AQS的共享功能,与独占功能最主要的区别就是:

共享功能的结点,一旦被唤醒,会向队列后部传播(Propagate)状态,以实现共享结点的连续唤醒。这也是共享的含义,当锁被释放时,所有持有该锁的共享线程都会被唤醒,并从等待队列移除。

AQS读写锁

在非公平策略中,写锁的获取永远不需要排队,这其实时性能优化的考虑,因为大多数情况写锁涉及的操作时间耗时要远大于读锁,频次远低于读锁,这样可以防止写线程一直处于饥饿状态。
关于ReentrantReadWriteLock,有两点规律需要注意:

1、当RRW的等待队列队首结点是共享结点,说明当前写锁被占用(因为首节点是共享结点,是读线程阻塞,说明拥有锁的是写线程),当写锁释放时,会以传播的方式唤醒头结点之后紧邻的各个共享结点。

2、当RRW的等待队列队首结点是独占结点,当前既可能被读锁占用,也可能被写锁占用。因为读写互斥、写写互斥。

ReentrantReadWriteLock通过两个内部类同时实现了AQS的两套API,核心部分与共享/独占锁并无什么区别。

后面有更加厉害的StampedLock,再详细拿源码分析。

更多推荐

java并发编程——从源码手把手带你理解AQS源码

本文发布于:2024-02-06 07:37:56,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1747554.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:源码   手把手   带你   java   AQS

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!