java并发 AQS
创始人
2025-05-31 19:37:14
  • 什么是AQS? 为什么它是核心?
  • AQS的核心思想是什么? 它是怎么实现的? 底层数据结构等
  • AQS有哪些核心的方法?
  • AQS定义什么样的资源获取方式? AQS定义了两种资源获取方式:独占(只有一个线程能访问执行,又根据是否按队列的顺序分为公平锁和非公平锁,如ReentrantLock) 和共享(多个线程可同时访问执行,如SemaphoreCountDownLatchCyclicBarrier )。ReentrantReadWriteLock可以看成是组合式,允许多个线程同时对某一资源进行读。
  • AQS底层使用了什么样的设计模式? 模板
  • AQS的应用示例

文章目录

  • 1 AQS(AbstractQueuedSynchronizer)
  • 2 AQS的核心思想
  • 3 AQS底层使用的模板设计模式
  • 4 AQS数据结构
    • 4.1 AQS的继承关系
    • 4.2 AQS的内部类 - Node类
    • 4.3 AQS的内部类 - ConditionObject类
    • 4.4 类的属性
    • 4.5 类的核心方法 -获取资源- acquire方法
      • 4.5.1 tryAcquire
      • 4.5.2 addWaiter
      • 4.5.3 acquireQueue
    • 4.6 类的核心方法 -释放资源- release方法
  • 提一句LockSupport
    • 3种让线程等待和唤醒的方法(线程通信)
      • synchronized + wait + notify
      • JUC包中Condition的await()方法让线程等待,signal()方法唤醒线程
      • LockSupport类可以阻塞当前线程以及唤醒指定被阻塞的线程

1 AQS(AbstractQueuedSynchronizer)

AQS是一个构建锁和同步器的框架(模板)。使用AQS可以简单构建出应用广泛的同步器如:ReentrantLockSemaphore,其他的诸如ReentrantReadWriteLockSynchronousQueueFutureTask等等皆是基于AQS的。我们也可以使用AQS构建符合自己需求的同步器

2 AQS的核心思想

核心思想: 如果被请求的共享资源空闲,则将请求线程设为有效状态,并将共享资源设为锁定状态。如果共享资源被占有,则需要一套线程阻塞等待以及被唤醒时锁分配的机制。这个机制AQS是使用CLH队列锁实现的,即将暂时获取不到锁的线程放入队列。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS来对该同步状态进行原子性操作

private volatile int state;//共享变量,使用volatile修饰保证线程可见性

状态信息通过procted类型的getState,setState,compareAndSetState进行操作

//返回同步状态的当前值
protected final int getState() {  return state;
}// 设置同步状态的值
protected final void setState(int newState) { state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS对资源的共享方式:

  1. 排他:只能有一个线程独占,如ReentrantLock。又可分为公平锁非公平锁
    • 公平锁:按排队顺序拿锁
    • 非公平锁: 无视排队顺序,抢占式
  2. 共享:可以被多个线程占有。这个类并不 "理解 "排他和共享差异,只是在机械意义上,当一个共享模式的获取成功后,下一个等待的线程(如果存在的话)也必须确定它是否也能获取。

不同模式(排他,共享)下的线程共享同一个FIFO队列,通常来说一个实现类只需支持一个模式,但是如ReadWriteLock就同时支持了两种

3 AQS底层使用的模板设计模式

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state获取和释放) 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

自定义同步器时需要重写下面几个AQS提供的模板方法

isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

虽然AQS是基于FIFO队列的,但并不是强制使用FIFO的策略。独占同步器的核心形式如下:(共享模式类似,但可能涉及级联信号。)

	Acquire:while (!tryAcquire(arg)) {enqueue thread if it is not already queued;possibly block current thread;}Release:if (tryRelease(arg))unblock the first queued thread;

4 AQS数据结构

AbstractQueuedSynchronizer类底层的数据结构是使用CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是把每条请求共享资源的线程封装成一个CLH锁队列的一个Node来实现锁分配。

其中Sync queue(同步队列),包含一个head node 和一个tail nodehead结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue

4.1 AQS的继承关系

在这里插入图片描述

AQS非公平锁就是,一来就先去插队,如果插队失败,才去乖乖的排队。

4.2 AQS的内部类 - Node类

static final class Node {// 模式,分为共享与独占// 共享模式static final Node SHARED = new Node();// 独占模式static final Node EXCLUSIVE = null;        // 结点状态// CANCELLED,值为1,表示当前的线程被取消// SIGNAL,值为-1,表示当前节点被阻塞,后继节点包含的线程需要运行,也就是unpark// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行// 值为0,表示当前节点在sync队列中,等待着获取锁static final int CANCELLED =  1;static final int SIGNAL    = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;        // 结点状态volatile int waitStatus;        // 前驱结点volatile Node prev;    // 后继结点volatile Node next;        // 结点所对应的线程volatile Thread thread;        // 下一个等待者Node nextWaiter;// 结点是否在共享模式下等待final boolean isShared() {return nextWaiter == SHARED;}// 获取前驱结点,若前驱结点为空,抛出异常final Node predecessor() throws NullPointerException {// 保存前驱结点Node p = prev; if (p == null) // 前驱结点为空,抛出异常throw new NullPointerException();else // 前驱结点不为空,返回return p;}// 无参构造方法Node() {    // Used to establish initial head or SHARED marker}// 构造方法Node(Thread thread, Node mode) {    // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}// 构造方法Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
}

每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。

  • CANCELLED,值为1,表示当前的线程被取消。
  • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。
  • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中。
  • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。
  • 值为0,表示当前节点在sync queue中,等待着获取锁。

4.3 AQS的内部类 - ConditionObject类

方法:

  • private Node addConditionWaiter(): 添加新的waiter到condition队列中

  • private void doSignal(Node first): 移除并转移首个节点到syn同步队列,直到遇到未取消的节点或空节点。

  • private void doSignalAll(Node first) : 移除并转移所有节点到syn同步队列

  • private void unlinkCancelledWaiters() : 从condition队列中清除状态为CANCEL的结点

  • public final void signal(): 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。

  • public final void signalAll(): 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。

  • public final void awaitUninterruptibly(): 等待,当前线程在接到信号之前一直处于等待状态,不响应中断

  • public final void await(): 等待,当前线程在接到信号或被中断之前一直处于等待状态

  • public final long awaitNanos(long nanosTimeout): 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态

  • public final boolean awaitUntil(Date deadline): 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态

  • protected final boolean hasWaiters() 查询是否有正在等待此条件的任何线程

  • protected final Collection getWaitingThreads() 返回包含那些可能正在等待此条件的线程集合

// 内部类
public class ConditionObject implements Condition, java.io.Serializable {// 版本号private static final long serialVersionUID = 1173984872572414699L;/** First node of condition queue. */// condition队列的头节点private transient Node firstWaiter;/** Last node of condition queue. */// condition队列的尾结点private transient Node lastWaiter;/*** Creates a new {@code ConditionObject} instance.*/// 构造方法public ConditionObject() { }// Internal methods/*** Adds a new waiter to wait queue.* @return its new wait node*/// 添加新的waiter到wait队列private Node addConditionWaiter() {// 保存尾结点Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) { // 尾结点不为空,并且尾结点的状态不为CONDITION// 清除状态为CONDITION的结点unlinkCancelledWaiters(); // 将最后一个结点重新赋值给tt = lastWaiter;}// 新建一个结点Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null) // 尾结点为空// 设置condition队列的头节点firstWaiter = node;else // 尾结点不为空// 设置为节点的nextWaiter域为node结点t.nextWaiter = node;// 更新condition队列的尾结点lastWaiter = node;return node;}/*** Removes and transfers nodes until hit non-cancelled one or* null. Split out from signal in part to encourage compilers* to inline the case of no waiters.* @param first (non-null) the first node on condition queue*/private void doSignal(Node first) {// 循环do {if ( (firstWaiter = first.nextWaiter) == null) // 该节点的nextWaiter为空// 设置尾结点为空lastWaiter = null;// 设置first结点的nextWaiter域first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头节点不为空,一直循环}/*** Removes and transfers all nodes.* @param first (non-null) the first node on condition queue*/private void doSignalAll(Node first) {// condition队列的头节点尾结点都设置为空lastWaiter = firstWaiter = null;// 循环do {// 获取first结点的nextWaiter域结点Node next = first.nextWaiter;// 设置first结点的nextWaiter域为空first.nextWaiter = null;// 将first结点从condition队列转移到sync队列transferForSignal(first);// 重新设置firstfirst = next;} while (first != null);}/*** Unlinks cancelled waiter nodes from condition queue.* Called only while holding lock. This is called when* cancellation occurred during condition wait, and upon* insertion of a new waiter when lastWaiter is seen to have* been cancelled. This method is needed to avoid garbage* retention in the absence of signals. So even though it may* require a full traversal, it comes into play only when* timeouts or cancellations occur in the absence of* signals. It traverses all nodes rather than stopping at a* particular target to unlink all pointers to garbage nodes* without requiring many re-traversals during cancellation* storms.*/// 从condition队列中清除状态为CANCEL的结点private void unlinkCancelledWaiters() {// 保存condition队列头节点Node t = firstWaiter;Node trail = null;while (t != null) { // t不为空// 下一个结点Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) { // t结点的状态不为CONDTION状态// 设置t节点的nextWaiter域为空t.nextWaiter = null;if (trail == null) // trail为空// 重新设置condition队列的头节点firstWaiter = next;else // trail不为空// 设置trail结点的nextWaiter域为next结点trail.nextWaiter = next;if (next == null) // next结点为空// 设置condition队列的尾结点lastWaiter = trail;}else // t结点的状态为CONDTION状态// 设置trail结点trail = t;// 设置t结点t = next;}}// public methods// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。public final void signal() {if (!isHeldExclusively()) // 不被当前线程独占,抛出异常throw new IllegalMonitorStateException();// 保存condition队列头节点Node first = firstWaiter;if (first != null) // 头节点不为空// 唤醒一个等待线程doSignal(first);}// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。public final void signalAll() {if (!isHeldExclusively()) // 不被当前线程独占,抛出异常throw new IllegalMonitorStateException();// 保存condition队列头节点Node first = firstWaiter;if (first != null) // 头节点不为空// 唤醒所有等待线程doSignalAll(first);}// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断public final void awaitUninterruptibly() {}/** Mode meaning to reinterrupt on exit from wait */private static final int REINTERRUPT =  1;/** Mode meaning to throw InterruptedException on exit from wait */private static final int THROW_IE    = -1;/*** Checks for interrupt, returning THROW_IE if interrupted* before signalled, REINTERRUPT if after signalled, or* 0 if not interrupted.*/private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0; }/*** Throws InterruptedException, reinterrupts current thread, or* does nothing, depending on mode.*/private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();}// // 等待,当前线程在接到信号或被中断之前一直处于等待状态public final void await() throws InterruptedException {// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 public final long awaitNanos(long nanosTimeout)throws InterruptedException {}// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态public final boolean awaitUntil(Date deadline)throws InterruptedException {}// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0public final boolean await(long time, TimeUnit unit)throws InterruptedException {}final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {return sync == AbstractQueuedSynchronizer.this;}//  查询是否有正在等待此条件的任何线程protected final boolean hasWaiters() {}// 返回正在等待此条件的线程数估计值protected final int getWaitQueueLength() {}// 返回包含那些可能正在等待此条件的线程集合protected final Collection getWaitingThreads() {}
}

此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下

public interface Condition {// 等待,当前线程在接到信号或被中断之前一直处于等待状态void await() throws InterruptedException;// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断void awaitUninterruptibly();//等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 long awaitNanos(long nanosTimeout) throws InterruptedException;// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0boolean await(long time, TimeUnit unit) throws InterruptedException;// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态boolean awaitUntil(Date deadline) throws InterruptedException;// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。void signal();// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。void signalAll();
}

4.4 类的属性

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizerimplements java.io.Serializable {    // 版本号private static final long serialVersionUID = 7373984972572414691L;    // 头节点private transient volatile Node head;    // 尾结点private transient volatile Node tail;    // 状态private volatile int state;    // 自旋时间static final long spinForTimeoutThreshold = 1000L;// Unsafe类实例private static final Unsafe unsafe = Unsafe.getUnsafe();// state内存偏移地址private static final long stateOffset;// head内存偏移地址private static final long headOffset;// state内存偏移地址private static final long tailOffset;// tail内存偏移地址private static final long waitStatusOffset;// next内存偏移地址private static final long nextOffset;// 静态初始化块static {try {stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));} catch (Exception ex) { throw new Error(ex); }}
}

4.5 类的核心方法 -获取资源- acquire方法

