独占
(只有一个线程能访问执行,又根据是否按队列的顺序分为公平锁和非公平锁,如ReentrantLock
) 和共享
(多个线程可同时访问执行,如Semaphore
、CountDownLatch
、 CyclicBarrier
)。ReentrantReadWriteLock
可以看成是组合式,允许多个线程同时对某一资源进行读。AQS是一个构建锁和同步器的框架(模板)。使用AQS可以简单构建出应用广泛的同步器如:ReentrantLock
,Semaphore
,其他的诸如ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
等等皆是基于AQS的。我们也可以使用AQS构建符合自己需求的同步器
核心思想: 如果被请求的共享资源空闲,则将请求线程设为有效状态,并将共享资源设为锁定状态。如果共享资源被占有,则需要一套线程阻塞等待以及被唤醒时锁分配的机制。这个机制AQS是使用CLH队列锁实现的,即将暂时获取不到锁的线程放入队列。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS来对该同步状态进行原子性操作
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
状态信息通过procted类型的getState,setState,compareAndSetState进行操作
//返回同步状态的当前值
protected final int getState() { return state;
}// 设置同步状态的值
protected final void setState(int newState) { state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS对资源的共享方式:
不同模式(排他,共享)下的线程共享同一个FIFO队列,通常来说一个实现类只需支持一个模式,但是如ReadWriteLock
就同时支持了两种
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
使用者继承AbstractQueuedSynchronizer
并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state
的获取和释放) 将AQS
组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
自定义同步器时需要重写下面几个AQS提供的模板方法:
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
虽然AQS是基于FIFO队列的,但并不是强制使用FIFO的策略。独占同步器的核心形式如下:(共享模式类似,但可能涉及级联信号。)
Acquire:while (!tryAcquire(arg)) {enqueue thread if it is not already queued;possibly block current thread;}Release:if (tryRelease(arg))unblock the first queued thread;
AbstractQueuedSynchronizer
类底层的数据结构是使用CLH
(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS
是把每条请求共享资源的线程封装成一个CLH
锁队列的一个Node
来实现锁分配。
其中Sync queue
(同步队列),包含一个head node 和一个tail node。head结点主要用作后续的调度。而Condition queue
不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。
AQS非公平锁就是,一来就先去插队,如果插队失败,才去乖乖的排队。
static final class Node {// 模式,分为共享与独占// 共享模式static final Node SHARED = new Node();// 独占模式static final Node EXCLUSIVE = null; // 结点状态// CANCELLED,值为1,表示当前的线程被取消// SIGNAL,值为-1,表示当前节点被阻塞,后继节点包含的线程需要运行,也就是unpark// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行// 值为0,表示当前节点在sync队列中,等待着获取锁static final int CANCELLED = 1;static final int SIGNAL = -1;static final int CONDITION = -2;static final int PROPAGATE = -3; // 结点状态volatile int waitStatus; // 前驱结点volatile Node prev; // 后继结点volatile Node next; // 结点所对应的线程volatile Thread thread; // 下一个等待者Node nextWaiter;// 结点是否在共享模式下等待final boolean isShared() {return nextWaiter == SHARED;}// 获取前驱结点,若前驱结点为空,抛出异常final Node predecessor() throws NullPointerException {// 保存前驱结点Node p = prev; if (p == null) // 前驱结点为空,抛出异常throw new NullPointerException();else // 前驱结点不为空,返回return p;}// 无参构造方法Node() { // Used to establish initial head or SHARED marker}// 构造方法Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}// 构造方法Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
}
每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。
方法:
private Node addConditionWaiter(): 添加新的waiter到condition队列中
private void doSignal(Node first): 移除并转移首个节点到syn同步队列,直到遇到未取消的节点或空节点。
private void doSignalAll(Node first) : 移除并转移所有节点到syn同步队列
private void unlinkCancelledWaiters() : 从condition队列中清除状态为CANCEL的结点
public final void signal(): 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
public final void signalAll(): 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
public final void awaitUninterruptibly(): 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
public final void await(): 等待,当前线程在接到信号或被中断之前一直处于等待状态
public final long awaitNanos(long nanosTimeout): 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
public final boolean awaitUntil(Date deadline): 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
protected final boolean hasWaiters() 查询是否有正在等待此条件的任何线程
protected final Collection getWaitingThreads() 返回包含那些可能正在等待此条件的线程集合
// 内部类
public class ConditionObject implements Condition, java.io.Serializable {// 版本号private static final long serialVersionUID = 1173984872572414699L;/** First node of condition queue. */// condition队列的头节点private transient Node firstWaiter;/** Last node of condition queue. */// condition队列的尾结点private transient Node lastWaiter;/*** Creates a new {@code ConditionObject} instance.*/// 构造方法public ConditionObject() { }// Internal methods/*** Adds a new waiter to wait queue.* @return its new wait node*/// 添加新的waiter到wait队列private Node addConditionWaiter() {// 保存尾结点Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) { // 尾结点不为空,并且尾结点的状态不为CONDITION// 清除状态为CONDITION的结点unlinkCancelledWaiters(); // 将最后一个结点重新赋值给tt = lastWaiter;}// 新建一个结点Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null) // 尾结点为空// 设置condition队列的头节点firstWaiter = node;else // 尾结点不为空// 设置为节点的nextWaiter域为node结点t.nextWaiter = node;// 更新condition队列的尾结点lastWaiter = node;return node;}/*** Removes and transfers nodes until hit non-cancelled one or* null. Split out from signal in part to encourage compilers* to inline the case of no waiters.* @param first (non-null) the first node on condition queue*/private void doSignal(Node first) {// 循环do {if ( (firstWaiter = first.nextWaiter) == null) // 该节点的nextWaiter为空// 设置尾结点为空lastWaiter = null;// 设置first结点的nextWaiter域first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头节点不为空,一直循环}/*** Removes and transfers all nodes.* @param first (non-null) the first node on condition queue*/private void doSignalAll(Node first) {// condition队列的头节点尾结点都设置为空lastWaiter = firstWaiter = null;// 循环do {// 获取first结点的nextWaiter域结点Node next = first.nextWaiter;// 设置first结点的nextWaiter域为空first.nextWaiter = null;// 将first结点从condition队列转移到sync队列transferForSignal(first);// 重新设置firstfirst = next;} while (first != null);}/*** Unlinks cancelled waiter nodes from condition queue.* Called only while holding lock. This is called when* cancellation occurred during condition wait, and upon* insertion of a new waiter when lastWaiter is seen to have* been cancelled. This method is needed to avoid garbage* retention in the absence of signals. So even though it may* require a full traversal, it comes into play only when* timeouts or cancellations occur in the absence of* signals. It traverses all nodes rather than stopping at a* particular target to unlink all pointers to garbage nodes* without requiring many re-traversals during cancellation* storms.*/// 从condition队列中清除状态为CANCEL的结点private void unlinkCancelledWaiters() {// 保存condition队列头节点Node t = firstWaiter;Node trail = null;while (t != null) { // t不为空// 下一个结点Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) { // t结点的状态不为CONDTION状态// 设置t节点的nextWaiter域为空t.nextWaiter = null;if (trail == null) // trail为空// 重新设置condition队列的头节点firstWaiter = next;else // trail不为空// 设置trail结点的nextWaiter域为next结点trail.nextWaiter = next;if (next == null) // next结点为空// 设置condition队列的尾结点lastWaiter = trail;}else // t结点的状态为CONDTION状态// 设置trail结点trail = t;// 设置t结点t = next;}}// public methods// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。public final void signal() {if (!isHeldExclusively()) // 不被当前线程独占,抛出异常throw new IllegalMonitorStateException();// 保存condition队列头节点Node first = firstWaiter;if (first != null) // 头节点不为空// 唤醒一个等待线程doSignal(first);}// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。public final void signalAll() {if (!isHeldExclusively()) // 不被当前线程独占,抛出异常throw new IllegalMonitorStateException();// 保存condition队列头节点Node first = firstWaiter;if (first != null) // 头节点不为空// 唤醒所有等待线程doSignalAll(first);}// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断public final void awaitUninterruptibly() {}/** Mode meaning to reinterrupt on exit from wait */private static final int REINTERRUPT = 1;/** Mode meaning to throw InterruptedException on exit from wait */private static final int THROW_IE = -1;/*** Checks for interrupt, returning THROW_IE if interrupted* before signalled, REINTERRUPT if after signalled, or* 0 if not interrupted.*/private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0; }/*** Throws InterruptedException, reinterrupts current thread, or* does nothing, depending on mode.*/private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();}// // 等待,当前线程在接到信号或被中断之前一直处于等待状态public final void await() throws InterruptedException {// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 public final long awaitNanos(long nanosTimeout)throws InterruptedException {}// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态public final boolean awaitUntil(Date deadline)throws InterruptedException {}// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0public final boolean await(long time, TimeUnit unit)throws InterruptedException {}final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {return sync == AbstractQueuedSynchronizer.this;}// 查询是否有正在等待此条件的任何线程protected final boolean hasWaiters() {}// 返回正在等待此条件的线程数估计值protected final int getWaitQueueLength() {}// 返回包含那些可能正在等待此条件的线程集合protected final Collection getWaitingThreads() {}
}
此类实现了Condition
接口,Condition
接口定义了条件操作规范,具体如下
public interface Condition {// 等待,当前线程在接到信号或被中断之前一直处于等待状态void await() throws InterruptedException;// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断void awaitUninterruptibly();//等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 long awaitNanos(long nanosTimeout) throws InterruptedException;// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0boolean await(long time, TimeUnit unit) throws InterruptedException;// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态boolean awaitUntil(Date deadline) throws InterruptedException;// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。void signal();// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。void signalAll();
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizerimplements java.io.Serializable { // 版本号private static final long serialVersionUID = 7373984972572414691L; // 头节点private transient volatile Node head; // 尾结点private transient volatile Node tail; // 状态private volatile int state; // 自旋时间static final long spinForTimeoutThreshold = 1000L;// Unsafe类实例private static final Unsafe unsafe = Unsafe.getUnsafe();// state内存偏移地址private static final long stateOffset;// head内存偏移地址private static final long headOffset;// state内存偏移地址private static final long tailOffset;// tail内存偏移地址private static final long waitStatusOffset;// next内存偏移地址private static final long nextOffset;// 静态初始化块static {try {stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));} catch (Exception ex) { throw new Error(ex); }}
}
该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下:
acquire
传入的是 Node.EXCLUSIVE
参数(结点的模式)
public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
tryAcquire
方法,调用此方法的线程会试图在独占模式下获取对象状态。如果允许,则获取它。在AQS源码中默认会抛出一个异常,即需要子类去重写此方法完成自己的逻辑。之后会进行分析。tryAcquire
失败,则调用addWaiter
方法,addWaiter
方法完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue
。acquireQueued
方法,此方法完成的功能是Sync queue
中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。调用此方法的线程会试图在独占模式下获取对象状态,将自己设置为独占线程
我们进入tryAcquire
方法发现没有逻辑代码,直接抛出异常。这就是典型的模板方法设计模式。意思是所有子类必须实现这个方法,不实现父类就抛出异常。
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
以ReentrantLock为例, 其内部类NofairSync 重写了这个方法:
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
这个方法其实调用的是ReentrantLock
内部类Sync
的 nonfairTryAcquire
方法。
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread(); //需要排队的第二位顾客int c = getState(); // 获取当前窗口的状态state(0空闲,1占用)//如果运气非常的好,窗口恰巧空闲了,就CAS改变状态,把窗口的线程设为自己。if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//如果当前线程 等于 正在办理业务的线程 (说明获得了多次锁,是可重入锁的理论)else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // nextc为当前状态加 1if (nextc < 0) // overflow(溢出)throw new Error("Maximum lock count exceeded");setState(nextc); // 设置状态变量 statereturn true;}return false;
}
我们传入的第二位顾客再次发现,有人在办理业务,返回 false。
在acquire方法中 !tryAcquire(arg) 取反为 true ,继续判断下面的方法。
将调用此方法的线程封装成为一个结点并放入Sync queue
。
addWaiter
方法使用快速添加的方式往sync queue
尾部添加结点,
private Node addWaiter(Node mode) {//构造Node结点(当前线程,模式)Node node = new Node(Thread.currentThread(), mode); // 获取Node的尾结点,如果为null,说明队列没有结点。Node pred = tail;// 当第三个顾客进入的时候,等候区已经有结点了,执行这个代码块。和enq方法相似,尾插法。if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//如果队列没有结点,调用enq方法准备进入队列enq(node);return node;
}
如果sync queue
队列还没有初始化,则会使用enq
插入队列中,enq
方法源码如下
private Node enq(final Node node) {//相当于自旋for (;;) {Node t = tail; // t 是尾指针//如果尾指针为null,说明队列无结点,进行初始化if (t == null) { /* 第一个结点并不是我们传入的结点,而是系统new了一个结点作为占位符。这个结点Thread=null,waitStatus=0,是傀儡结点又称哨兵结点,用于占位。 */if (compareAndSetHead(new Node())) tail = head;//队列有结点后,继续循环,进入下面这个代码块(尾插法,结点的尾、前、后结点都设置好) } else {//传入结点的前一个指针指向尾结点node.prev = t;//尾指针 指向 传入的节点if (compareAndSetTail(t, node)) {t.next = node; // 尾结点的下一个节点是 传入的节点return t; // 返回新插入的尾结点}}}
}
enq
方法会使用无限循环来确保节点的成功插入。
Sync queue
中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。
现在,分析acquireQueue
方法,传入的参数是 (addWaiter(Node.EXCLUSIVE), arg)
。其源码如下(sync队列中的结点在独占且忽略中断的模式下获取(资源))
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;//自旋for (;;) {final Node p = node.predecessor(); //传入结点的上一个结点 // 如果前结点 == 哨兵结点 && 再看窗口能否抢占,失败就false。if (p == head && tryAcquire(arg)) {// 头结点指向当前节点,节点Thread=null,prev=null,即当前节点变成了新的哨兵结点setHead(node); // 原哨兵结点的next=null,没有连接了,会被GC回收p.next = null; failed = false;return interrupted;}//抢占失败后是否park阻塞if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;/*这时自旋锁,抢占又失败后,继续进入shouldParkAfterFailedAcquire方法,因为第一次循环已经将前结点的waitStatus的值改为-1,所以返回true。 然后进入parkAndCheckInterrupt方法。*//* 锁被释放,其他线程被唤醒后!parkAndCheckInterrupt()返回false,继续自旋!B线程的前结点就是哨兵结点,执行tryAcquire方法,因为A线程走了,所以成功抢占!返回true */}} finally {if (failed)cancelAcquire(node);}
}
LockSupport类是Java 6 引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:
park(boolean isAbsolute, long time):阻塞当前线程
unpark(Thread jthread):使给定的线程停止阻塞
首先获取当前节点的前驱节点,如果前驱节点是头节点并且当前节点能够获取(资源),代表该当前节点能够占有锁,则将当前节点出队,设置当前节点为独占线程,然后设置头节点为当前节点,即当前节点变成了新的哨兵结点。否则,调用shouldParkAfterFailedAcquire
和parkAndCheckInterrupt
方法,首先,我们看shouldParkAfterFailedAcquire
方法,代码如下
// 当获取(资源)失败后,检查并且更新结点状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 获取前驱结点的状态int ws = pred.waitStatus;if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1// 可以进行park操作return true; if (ws > 0) { // 表示状态为CANCELLED,为1do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点// 赋值pred结点的next域pred.next = node; } else { //把前结点的waitStatus值改为 -1,用于后续唤醒操作compareAndSetWaitStatus(pred, ws, Node.SIGNAL); }// 不能进行park操作return false;
}
只有当该节点的前驱结点的状态为SIGNAL
时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。再看parkAndCheckInterrupt
方法,源码如下
// 进行park操作并且返回该线程是否被中断
private final boolean parkAndCheckInterrupt() {// 在许可可用之前禁用当前线程,并且设置了blockerLockSupport.park(this);return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
}
此时这个acquireQueued
方法还没有结束,会被卡在parkAndCheckInterrupt
方法内部,如果这个线程被unpark
了。就会继续执行acquireQueued
方法的代码。
现在,再来看acquireQueued
方法的整个的逻辑。逻辑如下:
所以结点进入等待队列后,是调用park
使它进入阻塞状态的。只有头结点的线程是处于活跃状态的。
public final boolean release(int arg) {//释放一把锁后,返回trueif (tryRelease(arg)) {// 头结点就是哨兵结点Node h = head; // 哨兵的waitStatus为-1,符合条件进入if (h != null && h.waitStatus != 0)unparkSuccessor(h); //return true;}return false;
}
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}
tryRelease
方法,也是一个模板方法,ReentrantLock
类的Sync
重写了这个方法。
protected final boolean tryRelease(int releases) {//如果当前State为1,减去1后为0int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true; //c=0,说明可以解锁,free变为truesetExclusiveOwnerThread(null); //设置当前窗口的占用线程为 null}setState(c); //把状态改为相应的值return free;
}
unparkSuccessor
方法,释放锁!
private void unparkSuccessor(Node node) {// 传入的是哨兵结点,waitStatus为-1int ws = node.waitStatus;// 又把哨兵结点的waitStatus改为0if (ws < 0)compareAndSetWaitStatus(node, ws, 0); // s是哨兵结点的下一个结点。Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 如果哨兵结点的下一个结点存在,且waitStatus为0,释放锁!if (s != null)LockSupport.unpark(s.thread);
}
可以看到unparkSuccessor
中
// 如果哨兵结点的下一个结点存在,且waitStatus为0,释放锁!if (s != null)LockSupport.unpark(s.thread);
将acquireQueued
中parkAndCheckInterrupt
方法阻塞的线程,给停止阻塞了,于是停止阻塞的线程就又可以去tryacquire
了
// 进行park操作并且返回该线程是否被中断
private final boolean parkAndCheckInterrupt() {// 在许可可用之前禁用当前线程,并且设置了blockerLockSupport.park(this);return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
}
LockSupport
是线程等待唤醒机制(wait/notify
)的改良版本。LockSupport
中的 park()
和 unpark()
的作用分别是阻塞线程和解除阻塞线程。
使用Object
中的wait()
方法让线程等待,notify()
方法唤醒线程
static Object objectLock = new Object(); // 创建锁public static void main(String[] args) {// 创建A线程,进入后打印,并阻塞。new Thread(() -> {synchronized (objectLock) {System.out.println(Thread.currentThread().getName() + " 进来了!");objectLock.wait();System.out.println(Thread.currentThread().getName() + " 被唤醒!");}}, "A").start();// 创建B线程,用于唤醒new Thread(() -> { synchronized (objectLock) {objectLock.notify();System.out.println(Thread.currentThread().getName() + " 通知!");}}, "B").start();
}
wait、notify的限制:
wait
和 notify
如果不在一个代码块里面,必须与 synchronized
搭配使用,否则会报错。notify
、再使用wait
,因为wait
是后执行了,所以不能被唤醒。Lock + await + signal
// 创建Lock对象,得到condition
static Lock lock = new ReentrantLock();
static Condition condition = lock.newCondition();public static void main(String[] args) {// 创建A线程,用await方法阻塞new Thread(() -> {lock.lock();try { System.out.println(Thread.currentThread().getName() + " 进来了!");condition.await();} finally {lock.unlock();}System.out.println(Thread.currentThread().getName() + " 被唤醒!");}, "A").start();// 创建B线程,用于唤醒new Thread(() -> {lock.lock();try {System.out.println(Thread.currentThread().getName() + " 通知!");condition.signal();} finally {lock.unlock();}}, "B").start();}
await、signal的限制: 和 wait 、notify 的问题一模一样,他们的底层机制是一样的。
park + unpark:每个线程都有一个 “许可证” ,只有 0 和 1,默认为 0。unpark(Thread t)
方法发放许可证,没许可证就不允许放行。
public static void main(String[] args) {// 创建A线程,用park()方法阻塞Thread a = new Thread(() -> {System.out.println(Thread.currentThread().getName() + " 进来了!");LockSupport.park();System.out.println(Thread.currentThread().getName() + " 被唤醒!");}, "A");a.start();// 创建B线程,用于唤醒Thread b = new Thread(() -> {System.out.println(Thread.currentThread().getName() + " 通知!");// 唤醒指定线程LockSupport.unpark(a);}, "B");b.start();
}
LockSupport的优势:
synchronized
或Lock
。park()
方法相当于没执行。park()
底层调用了unsafe
类的park
本地方法。
UNSAFE.park(false, 0L);
调用一次 unpark
就加 1,变为 1。调用一次 park
会消费许可证,变回 0。重复调用 unpark
不会积累凭证。