文章 62
浏览 15135
AQS

AQS

队列同步器 AQS

从 AQS 的类名称和修饰上来看,这是⼀个抽象类,所以从设计模式的⻆度来看同步器⼀定是基于【模版模式】来
设计的,使⽤者需要继承同步器,实现⾃定义同步器,并重写指定⽅法,随后将同步器组合在⾃定义的同步组件
中,并调⽤同步器的模版⽅法,⽽这些模版⽅法⼜回调⽤使⽤者重写的⽅法

我不想将上⾯的解释说的这么抽象,其实想理解上⾯这句话,我们只需要知道下⾯两个问题就好了

  1. 哪些是⾃定义同步器可重写的⽅法?
  2. 哪些是抽象同步器提供的模版⽅法?

同步器可重写的⽅法

同步器提供的可重写⽅法只有 5 个,这⼤⼤⽅便了锁的使⽤者:

image.png

表格⽅法描述中所说的 同步状态 就是上⽂提到的有 volatile 修饰的 state,所以我们在 重写 上⾯⼏个⽅法时,还
要通过同步器提供的下⾯三个⽅法(AQS 提供的)来获取或修改同步状态:

image.png

⽽独占式和共享式操作 state 变量的区别也就很简单了

image.png

同步器提供的模版⽅法

image.png

总结

image.png

AQS 实现分析

有阻塞就需要排队,实现排队必然需要队列

  • CLH:Craig、Landin and Hagersten 队列,是⼀个单向链表,AQS 中的队列是 CLH 变体的虚拟双向队列

队列中每个排队的个体就是⼀个 Node

Node 节点

AQS 内部维护了⼀个同步队列,⽤于管理同步状态。

  • 当线程获取同步状态失败时,就会将当前线程以及等待状态等信息构造成⼀个 Node 节点,将其加⼊到同步队
    列中尾部,阻塞该线程
  • 当同步状态被释放时,会唤醒同步队列中“⾸节点”的线程获取同步状态

image.png

同步队列基本结构

image.png

独占式获取同步状态

public void lock() {
// 阻塞式的获取锁,调⽤同步器模版⽅法,获取同步状态
sync.acquire(1);
}

进⼊ AQS 的模版⽅法 acquire()

public final void acquire(int arg) {
// 调⽤⾃定义同步器重写的 tryAcquire ⽅法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

⾸先,也会尝试⾮阻塞的获取同步状态,如果获取失败(tryAcquire 返回 false),则会调⽤ addWaiter ⽅法构造
Node 节点(Node.EXCLUSIVE 独占式)并安全的(CAS)加⼊到同步队列【尾部】

private Node addWaiter(Node mode) {
    //构造Node节点,包含当前线程信息以及节点模式【独占/共享】
        Node node = new Node(Thread.currentThread(), mode);
         // 新建变量 pred 将指针指向tail指向的节点
           Node pred = tail;
        if (pred != null) {
        // 新加⼊的节点前驱节点指向尾节点
            node.prev = pred;
            //因为多个线程同时获取同步状态失败都会执行这段代码
            //通过cas方式获取当前节点为最新的尾节点
            if (compareAndSetTail(pred, node)) {
               //曾经的尾节点指向当前的节点
                pred.next = node;
                //返回构建的节点
                return node;
            }
        }
        //尾节点为空,说明当前节点是第⼀个被加⼊到同步队列中的节点,执行入队
        enq(node);
        return node;
    }

/**
入队
*/
 private Node enq(final Node node) {
        //无限循环
        for (;;) {
        //首先构建临时变量节点
            Node t = tail;
            //队列为空,虚拟化一个哨兵节点
            if (t == null) { 
            //cas设置一个新节点,指向头部
                if (compareAndSetHead(new Node()))
                //尾部指针指向尾部节点
                    tail = head;
                
                //第二次循环,队列中有节点
            } else {
            //将新节点的前节点指向t
                node.prev = t;
                //cas将新节点指向队列尾部节点
                if (compareAndSetTail(t, node)) {
                //前驱节点的后继节点指向当前新节点,完成双向队列
                    t.next = node;
                    //返回该t变量,当所有节点入队,结束循环
                    return t;
                }
            }
        }
    }

流程图解释入队

image.png

哨兵节点

主要是处理边界问题,这里主要是占队,如果没有边界,指定环节,按照同样算法可能会在边界处发⽣异常

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// "死循环",尝试获取锁,或者挂起
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 只有当前节点的前驱节点是头节点,才会尝试获取锁
// 看到这你应该理解添加哨兵节点的含义了吧
if (p == head && tryAcquire(arg)) {
// 获取同步状态成功,将⾃⼰设置为头
setHead(node);
// 将哨兵节点的后继节点置为空,⽅便GC
p.next = null; // help GC
failed = false;
// 返回中断标识
return interrupted;
 }
// 当前节点的前驱节点不是头节点
//【或者】当前节点的前驱节点是头节点但获取同步状态失败
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
 }
 } finally {
if (failed)
cancelAcquire(node);
 }
 }