该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下:
acquire传入的是 Node.EXCLUSIVE 参数(结点的模式)

public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}

在这里插入图片描述

  • 首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。如果允许,则获取它。在AQS源码中默认会抛出一个异常,即需要子类去重写此方法完成自己的逻辑。之后会进行分析。
  • tryAcquire失败,则调用addWaiter方法,addWaiter方法完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue
  • 调用acquireQueued方法,此方法完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。

4.5.1 tryAcquire

调用此方法的线程会试图在独占模式下获取对象状态,将自己设置为独占线程

我们进入tryAcquire方法发现没有逻辑代码,直接抛出异常。这就是典型的模板方法设计模式。意思是所有子类必须实现这个方法,不实现父类就抛出异常。

protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}

以ReentrantLock为例, 其内部类NofairSync 重写了这个方法:

protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}

这个方法其实调用的是ReentrantLock内部类SyncnonfairTryAcquire 方法。

final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();    //需要排队的第二位顾客int c = getState();        // 获取当前窗口的状态state(0空闲,1占用)//如果运气非常的好,窗口恰巧空闲了,就CAS改变状态,把窗口的线程设为自己。if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//如果当前线程 等于 正在办理业务的线程 (说明获得了多次锁,是可重入锁的理论)else if (current == getExclusiveOwnerThread()) {    int nextc = c + acquires;        // nextc为当前状态加 1if (nextc < 0) // overflow(溢出)throw new Error("Maximum lock count exceeded");setState(nextc);                // 设置状态变量 statereturn true;}return false;
}

我们传入的第二位顾客再次发现,有人在办理业务,返回 false。

在acquire方法中 !tryAcquire(arg) 取反为 true ,继续判断下面的方法。

4.5.2 addWaiter

将调用此方法的线程封装成为一个结点并放入Sync queue
addWaiter方法使用快速添加的方式往sync queue尾部添加结点,

 private Node addWaiter(Node mode) {//构造Node结点(当前线程,模式)Node node = new Node(Thread.currentThread(), mode);    // 获取Node的尾结点,如果为null,说明队列没有结点。Node pred = tail;// 当第三个顾客进入的时候,等候区已经有结点了,执行这个代码块。和enq方法相似,尾插法。if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//如果队列没有结点,调用enq方法准备进入队列enq(node);return node;
}

如果sync queue队列还没有初始化,则会使用enq插入队列中,enq方法源码如下

private Node enq(final Node node) {//相当于自旋for (;;) {Node t = tail;        // t 是尾指针//如果尾指针为null,说明队列无结点,进行初始化if (t == null) { /*    第一个结点并不是我们传入的结点,而是系统new了一个结点作为占位符。这个结点Thread=null,waitStatus=0,是傀儡结点又称哨兵结点,用于占位。    */if (compareAndSetHead(new Node()))    tail = head;//队列有结点后,继续循环,进入下面这个代码块(尾插法,结点的尾、前、后结点都设置好)    } else {//传入结点的前一个指针指向尾结点node.prev = t;//尾指针 指向 传入的节点if (compareAndSetTail(t, node)) {t.next = node;    // 尾结点的下一个节点是 传入的节点return t;        // 返回新插入的尾结点}}}
}

enq方法会使用无限循环来确保节点的成功插入。

4.5.3 acquireQueue

Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。

现在,分析acquireQueue方法,传入的参数是 (addWaiter(Node.EXCLUSIVE), arg)。其源码如下(sync队列中的结点在独占且忽略中断的模式下获取(资源))

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;//自旋for (;;) {final Node p = node.predecessor();    //传入结点的上一个结点 // 如果前结点 == 哨兵结点 && 再看窗口能否抢占,失败就false。if (p == head && tryAcquire(arg)) {// 头结点指向当前节点,节点Thread=null,prev=null,即当前节点变成了新的哨兵结点setHead(node);    // 原哨兵结点的next=null,没有连接了,会被GC回收p.next = null; failed = false;return interrupted;}//抢占失败后是否park阻塞if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;/*这时自旋锁,抢占又失败后,继续进入shouldParkAfterFailedAcquire方法,因为第一次循环已经将前结点的waitStatus的值改为-1,所以返回true。    然后进入parkAndCheckInterrupt方法。*//* 锁被释放,其他线程被唤醒后!parkAndCheckInterrupt()返回false,继续自旋!B线程的前结点就是哨兵结点,执行tryAcquire方法,因为A线程走了,所以成功抢占!返回true   */}} finally {if (failed)cancelAcquire(node);}
}

