【原创】Java并发编程系列14 | AQS源码分析
2021-03-15 04:29
标签:重入 区别 port timeout 面试题 finally 唤醒 编程之美 取消 属性 方法 内部类 AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作。 同步队列由双向链表实现,AQS持有头尾指针(head/tail属性)来管理同步队列。 入队操作 以独占锁为例详细讲解获取锁及排队等待的过程。直接在代码中加了详细的注释讲解,耐心看一定可以看懂。 线程1来获取锁,此时没有竞争,直接获取到锁。AQS队列为空。 AQS结构:锁状态state、当前只有锁的线程exclusiveOwnerThread以及双向链表实现的同步队列。 《Java并发编程之美》 【原创】01|开篇获奖感言 点在看好不好,喵~ 【原创】Java并发编程系列14 | AQS源码分析 标签:重入 区别 port timeout 面试题 finally 唤醒 编程之美 取消 原文地址:https://blog.51cto.com/15009303/2552803收录于话题
#进阶架构师 | 并发编程专题
12个
本文为何适原创并发编程系列第 14 篇,文末有本系列文章汇总。
AbstractQueuedSynchronizer是Java并发包java.util.concurrent的核心基础组件,是实现Lock的基础。
AQS实现了对同步状态的管理,以及对阻塞线程进行排队、等待通知等,本文将从源码角度深入理解AQS的实现原理。
建议:本文涉及大量源码,在源码中加了很多详细的注释,用电脑阅读会更方便。1. AQS类结构
// 属性
private transient volatile Node head;// 同步队列头节点
private transient volatile Node tail;// 同步队列尾节点
private volatile int state;// 当前锁的状态:0代表没有被占用,大于0代表锁已被线程占用(锁可以重入,每次重入都+1)
private transient Thread exclusiveOwnerThread; // 继承自AbstractOwnableSynchronizer 持有当前锁的线程
// 锁状态
getState()// 返回同步状态的当前值;
setState(int newState)// 设置当前同步状态;
compareAndSetState(int expect, int update)// 使用CAS设置当前状态,保证状态设置的原子性;
// 独占锁
acquire(int arg)// 独占式获取同步状态,如果获取失败则插入同步队列进行等待;
acquireInterruptibly(int arg)// 与acquire(int arg)相同,但是该方法响应中断;
tryAcquireNanos(int arg,long nanos)// 在acquireInterruptibly基础上增加了超时等待功能,在超时时间内没有获得同步状态返回false;
release(int arg)// 独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中头节点的下一个节点包含的线程唤醒;
// 共享锁
acquireShared(int arg)// 共享式获取同步状态,与独占式的区别在于同一时刻有多个线程获取同步状态;
acquireSharedInterruptibly(int arg)// 在acquireShared方法基础上增加了能响应中断的功能;
tryAcquireSharedNanos(int arg, long nanosTimeout)// 在acquireSharedInterruptibly基础上增加了超时等待的功能;
releaseShared(int arg)// 共享式释放同步状态;
// AQS使用模板方法设计模式
// 模板方法,需要子类实现获取锁/释放锁的方法
tryAcquire(int arg)// 独占式获取同步状态;
tryRelease(int arg)// 独占式释放同步状态;
tryAcquireShared(int arg)// 共享式获取同步状态;
tryReleaseShared(int arg)// 共享式释放同步状态;
// 同步队列的节点类
static final class Node {}
2. 同步队列
如果当前线程获取锁失败时,AQS会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会park当前线程;当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
队列结构
节点的数据结构,即AQS的静态内部类Node,包括节点对应的线程、节点的等待状态等信息。
节点类:static final class Node {
volatile Node prev;// 当前节点/线程的前驱节点
volatile Node next;// 当前节点/线程的后继节点
volatile Thread thread;// 每一个节点对应一个线程
volatile int waitStatus;// 节点状态
static final int CANCELLED = 1;// 节点状态:此线程取消了争抢这个锁
static final int SIGNAL = -1;// 节点状态:当前node的后继节点对应的线程需要被唤醒(表示后继节点的状态)
static final int CONDITION = -2;// 节点状态:当前节点进入等待队列中
static final int PROPAGATE = -3;// 节点状态:表示下一次共享式同步状态获取将会无条件传播下去
Node nextWaiter;// 共享模式/独占模式
static final Node SHARED = new Node();// 共享模式
static final Node EXCLUSIVE = null;// 独占模式
}
/**
* 1.线程抢锁失败后,封装成node加入队列
* 2.队列有tail,可直接入队。
* 2.1入队时,通过CAS将node置为tail。CAS操作失败,说明被其它线程抢先入队了,node需要通过enq()方法入队。
* 3.队列没有tail,说明队列是空的,node通过enq()方法入队,enq()会初始化head和tail。
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);// 线程抢锁失败后,封装成node加入队列
Node pred = tail;
if (pred != null) {// 如果有tail,node加入队尾
node.prev = pred;
if (compareAndSetTail(pred, node)) {// 通过CAS将node置为tail。CAS操作失败,说明被其它线程抢先入队了,node需要通过enq()方法入队。
pred.next = node;
return node;
}
}
enq(node);// 如果没有tail,node通过enq()方法入队。
return node;
}
/**
* 1.通过自旋的方式将node入队,只有node入队成功才返回,否则一直循环。
* 2.如果队列为空,初始化head/tail,初始化之后再次循环到else分支,将node入队。
* 3.node入队时,通过CAS将node置为tail。CAS操作失败,说明被其它线程抢先入队了,自旋,直到成功。
*/
private Node enq(final Node node) {
for (;;) {// 自旋:循环入列,直到成功
Node t = tail;
if (t == null) {
// 初始化head/tail,初始化之后再次循环到else分支,将node入队
if (compareAndSetHead(new Node()))
tail = head;
} else {
// node入队
node.prev = t;
if (compareAndSetTail(t, node)) {// 通过CAS将node置为tail。操作失败,说明被其它线程抢先入队了,自旋,直到成功。
t.next = node;
return t;
}
}
}
}
3. 获取锁
/**
* 1.当前线程通过tryAcquire()方法抢锁。
* 2.线程抢到锁,tryAcquire()返回true,结束。
* 3.线程没有抢到锁,addWaiter()方法将当前线程封装成node加入同步队列,并将node交由acquireQueued()处理。
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 子类的抢锁操作,下文有解释
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 子类抢锁失败进入队列中,重点方法,下文详细讲解
selfInterrupt();
}
/**
* 需要子类实现的抢锁的方法
* 目前可以理解为通过CAS修改state的值,成功即为抢到锁,返回true;否则返回false。
* 之后重入锁ReentrantLock、读写锁ReentrantReadWriteLock中会详细讲解。
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* 上文介绍过的入队操作,线程抢锁失败,将当前线程封装成node加入同步队列,并返回node
* Node.EXCLUSIVE-表示独占锁,先不用关注
*/
addWaiter(Node.EXCLUSIVE)
/**
* 重点方法!!
* 1.只有head的后继节点能去抢锁,一旦抢到锁旧head节点从队列中删除,next被置为新head节点。
* 2.如果node线程没有获取到锁,将node线程挂起。
* 3.锁释放时head节点的后继节点唤醒,唤醒之后继续for循环抢锁。
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {// 注意这里是循环
/*
* 1.node的前置节点是head时,可以调用tryAcquire()尝试去获取锁,获取锁成功则将node置为head
* 注意:只有head的后继节点能去抢锁,一旦抢到锁旧head节点从队列中删除,next被置为新head节点
* 2.node线程没有获取到锁,继续执行下面另一个if的代码
* 此时有两种情况:1)node不是head的后继节点,没有资格抢锁;2)node是head的后继节点但抢锁没成功
*/
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
* shouldParkAfterFailedAcquire(p, node):通过前置节点pred的状态waitStatus 来判断是否可以将node节点线程挂起
* parkAndCheckInterrupt():将当前线程挂起
* 1.如果node前置节点p.waitStatus==Node.SIGNAL(-1),直接将当前线程挂起,等待唤醒。
* 锁释放时会将head节点的后继节点唤醒,唤醒之后继续for循环抢锁。
* 2.如果node前置节点p.waitStatus0,
* 1)shouldParkAfterFailedAcquire(p, node)为node找一个waitStatus 0) {
/*
* waitStatus>0 ,表示节点取消了排队
* 这里检测一下,将不需要排队的线程从队列中删除(因为同步队列中保存的是等锁的线程)
* 为node找一个waitStatus 0);
pred.next = node;
} else {
// 此时pred.waitStatus
AQS获取锁4. 释放锁
/**
* 释放锁之后,唤醒head的后继节点next。
* 回顾上文讲的acquireQueued()方法,next节点会进入for循环的下一次循环去抢锁
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {// 子类实现的释放锁的方法,下文有讲解
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);// 唤醒node节点(也就是head)的后继节点,下文有讲解
return true;
}
return false;
}
/**
* 需要子类实现的释放锁的方法,对应于tryAcquire()
* 目前可以理解为将state的值置为0。
* 之后重入锁ReentrantLock、读写锁ReentrantReadWriteLock中会详细讲解。
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* 唤醒node节点(也就是head)的后继节点
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus
5. 回顾整个过程
线程2来获取锁,因为线程1占用锁,线程2需要做两件事:
1)线程2构造成Node到AQS的同步队列中排队。此时初始化同步队列。
2)线程2阻塞,等待被唤醒之后再去抢锁。
线程3来获取锁,锁被占用,同样做两件事:排队并阻塞。此时的同步队列结构:
线程1执行完同步代码之后释放锁,唤醒head的后继节点(线程2),线程2获取锁,并把线程2对应的Node置为head。
线程2执行完同步代码之后释放锁,唤醒head的后继节点(线程3),线程3获取锁,并把线程3对应的Node置为head。
线程3执行完同步代码之后释放锁,同步队列中head之后没有节点了,将head置为null即可。总结
AQS使用模板方法设计模式,子类必须重写AQS获取锁tryAcquire()和释放锁tryRelease()的方法,一般是对state和exclusiveOwnerThread的操作。
获取锁acquire()过程:
子类调用tryAcquire()尝试获取锁,如果获取锁成功,完成。
如果获取锁失败,当前线程会封装成Node节点插入同步队列中,并且将当前线程park()阻塞,等待被唤醒之后再抢锁。
释放锁release()过程:当前线程调用子类的tryRelease()方法释放锁,释放锁成功后,会unpark(thread)唤醒head的后继节点,让其再去抢锁。
参考资料
《Java并发编程实战》
《Java并发编程的艺术》并发系列文章汇总
【原创】02|并发编程三大核心问题
【原创】03|重排序-可见性和有序性问题根源
【原创】04|Java 内存模型详解
【原创】05|深入理解 volatile
【原创】06|你不知道的 final
【原创】07|synchronized 原理
【原创】08|synchronized 锁优化
【原创】09|基础干货
【原创】10|线程状态
【原创】11|线程调度
【原创】12|揭秘CAS
【原创】13|LookSupport
———— e n d ————
金三银四,师长为大家准备了三份面试宝典:
《java面试宝典5.0》
《350道Java面试题:整理自100+公司》
《资深java面试宝典-视频版》
分别适用于初中级,中高级,以及资深级工程师的面试复习。
内容包含java基础、javaweb、各个性能优化、JVM、锁、高并发、反射、Spring原理、微服务、Zookeeper、数据库、数据结构、限流熔断降级等等。
获取方式:点“在看”,V信关注师长的小号:编程最前线并回复 面试 领取,更多精彩陆续奉上。