ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

JDK15源码(五):await和signal

2021-02-12 09:32:23  阅读:181  来源: 互联网

标签:node 结点 队列 JDK15 ConditionNode 源码 线程 signal Condition


await和signal方法使用

await和signal方法是基于ReentrantLock的Condition使用的。并且await方法要先于singnal,否则await方法会一直阻塞该线程。lockA线程调用ReentrantLock实例的lock方法占用锁,再调用Condition实例的await方法释放锁挂起线程;2秒后,lockB线程调用ReentrantLock实例的lock方法占用锁,再调用Condition实例的signal方法将lockA放在同步队列中,lockB调用unlock方法释放锁之后就会唤醒lockA线程。

private static Lock lock = new ReentrantLock();
@Test
public void testAwaitAndSignal() {
    Condition condition = lock.newCondition();
    new Thread(() -> {
        lock.lock();
        try {
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }, "lockA").start();

    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    new Thread(() -> {
        lock.lock();
        condition.signal();
        lock.unlock();
    }, "lockB").start();
}

Condition

(1)Condition能与任意的Lock结合使用,Lock代替Synchronized方法和语句,Condition代替Object监控对象。Condition.await()可以用来代替Object.wait(),Condition.signal可以用来代替Object.notify()。

(2)条件(也称为条件队列或条件变量)为一种线程中止执行“等待”直到另一线程通知某些状态条件现在为真提供了一种手段。由于对该共享状态信息的访问发生在不同的线程中,因此必须对其进行保护,因此某种形式的锁与该条件相关联。Condition实例从本质上绑定到锁。要获取特定Lock实例的Condition实例,请使用其Lock#newCondition方法。

package java.util.concurrent.locks;

import java.util.Date;
import java.util.concurrent.TimeUnit;

public interface Condition {

    //当前线程一直等待直到被或许或者中断
    void await() throws InterruptedException;

    //当前线程一直等待直到被唤醒
    void awaitUninterruptibly();

    //当前线程会一直等待直到它被唤醒或者中断或者指定的时间到期。
    long awaitNanos(long nanosTimeout) throws InterruptedException;

   
    //当前线程会一直等待直到它被唤醒或者中断或者指定的时间到期。 
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    //当前线程会一直等待直到它被唤醒或者中断或者指定的时间到期。
    boolean awaitUntil(Date deadline) throws InterruptedException;

    //唤醒一个等待的线程
    void signal();

    //唤醒全部的等待线程
    void signalAll();
}

ConditionNode

ConditionNode是AbstractQueuedSynchronizer(AQS同步器)的静态final内部类。ConditionNode(条件结点)继承了Node类,实现了ForkJoinPool.ManagedBlocker接口。

static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
    //表示下一个等待结点
    ConditionNode nextWaiter;            

    //允许在ForkJoinPools中使用条件,而不会冒固定池耗尽的风险。 这仅适用于非定时条件等待,不适用于定时版本。
    public final boolean isReleasable() {
        //如果结点是条件等待且不是中断状态,返回false。
        return status <= 1 || Thread.currentThread().isInterrupted();
    }

    public final boolean block() {
        //如果结点是条件等待且不是中断状态,挂起线程
        while (!isReleasable()) LockSupport.park();
        return true;
    }
} 

ConditionObject

ConditionObject是AbstractQueuedSynchronizer(AQS同步器)的内部类。维护了2个属性,一个是firstWait,表示条件队列的第一个结点;另一个是lastWaiter,表示条件队列的最后一个结点。提供了一个无参构造函数用于构建ConditionObject实例。

public class ConditionObject implements Condition, java.io.Serializable {

    private transient ConditionNode firstWaiter;

    private transient ConditionNode lastWaiter;

    public ConditionObject() { }
}

ReentrantLock.Sync.isHeldExclusively

ReentrantLock有个内部类Sync,Sync实现了AbstractQueueSynchronizer接口。

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

ConditionObject#canReacquire

//如果一个结点是在条件队列初始化,现在也存放在同步队列就会返回true。
private boolean canReacquire(ConditionNode node) {
    return node != null && node.prev != null && isEnqueued(node);
}

AbstractQueueSynchronizer#isEnqueued

从同步队列的尾结点往前遍历,判断给定的结点是否在同步队列中。

//判断结点是否在同步队列中
final boolean isEnqueued(Node node) {
    for (Node t = tail; t != null; t = t.prev)
        if (t == node)
            return true;
    return false;
}

ConditionObject#enableWait

这个方法主要是将条件结点添加到条件队列,然后释放占用的锁资源。

static final int WAITING   = 1;          // 二进制表示是 0000 0001
static final int COND      = 2;          // 二进制表示是 0000 0010

