队列同步器 AQS
从 AQS 的类名称和修饰上来看,这是⼀个抽象类,所以从设计模式的⻆度来看同步器⼀定是基于【模版模式】来
设计的,使⽤者需要继承同步器,实现⾃定义同步器,并重写指定⽅法,随后将同步器组合在⾃定义的同步组件
中,并调⽤同步器的模版⽅法,⽽这些模版⽅法⼜回调⽤使⽤者重写的⽅法
我不想将上⾯的解释说的这么抽象,其实想理解上⾯这句话,我们只需要知道下⾯两个问题就好了
- 哪些是⾃定义同步器可重写的⽅法?
- 哪些是抽象同步器提供的模版⽅法?
同步器可重写的⽅法
同步器提供的可重写⽅法只有 5 个,这⼤⼤⽅便了锁的使⽤者:
表格⽅法描述中所说的 同步状态 就是上⽂提到的有 volatile 修饰的 state,所以我们在 重写 上⾯⼏个⽅法时,还
要通过同步器提供的下⾯三个⽅法(AQS 提供的)来获取或修改同步状态:
⽽独占式和共享式操作 state 变量的区别也就很简单了
同步器提供的模版⽅法
总结
AQS 实现分析
有阻塞就需要排队,实现排队必然需要队列
- CLH:Craig、Landin and Hagersten 队列,是⼀个单向链表,AQS 中的队列是 CLH 变体的虚拟双向队列
队列中每个排队的个体就是⼀个 Node
Node 节点
AQS 内部维护了⼀个同步队列,⽤于管理同步状态。
- 当线程获取同步状态失败时,就会将当前线程以及等待状态等信息构造成⼀个 Node 节点,将其加⼊到同步队
列中尾部,阻塞该线程 - 当同步状态被释放时,会唤醒同步队列中“⾸节点”的线程获取同步状态
同步队列基本结构
独占式获取同步状态
public void lock() {
// 阻塞式的获取锁,调⽤同步器模版⽅法,获取同步状态
sync.acquire(1);
}
进⼊ AQS 的模版⽅法 acquire()
public final void acquire(int arg) {
// 调⽤⾃定义同步器重写的 tryAcquire ⽅法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
⾸先,也会尝试⾮阻塞的获取同步状态,如果获取失败(tryAcquire 返回 false),则会调⽤ addWaiter ⽅法构造
Node 节点(Node.EXCLUSIVE 独占式)并安全的(CAS)加⼊到同步队列【尾部】
private Node addWaiter(Node mode) {
//构造Node节点,包含当前线程信息以及节点模式【独占/共享】
Node node = new Node(Thread.currentThread(), mode);
// 新建变量 pred 将指针指向tail指向的节点
Node pred = tail;
if (pred != null) {
// 新加⼊的节点前驱节点指向尾节点
node.prev = pred;
//因为多个线程同时获取同步状态失败都会执行这段代码
//通过cas方式获取当前节点为最新的尾节点
if (compareAndSetTail(pred, node)) {
//曾经的尾节点指向当前的节点
pred.next = node;
//返回构建的节点
return node;
}
}
//尾节点为空,说明当前节点是第⼀个被加⼊到同步队列中的节点,执行入队
enq(node);
return node;
}
/**
入队
*/
private Node enq(final Node node) {
//无限循环
for (;;) {
//首先构建临时变量节点
Node t = tail;
//队列为空,虚拟化一个哨兵节点
if (t == null) {
//cas设置一个新节点,指向头部
if (compareAndSetHead(new Node()))
//尾部指针指向尾部节点
tail = head;
//第二次循环,队列中有节点
} else {
//将新节点的前节点指向t
node.prev = t;
//cas将新节点指向队列尾部节点
if (compareAndSetTail(t, node)) {
//前驱节点的后继节点指向当前新节点,完成双向队列
t.next = node;
//返回该t变量,当所有节点入队,结束循环
return t;
}
}
}
}
流程图解释入队
哨兵节点
主要是处理边界问题,这里主要是占队,如果没有边界,指定环节,按照同样算法可能会在边界处发⽣异常
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// "死循环",尝试获取锁,或者挂起
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 只有当前节点的前驱节点是头节点,才会尝试获取锁
// 看到这你应该理解添加哨兵节点的含义了吧
if (p == head && tryAcquire(arg)) {
// 获取同步状态成功,将⾃⼰设置为头
setHead(node);
// 将哨兵节点的后继节点置为空,⽅便GC
p.next = null; // help GC
failed = false;
// 返回中断标识
return interrupted;
}
// 当前节点的前驱节点不是头节点
//【或者】当前节点的前驱节点是头节点但获取同步状态失败
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
获取同步状态成功会返回可以理解了,但是如果失败就会⼀直陷⼊到“死循环”中浪费资源吗?很显然不
是, shouldParkAfterFailedAcquire(p, node) 和 parkAndCheckInterrupt() 就会将线程获取同步状态
失败的线程挂起,我们继续向下看
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的状态
int ws = pred.waitStatus;
// 如果是 SIGNAL 状态,即等待被占⽤的资源释放,直接返回 true
// 准备继续调⽤ parkAndCheckInterrupt ⽅法
if (ws == Node.SIGNAL)
return true;
// ws ⼤于 0 说明是 CANCELLED 状态,
if (ws > 0) {
// 循环判断前驱节点的前驱节点是否也为 CANCELLED 状态,忽略该状态的节点,重新连接队列
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将当前节点的前驱节点设置为设置为 SIGNAL 状态,⽤于后续唤醒操作
// 程序第⼀次执⾏到这返回为 false,还会进⾏外层第⼆次循环,最终从代码第 7 ⾏返回
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
如果前驱节点的 waitStatus 是 SIGNAL 状态,即 shouldParkAfterFailedAcquire ⽅法会返回 true ,程序会继续向下执⾏ parkAndCheckInterrupt ⽅法,⽤于将当前线程挂起
private final boolean parkAndCheckInterrupt() {
// 线程挂起,程序不会继续向下执⾏
LockSupport.park(this);
// 根据 park ⽅法 API 描述,程序在下述三种情况会继续向下执⾏
// 1. 被 unpark
// 2. 被中断(interrupt)
// 3. 其他不合逻辑的返回才会继续向下执⾏
// 因上述三种情况程序执⾏⾄此,返回当前线程的中断状态,并清空中断状态
// 如果由于被中断,该⽅法会返回 true
return Thread.interrupted();
}
到底什么情况才会执⾏ if(failed) ⾥⾯的代码 ?
if (failed)
cancelAcquire(node);
这段代码被执⾏的条件是 failed 为 true,正常情况下,如果跳出循环,failed 的值为 false,如果不能跳出循环貌似
怎么也不能执⾏到这⾥,所以只有不正常的情况才会执⾏到这⾥,也就是会发⽣异常,才会执⾏到此处
private void cancelAcquire(Node node) {
// 忽略⽆效节点
if (node == null)
return;
// 将关联的线程信息清空
node.thread = null;
// 跳过同样是取消状态的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 跳出上⾯循环后找到前驱有效节点,并获取该有效节点的后继节点
Node predNext = pred.next;
//看到这个注释你可能有些乱了,其核⼼⽬的就是从等待队列中移除 CANCELLED 的节点,并重新拼接整个队列,总
结来看,其实设置 CANCELLED 状态节点只是有三种情况,我们通过画图来分析⼀下:
// 将当前节点的状态置为 CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点处在尾节点,直接从队列中删除⾃⼰就好
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 1. 如果当前节点的有效前驱节点不是头节点,也就是说当前节点不是头节点的后继节点
if (pred != head &&
// 2. 判断当前节点有效前驱节点的状态是否为 SIGNAL
((ws = pred.waitStatus) == Node.SIGNAL ||
// 3. 如果不是,尝试将前驱节点的状态置为 SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// 判断当前节点有效前驱节点的线程信息是否为空
pred.thread != null) {
// 上述条件满⾜
Node next = node.next;
// 将当前节点有效前驱节点的后继节点指针指向当前节点的后继节点
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点的前驱节点是头节点,或者上述其他条件不满⾜,就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
看到这个注释你可能有些乱了,其核⼼⽬的就是从等待队列中移除 CANCELLED 的节点,并重新拼接整个队列,总
结来看,其实设置 CANCELLED 状态节点只是有三种情况,我们通过画图来分析⼀下: