admin管理员组

文章数量:1599441

Condition

        Jdk中独占锁的实现除了使用关键字synchronized外,还可以使用ReentrantLock。虽然在性能上ReentrantLock和synchronized没有什么区别,但ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。ReentrantLock的lock和unlock的原理之前文章已经讲过。

        -->> 面试难点:深度解析ReentrantLock的实现原理

        使用synchronized结合Object上的wait和notify方法可以实现线程间的等待通知机制。Condition同样可以实现这个功能,而且相比前者使用起来更清晰也更简单。前者是java底层级别的,后者是语言级别的,后者可控制性和扩展性更好。

与wait/notify区别 

        1.Condition能够支持不响应中断,而通过使用Object方式不支

        2.Condition能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个 

        3.Condition能够支持超时时间的设置,而Object不支持

等待队列结构

       

‍        为了方便理解,我们先写一个用condition实现的生产者消费者的例子。

public class Demo {
    static volatile int i = 0;
    static final ReentrantLock LOCK = new ReentrantLock();
    static final Condition condition = LOCK.newCondition();


    public static void add() throws InterruptedException {
        synchronized (LOCK) {
            while (i == 0) {
                System.out.print("add\t");
                System.out.println(++i);
                condition.signal();
                condition.await();
            }
        }
    }
    public static void sub() throws InterruptedException {
        LOCK.lock();
        try {
            while (i == 1) {
                System.out.print("sub\t");
                System.out.println(--i);
                condition.signal();
                condition.await();
            }
        }finally {
            LOCK.unlock();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                    add();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                    sub();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

        可以看到,想要获得一个condition对象,需要首先通过一个ReentrantLock锁来创建,而最终调用其实为AQS中的内部类ConditionObject。

Condition condition = LOCK.newCondition();
//源码
//ReentrantLock内部类Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
    final ConditionObject newCondition() {
        return new ConditionObject();
    }
}
//AQS内部类 ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;


    //真正的创建Condition对象
    public ConditionObject() { }
    }

        condition是要和lock配合使用的,而lock的实现原理又依赖于AQS,所以AQS内部实现了ConditionObject。我们知道在锁机制的实现上,AQS内部维护了一个双向的同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列。condition内部也是使用相似的方式,内部维护了一个单向的 等待队列,所有调用condition.await方法的线程会加入到等待队列中,并且线程状态转换为等待状态。

        ConditionObject中有两个成员变量:头节点firstWaiter 和 尾节点lastWaiter ,同步队列的成员Node 复用了实现同步队列的内部类Node。用nextWaiter保存了下一个等待节点,结构如图。

用Object的方式Object对象监视器上只能拥有一个同步队列和一个等待队列,而使用Lock可以有有一个同步队列和多个等待队列。可以多次调用lock.newCondition()创建多个Condition,所以一个Lock可以持有多个等待队列,如图。

await原理

        当前线程进入等待状态(进入等待队列),如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常。如果该线程能够从await()方法返回的话一定是该线程获取了与condition相关联的lock。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾
    Node node = addConditionWaiter();
    // 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 3. 当前线程进入到等待状态
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 4. 自旋等待获取到同步状态(即获取到lock)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        //删除无效的等待节点
        unlinkCancelledWaiters();
    // 5. 处理被中断的情况
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

        主要执行过程见代码中注释。保存新节点addConditionWaiter()方法如下。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 清除被取消的尾节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        //解除关联
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //将当前线程保存在Node中
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        //队尾插入
        t.nextWaiter = node;
    //更新lastWaiter
    lastWaiter = node;
    return node;
}

        将当前节点保存到新建立的Node,如果等待队列的firstWaiter为null的话(等待队列为空队列),则将firstWaiter指向当前的Node,否则,更新lastWaiter(尾节点)即可。可以看出等待队列是一个不带头结点的链式队列,而AQS中的同步队列是一个带头结点的链式队列。

        将当前节点插入到等待对列之后,会调用fullyRelease,使当前线程释放lock。

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            //成功释放同步状态
            failed = false;
            return savedState;
        } else {
            //不成功释放同步状态抛出异常
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

        方法内部调用AQS的模板方法release方法释放AQS的同步状态,并且唤醒在同步队列中头结点的后继节点引用的线程,如果释放成功则正常返回,若失败的话就抛出异常。

while (!isOnSyncQueue(node)) {
    // 3. 当前线程进入到等待状态
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

        当线程第一次调用condition.await()方法时,会进入到这个while()循环中,然后通过LockSupport.park(this)方法使得当前线程进入等待状态,那么要想退出这个await方法第一个前提条件自然而然的是要先退出这个while循环,有两种可能: 

        1. 逻辑走到break退出while循环(当前等待的线程被中断) 

        2. while循环中的逻辑判断为false(当前节点被移动到了同步队列中,即另外线程调用的condition的signal或者signalAll方法)。 

        

        总的说就是当前线程被中断或者调用condition.signal/condition.signalAll方法当前节点移动到了同步队列后 ,这是当前线程退出await方法的前提条件。当退出while循环后就会调用acquireQueued(node, savedState)(之前Reentlock中讲过),自旋过程中线程不断尝试获取同步状态,直至获取lock成功。这也说明了退出await方法必须是已经获得了condition关联的lock。

signal原理

        调用condition的signal唤醒一个等待在condition上的线程(头节点),将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回,源码如下。

public final void signal() {
    //1. 先检测当前线程是否已经获取lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

        signal方法首先会检测当前线程是否已经获取lock,没有获取lock会直接抛出异常,再调用doSignal传入头节点。doSignal方法源码为:

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //1. 将头结点从等待队列中移除
        first.nextWaiter = null;
        //2. while中transferForSignal方法对头结点做真正的处理
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

        具体逻辑请看注释,真正对头节点做处理的逻辑在transferForSignal放,该方法源码为:

final boolean transferForSignal(Node node) {
    //1. 更新状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //2.将该节点移入到同步队列中去
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

        关键逻辑请看注释,这段代码主要做了两件事情。

        1.将头结点的状态更改为CONDITION

        2.调用enq方法,将该节点尾插入到同步队列中。

        由此可以看出,调用condition的signal的前提条件是当前线程已经获取了lock,该方法会使得等待队列中的头节点即等待时间最长的那个节点移入到同步队列,而移入到同步队列后才有机会使得等待线程被唤醒,即从await方法中的LockSupport.park(this)方法中返回,从而才有机会使得调用await方法的线程成功退出。

        signalAll与signal方法的区别体现在doSignalAll方法上,前面我们已经知道doSignal方法只会对等待队列的头节点进行操作,代码如下。

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

        该方法只不过时间等待队列中的每一个节点都移入到同步队列中,即“通知”当前调用condition.await()方法的每一个线程。

关注并后台回复 “面试” 或者  “视频”

即可免费获取最新2019BAT

大厂面试题和大数据微服务视频

您的分享和支持是我更新的动力

·END·

后端开发技术

追求技术的深度

微信号:后端开发技术

本文标签: 详解原理ReentrantLockcondition