获取同步状态成功会返回可以理解了,但是如果失败就会⼀直陷⼊到“死循环”中浪费资源吗?很显然不
是, shouldParkAfterFailedAcquire(p, node) 和 parkAndCheckInterrupt() 就会将线程获取同步状态
失败的线程挂起,我们继续向下看

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点的状态
int ws = pred.waitStatus;
// 如果是 SIGNAL 状态,即等待被占⽤的资源释放,直接返回 true
// 准备继续调⽤ parkAndCheckInterrupt ⽅法
if (ws == Node.SIGNAL)
return true;
// ws ⼤于 0 说明是 CANCELLED 状态,
if (ws > 0) {
// 循环判断前驱节点的前驱节点是否也为 CANCELLED 状态,忽略该状态的节点,重新连接队列
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将当前节点的前驱节点设置为设置为 SIGNAL 状态,⽤于后续唤醒操作
// 程序第⼀次执⾏到这返回为 false,还会进⾏外层第⼆次循环,最终从代码第 7 ⾏返回
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
如果前驱节点的 waitStatus 是 SIGNAL 状态,即 shouldParkAfterFailedAcquire ⽅法会返回 true ,程序会继续向下执⾏ parkAndCheckInterrupt ⽅法,⽤于将当前线程挂起

private final boolean parkAndCheckInterrupt() {
// 线程挂起,程序不会继续向下执⾏
LockSupport.park(this);
// 根据 park ⽅法 API 描述,程序在下述三种情况会继续向下执⾏
// 1. 被 unpark
// 2. 被中断(interrupt)
// 3. 其他不合逻辑的返回才会继续向下执⾏
// 因上述三种情况程序执⾏⾄此,返回当前线程的中断状态,并清空中断状态
// 如果由于被中断,该⽅法会返回 true
return Thread.interrupted();
}
到底什么情况才会执⾏ if(failed) ⾥⾯的代码 ?

if (failed)
cancelAcquire(node);
这段代码被执⾏的条件是 failed 为 true,正常情况下,如果跳出循环,failed 的值为 false,如果不能跳出循环貌似
怎么也不能执⾏到这⾥,所以只有不正常的情况才会执⾏到这⾥,也就是会发⽣异常,才会执⾏到此处

private void cancelAcquire(Node node) {
// 忽略⽆效节点
if (node == null)
return;
// 将关联的线程信息清空
node.thread = null;
// 跳过同样是取消状态的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 跳出上⾯循环后找到前驱有效节点,并获取该有效节点的后继节点
Node predNext = pred.next;
//看到这个注释你可能有些乱了,其核⼼⽬的就是从等待队列中移除 CANCELLED 的节点,并重新拼接整个队列,总
结来看,其实设置 CANCELLED 状态节点只是有三种情况,我们通过画图来分析⼀下:
// 将当前节点的状态置为 CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点处在尾节点,直接从队列中删除⾃⼰就好
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 1. 如果当前节点的有效前驱节点不是头节点,也就是说当前节点不是头节点的后继节点
if (pred != head &&
// 2. 判断当前节点有效前驱节点的状态是否为 SIGNAL
((ws = pred.waitStatus) == Node.SIGNAL ||
// 3. 如果不是,尝试将前驱节点的状态置为 SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// 判断当前节点有效前驱节点的线程信息是否为空
pred.thread != null) {
// 上述条件满⾜
Node next = node.next;
// 将当前节点有效前驱节点的后继节点指针指向当前节点的后继节点
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点的前驱节点是头节点,或者上述其他条件不满⾜,就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
看到这个注释你可能有些乱了,其核⼼⽬的就是从等待队列中移除 CANCELLED 的节点,并重新拼接整个队列,总
结来看,其实设置 CANCELLED 状态节点只是有三种情况,我们通过画图来分析⼀下:

image.png


标题:AQS
作者:xiaohugg
地址:https://xiaohugg.top/articles/2023/05/05/1683258517810.html

人民有信仰 民族有希望 国家有力量