LockSupport类是Java 6 引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:
park(boolean isAbsolute, long time):阻塞当前线程
unpark(Thread jthread):使给定的线程停止阻塞

首先获取当前节点的前驱节点,如果前驱节点是头节点并且当前节点能够获取(资源),代表该当前节点能够占有锁,则将当前节点出队,设置当前节点为独占线程,然后设置头节点为当前节点,即当前节点变成了新的哨兵结点。否则,调用shouldParkAfterFailedAcquireparkAndCheckInterrupt方法,首先,我们看shouldParkAfterFailedAcquire方法,代码如下

// 当获取(资源)失败后,检查并且更新结点状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 获取前驱结点的状态int ws = pred.waitStatus;if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1// 可以进行park操作return true; if (ws > 0) { // 表示状态为CANCELLED,为1do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点// 赋值pred结点的next域pred.next = node; } else { //把前结点的waitStatus值改为 -1,用于后续唤醒操作compareAndSetWaitStatus(pred, ws, Node.SIGNAL); }// 不能进行park操作return false;
}

只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。再看parkAndCheckInterrupt方法,源码如下

// 进行park操作并且返回该线程是否被中断
private final boolean parkAndCheckInterrupt() {// 在许可可用之前禁用当前线程,并且设置了blockerLockSupport.park(this);return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
}

此时这个acquireQueued方法还没有结束,会被卡在parkAndCheckInterrupt方法内部,如果这个线程被unpark了。就会继续执行acquireQueued方法的代码。

现在,再来看acquireQueued方法的整个的逻辑。逻辑如下:

  1. 判断结点的前驱是否为head(哨兵节点)并且本线程是否能成功获取(资源)。
  2. 若步骤1均满足,则设置本线程为独占线程,并将本结点设置为head(哨兵节点),之后会判断是否finally模块,然后返回。
  3. 若步骤1不满足,则判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作。
  4. 若park了当前线程,之后某个线程对本线程unpark后,并且本线程也获得机会运行。那么,将会继续进行步骤①的判断。

所以结点进入等待队列后,是调用park使它进入阻塞状态的。只有头结点的线程是处于活跃状态的。

4.6 类的核心方法 -释放资源- release方法

public final boolean release(int arg) {//释放一把锁后,返回trueif (tryRelease(arg)) {// 头结点就是哨兵结点Node h = head;        // 哨兵的waitStatus为-1,符合条件进入if (h != null && h.waitStatus != 0)unparkSuccessor(h);        //return true;}return false;
}
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}

tryRelease 方法,也是一个模板方法,ReentrantLock类的Sync重写了这个方法。

protected final boolean tryRelease(int releases) {//如果当前State为1,减去1后为0int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;    //c=0,说明可以解锁,free变为truesetExclusiveOwnerThread(null);    //设置当前窗口的占用线程为 null}setState(c);    //把状态改为相应的值return free;
}

unparkSuccessor方法,释放锁

private void unparkSuccessor(Node node) {// 传入的是哨兵结点,waitStatus为-1int ws = node.waitStatus;// 又把哨兵结点的waitStatus改为0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);    // s是哨兵结点的下一个结点。Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 如果哨兵结点的下一个结点存在,且waitStatus为0,释放锁!if (s != null)LockSupport.unpark(s.thread);
}

可以看到unparkSuccessor

// 如果哨兵结点的下一个结点存在,且waitStatus为0,释放锁!if (s != null)LockSupport.unpark(s.thread);

acquireQueuedparkAndCheckInterrupt方法阻塞的线程,给停止阻塞了,于是停止阻塞的线程就又可以去tryacquire

// 进行park操作并且返回该线程是否被中断
private final boolean parkAndCheckInterrupt() {// 在许可可用之前禁用当前线程,并且设置了blockerLockSupport.park(this);return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
}

提一句LockSupport

LockSupport是线程等待唤醒机制(wait/notify)的改良版本。LockSupport中的 park()unpark() 的作用分别是阻塞线程和解除阻塞线程。

3种让线程等待和唤醒的方法(线程通信)

synchronized + wait + notify