private int enableWait(ConditionNode node) {
    //判断锁的持有者是否是当前线程
    if (isHeldExclusively()) {
        //结点的等待线程是当前线程
        node.waiter = Thread.currentThread();
        //WAITING | COND = 0000 0001 | 0000 0010 = 0000 0011 = 3
        node.setStatusRelaxed(COND | WAITING);
        ConditionNode last = lastWaiter;
        //将结点加入到条件队列尾部
        if (last == null)
            firstWaiter = node;
        else
            last.nextWaiter = node;
        lastWaiter = node;
        //获取state值
        int savedState = getState();
        //释放锁资源
        if (release(savedState))
            return savedState;
    }
    //释放锁资源失败,状态设置为取消,抛出IllegalMonitorStateException。
    node.status = CANCELLED;
    throw new IllegalMonitorStateException();
}

ConditionObject#await

(1)创建一个ConditionNode。
(2)调用enableWait方法将将新创建的ConditionNode加入到条件队列中,然后释放占用的锁资源。
(3)判断新建条件结点是否在同步队列中,存在返回true,不存在返回false。
(4)如果新建条件结点不在同步队列中,判断当前线程是否是中断状态,如果是中断状态,从同步队列中移出该条件结点,并调用当前线程的中断方法。如果不是中断状态,调用ForkJoinPool.managedBlock方法挂起线程。
(5)如果新建条件结点在同步队列中,清除新建条件结点的状态,重置为0,并尝试获取锁。

public final void await() throws InterruptedException {
    //判断当前线程是否是中断状态,如果为true,抛出InterruptedException。
    if (Thread.interrupted())
        throw new InterruptedException();
    //创建一个条件结点    
    ConditionNode node = new ConditionNode();
    //将条件结点放在条件队列,释放lock占用的资源
    int savedState = enableWait(node);
    LockSupport.setCurrentBlocker(this); // for back-compatibility
    boolean interrupted = false, cancelled = false;
    while (!canReacquire(node)) {
        //判断当前线程是否是中断状态,false | false = false
        if (interrupted |= Thread.interrupted()) {
            if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                break;              // else interrupted after signal
        //node.state = 3; 3 & 2 = 2        
        } else if ((node.status & COND) != 0) {
            try {
                //线程堵塞
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                interrupted = true; 
            }
        } else
            Thread.onSpinWait();
    }
    LockSupport.setCurrentBlocker(null);
    node.clearStatus();
    acquire(node, savedState, false, false, false, 0L);
    if (interrupted) {
        if (cancelled) {
            unlinkCancelledWaiters(node);
            throw new InterruptedException();
        }
        Thread.currentThread().interrupt();
    }
}

ConditionObject#signal

public final void signal() {
    //获取条件队列的第一个等待结点
    ConditionNode first = firstWaiter;
    //如果持有锁的线程不是当前线程,抛出IllegalMonitorStateException异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //判断第一个等待结点是否为空    
    if (first != null)
        //将第一个等待结点放在同步队列尾部
        doSignal(first, false);
}

ConditionObject#signal

(1)重新设置条件队列的第一个条件结点。
(2)将给定的条件结点添加到同步队列的尾部。
(3)入参有一个boolean值,表示是否将全部的条件结点都放在同步队列中。

private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        //获取first结点的后一个条件结点
        ConditionNode next = first.nextWaiter;
        //将后一个结点设置为第一个条件结点,如果next结点为空,条件队列的最后一个结点也为null。
        if ((firstWaiter = next) == null)
            lastWaiter = null;
        //COND=2(0000 0010)
        //`COND=`2=-3(10000000 00000000 00000000 00000011) & 00000010 = 00000010 = 2  
        if ((first.getAndUnsetStatus(COND) & COND) != 0) {
            //将结点添加到同步队列的尾部
            enqueue(first);
            if (!all)
                break;
        }
        first = next;
    }
}

AbstractQueueSynchoronizer#enqueue

final void enqueue(Node node) {
    if (node != null) {
        for (;;) {
            Node t = tail;
            node.setPrevRelaxed(t);        // avoid unnecessary fence
            //判断尾结点是否为空,如果为空,初始化头结点和尾结点
            if (t == null)                 // initialize
                tryInitializeHead();
            //将新结点添加同步    
            else if (casTail(t, node)) {
                t.next = node;
                if (t.status < 0)          // wake up to clean link
                    LockSupport.unpark(node.waiter);
                break;
            }
        }
    }
}

总结

(1)await和signal方法是接口Condition的方法。ReentrantLock的内部类ConditionObject实现了Condition接口,暴露了一个newCondition方法可以让我们获取到ConditionObject对象。

(2)await和signal方法的阻塞释放机制是:A线程获取锁成功,调用await方法后进入等待队列,释放占用的锁,挂起A线程;然后B线程获取锁成功,调用signal方法将第一个条件结点放在同步队列中,然后释放锁,唤醒同步队列的第一个等待结点(head.next, head不关联任何线程信息)。signalAll方法会将条件队列的全部结点都加入到同步队列中。

标签:node,结点,队列,JDK15,ConditionNode,源码,线程,signal,Condition
来源: https://blog.csdn.net/u012734723/article/details/113791379

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有