使用Object中的wait()方法让线程等待,notify()方法唤醒线程

static Object objectLock = new Object(); // 创建锁public static void main(String[] args) {// 创建A线程,进入后打印,并阻塞。new Thread(() -> {synchronized (objectLock) {System.out.println(Thread.currentThread().getName() + " 进来了!");objectLock.wait();System.out.println(Thread.currentThread().getName() + " 被唤醒!");}}, "A").start();// 创建B线程,用于唤醒new Thread(() -> { synchronized (objectLock) {objectLock.notify();System.out.println(Thread.currentThread().getName() + " 通知!");}}, "B").start();
}

wait、notify的限制:

  • 我们发现 waitnotify 如果不在一个代码块里面,必须与 synchronized 搭配使用,否则会报错。
  • 如果我们先使用notify、再使用wait,因为wait是后执行了,所以不能被唤醒。

JUC包中Condition的await()方法让线程等待,signal()方法唤醒线程

Lock + await + signal

// 创建Lock对象,得到condition
static Lock lock = new ReentrantLock();
static Condition condition = lock.newCondition();public static void main(String[] args) {// 创建A线程,用await方法阻塞new Thread(() -> {lock.lock();try { System.out.println(Thread.currentThread().getName() + " 进来了!");condition.await();} finally {lock.unlock();}System.out.println(Thread.currentThread().getName() + " 被唤醒!");}, "A").start();// 创建B线程,用于唤醒new Thread(() -> {lock.lock();try {System.out.println(Thread.currentThread().getName() + " 通知!");condition.signal();} finally {lock.unlock();}}, "B").start();}

await、signal的限制: 和 wait 、notify 的问题一模一样,他们的底层机制是一样的。

LockSupport类可以阻塞当前线程以及唤醒指定被阻塞的线程

park + unpark:每个线程都有一个 “许可证” ,只有 0 和 1,默认为 0。unpark(Thread t) 方法发放许可证,没许可证就不允许放行。

public static void main(String[] args) {// 创建A线程,用park()方法阻塞Thread a = new Thread(() -> {System.out.println(Thread.currentThread().getName() + " 进来了!");LockSupport.park();System.out.println(Thread.currentThread().getName() + " 被唤醒!");}, "A");a.start();// 创建B线程,用于唤醒Thread b = new Thread(() -> {System.out.println(Thread.currentThread().getName() + " 通知!");// 唤醒指定线程LockSupport.unpark(a);}, "B");b.start();
}

LockSupport的优势:

  • 不用synchronizedLock
  • 先唤醒,再阻塞,也能够被唤醒。因为线程已经有了“许可证”了,所以park()方法相当于没执行

park()底层调用了unsafe类的park本地方法。

UNSAFE.park(false, 0L);

调用一次 unpark 就加 1,变为 1。调用一次 park 会消费许可证,变回 0。重复调用 unpark 不会积累凭证。

相关内容

热门资讯

什么是几九,几九是什么意思 极... 什么是几九目录什么是几九几九是什么意思今天是几九了啊,一九,二九,三九,四九,五九,是什么意思什么是...
东北都有什么特色小零食啊,东北... 东北都有什么特色小零食啊目录东北都有什么特色小零食啊东北特产零食东北特色小吃东北三省各自有什么特色零...
化妆棉的作用有哪些,化妆棉的作... 化妆棉的作用有哪些目录化妆棉的作用有哪些化妆棉的作用化妆棉有什么作用?化妆棉的作用有哪些 1....
六道有哪六道,火影忍者六道能力... 六道有哪六道目录六道有哪六道火影忍者六道能力六道轮回,都有哪六道?六道是指哪六道?六道有哪六道 ...
飞信手机多方通话怎么用(飞信手... 本篇文章极速百科给大家谈谈飞信手机多方通话怎么用,以及飞信手机好友收费吗对应的知识点,希望对各位有所...
会计中的流动资产包括哪些,下列... 会计中的流动资产包括哪些目录会计中的流动资产包括哪些下列会计科目中,属于流动资产的是(  )。会计基...
旺旺碎冰冰是什么梗的简单介绍 ... 本篇文章极速百科给大家谈谈旺旺碎冰冰是什么梗,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站...
怎样给芭比做衣服,给芭比娃娃做... 怎样给芭比做衣服目录怎样给芭比做衣服给芭比娃娃做衣服怎么做不用缝怎样帮芭比娃娃做衣服怎样给芭比做衣服...
如何区分,植物油,矿物油,白油... 本篇文章极速百科给大家谈谈如何区分,植物油,矿物油,白油,液蜡?,以及怎么区别植物油和矿物油对应的知...
武汉空气为什么那么差(为什么武... 本篇文章极速百科给大家谈谈武汉空气为什么那么差,以及为什么武汉空气质量这么差对应的知识点,希望对各位...
美国4大全尺寸皮卡之一,性能不... 本篇文章极速百科给大家谈谈美国4大全尺寸皮卡之一,性能不输福特猛禽,通用塞拉...,以及美国皮卡车品...
skp是啥(SKP是啥格式) ... 本篇文章极速百科给大家谈谈skp是啥,以及SKP是啥格式对应的知识点,希望对各位有所帮助,不要忘了收...
C1的教练资格证要怎么考,c1... C1的教练资格证要怎么考目录C1的教练资格证要怎么考c1教练资格证怎么考取怎么考c1汽车教练资格证?...
经典一词中的典是什么意思,经典... 经典一词中的典是什么意思目录经典一词中的典是什么意思经典的典是什么意思经典的典是什么意思?请问金典和...
什么叫做商,什么是商? 极速百... 什么叫做商目录什么叫做商什么是商?除法算式里的商是什么意思什么是商?什么叫做商 “商”的含义可...
柯南伦敦告白小兰是哪一集,名侦... 柯南伦敦告白小兰是哪一集目录柯南伦敦告白小兰是哪一集名侦探柯南哪集新一在伦敦向 小兰表白?名侦探柯南...
9个复韵母是哪9个韵母,六个单... 9个复韵母是哪9个韵母目录9个复韵母是哪9个韵母六个单韵母,九个复韵母,九个鼻韵母和十六个整体认读音...
如何去除猫尿味,怎样才能去除猫... 如何去除猫尿味目录如何去除猫尿味怎样才能去除猫尿的味道???并且让猫猫不会再乱尿?帮帮忙,如何去除猫...
吉他有多少和弦都是什么,吉他有... 吉他有多少和弦都是什么目录吉他有多少和弦都是什么吉他有多少和弦 都是什么吉他一共有多少和弦啊?听别人...
网上怎么订购好利来蛋糕(如何订... 本篇文章极速百科给大家谈谈网上怎么订购好利来蛋糕,以及如何订购好利来蛋糕对应的知识点,希望对各位有所...
杭州重点高中有哪些,杭州有哪些... 杭州重点高中有哪些目录杭州重点高中有哪些杭州有哪些重点高中杭州哪所高中最好杭州重点高中有哪些 ...
女为悦己者容意思,士为知己者死... 女为悦己者容意思目录女为悦己者容意思士为知己者死,女为悦己者容.是什么意思.女人无需为悦己者容悦人不...
北戴河是海吗 极速百科网 极速... 北戴河是海吗目录北戴河是海吗北戴河是海吗 北戴河是海。北戴河古称渝水,清光绪年间,因沙河流经戴...
外婆是什么样的关系,姥姥与我的... 外婆是什么样的关系目录外婆是什么样的关系姥姥与我的关系叫什么关系姥姥是指外婆还是奶奶??我与外婆是什...
头歌--第1关:Linux文件... 任务描述 假设系统中存在一个文件File,修改该文件的权限,根据实际需求...
【Spring从成神到升仙系列... 👏作者简介:大家好,我是爱敲代码的小黄,独...
梦见蜈蚣是什么意思,做梦梦见蜈... 梦见蜈蚣是什么意思目录梦见蜈蚣是什么意思做梦梦见蜈蚣什么意思梦见蜈蚣是什么意思,哪里有解释啊梦见蜈蚣...
小区车位比一般是多少,车库配比... 小区车位比一般是多少目录小区车位比一般是多少车库配比是什么小区总户数8200,总车位是1450个,配...
车锁上的lock什么意思,汽车... 车锁上的lock什么意思目录车锁上的lock什么意思汽车上lock是什么意思?车子上“lock标志”...
kirin710是什么处理器,... kirin710是什么处理器目录kirin710是什么处理器海思kirin710是高通多少?骁龙71...