一、概述
1.1 什么是AQS
AbstractQueuedSynchronizer(AQS)是Java并发包java.util.concurrent的核心基础框架,由Doug Lea设计并实现。它为实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件等)提供了一个框架。
AQS 采用模板方法设计模式,将同步器的核心逻辑抽象为对同步状态的管理和线程排队机制。
核心特点:
- 基于FIFO队列的同步框架
- 使用int类型的state变量表示同步状态
- 提供独占和共享两种模式
- 支持条件队列(Condition)
- 高性能、可扩展的设计
1.2 AQS的重要性
AQS是Java并发编程的基石,以下重要的同步工具都基于AQS实现:
- ReentrantLock:可重入锁
- Semaphore:信号量
- CountDownLatch:倒计时门闩
- CyclicBarrier:循环屏障
- ReentrantReadWriteLock:读写锁
1.3 设计理念
// AQS的核心设计理念
public abstract class AbstractQueuedSynchronizer {
// 同步状态
private volatile int state;
// 等待队列的头节点
private transient volatile Node head;
// 等待队列的尾节点
private transient volatile Node tail;
}
二、核心数据结构
状态管理机制 state
AQS 使用一个 volatile 修饰的 int 类型变量 state 表示同步状态,提供三种原子操作方法:
/**
* 同步状态,使用volatile保证可见性
* 不同的同步器对state有不同的语义:
* - ReentrantLock: 0表示未锁定,>0表示锁定次数
* - Semaphore: 表示可用许可数
* - CountDownLatch: 表示剩余计数
*/
private volatile int state;
// 获取同步状态
protected final int getState() {
return state;
}
// 设置同步状态
protected final void setState(int newState) {
state = newState;
}
// CAS操作更新状态
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
不同同步器对 state 的解释不同:
- ReentrantLock:state=0 表示未锁定,>0 表示重入次数
- Semaphore:state 表示可用许可数量
- CountDownLatch:state 表示计数器值
- ReentrantReadWriteLock:高16位表示读锁持有数,低16位表示写锁重入次数
等待队列节点 Node
AQS 内部维护一个 CLH 变体的 FIFO 双向队列(虚拟队列),用于管理等待线程。队列中的节点是 Node 内部类实例,包含:
- waitStatus:节点状态(CANCELLED、SIGNAL、CONDITION、PROPAGATE、0)
- prev/next:前后节点指针
- thread:关联的线程
- nextWaiter:标记共享/独占模式或条件队列的下个节点
static final class Node {
// 共享模式标记
static final Node SHARED = new Node();
// 独占模式标记
static final Node EXCLUSIVE = null;
// 等待状态常量
static final int CANCELLED = 1; // 节点被取消
static final int SIGNAL = -1; // 后继节点需要被唤醒
static final int CONDITION = -2; // 节点在条件队列中等待
static final int PROPAGATE = -3; // 共享模式下的释放操作需要传播
// 等待状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 等待的线程
volatile Thread thread;
// 下一个等待条件的节点,或者共享模式标记
Node nextWaiter;
// 判断是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}
2.3 队列结构图示
等待队列结构:
head tail
↓ ↓
[Node] ←→ [Node] ←→ [Node] ←→ [Node]
↑ ↑
虚拟头节点 最后加入的节点
每个Node包含:
- waitStatus: 等待状态
- prev: 前驱节点
- next: 后继节点
- thread: 等待的线程
- nextWaiter: 模式标记或条件队列下一节点
三、源码解析
独占模式获取锁
acquire方法
/**
* 独占模式获取锁的入口方法
* @param arg 获取锁的参数
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
执行流程:
- 调用
tryAcquire(arg)尝试获取锁,该方法由子类实现 - 如果获取失败,调用
addWaiter(Node.EXCLUSIVE)将当前线程加入等待队列(独占模式) - 调用
acquireQueued()在队列中自旋尝试获取锁 - 如果在等待过程中被中断,调用
selfInterrupt()重新设置中断状态
3.1.2 tryAcquire方法(模板方法)
由子类实现。
/**
* 尝试获取锁的模板方法,由子类实现具体逻辑
* @param arg 获取参数
* @return true表示获取成功,false表示获取失败
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// ReentrantLock中的实现示例
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 锁未被占用,尝试CAS获取
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 重入锁逻辑
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
3.1.3 addWaiter方法
/**
* 将当前线程包装成Node并加入等待队列
* @param mode 模式(独占或共享)
* @return 新创建的节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速尝试在队尾添加
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速添加失败,使用完整的入队逻辑
enq(node);
return node;
}
/**
* 完整的入队逻辑,包含队列初始化
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 队列为空,需要初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
3.1.4 acquireQueued方法
/**
* 在队列中等待获取锁
* @param node 当前线程对应的节点
* @param arg 获取参数
* @return 是否被中断
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果前驱是头节点,尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 检查是否需要阻塞,并阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.1.5 shouldParkAfterFailedAcquire方法
/**
* 检查获取锁失败后是否需要阻塞
* @param pred 前驱节点
* @param node 当前节点
* @return 是否需要阻塞
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驱节点已经设置了SIGNAL状态,可以安全阻塞
return true;
if (ws > 0) {
// 前驱节点被取消,需要跳过
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前驱节点状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
3.2 独占模式释放锁
3.2.1 release方法
/**
* 独占模式释放锁
* @param arg 释放参数
* @return 是否释放成功
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
3.2.2 tryRelease方法(模板方法)
/**
* 尝试释放锁的模板方法
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// ReentrantLock中的实现
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
3.2.3 unparkSuccessor方法
/**
* 唤醒后继节点
* @param node 当前节点(通常是头节点)
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾部向前查找最前面的有效节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
3.3 共享模式
3.3.1 acquireShared方法
/**
* 共享模式获取锁
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* 共享模式的队列等待逻辑
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.3.2 releaseShared方法
/**
* 共享模式释放锁
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
/**
* 共享模式的释放逻辑
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
3.4 条件队列(Condition)
public class ConditionObject implements Condition, java.io.Serializable {
// 条件队列的第一个节点
private transient Node firstWaiter;
// 条件队列的最后一个节点
private transient Node lastWaiter;
/**
* 等待条件
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* 唤醒等待的线程
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
}
四、关键设计思想与优化
4.1 模板方法模式
AQS使用模板方法模式,定义了同步器的骨架,子类只需实现特定的方法:
// 独占模式需要实现的方法
protected boolean tryAcquire(int arg) // 尝试获取锁
protected boolean tryRelease(int arg) // 尝试释放锁
protected boolean isHeldExclusively() // 是否独占持有
// 共享模式需要实现的方法
protected int tryAcquireShared(int arg) // 尝试共享获取
protected boolean tryReleaseShared(int arg) // 尝试共享释放
4.2 CAS无锁编程
AQS大量使用CAS操作来保证线程安全,避免了重量级锁的开销:
// 状态更新
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// 队列操作
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
4.3 LockSupport的使用
AQS使用LockSupport.park()和LockSupport.unpark()来实现线程的阻塞和唤醒:
/**
* 阻塞当前线程并检查中断
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport的优势:
- 不需要获取对象监视器
- 可以在任意位置阻塞和唤醒线程
- 支持超时等待
- 不会抛出InterruptedException
4.4 FIFO队列的优化
4.4.1 虚拟头节点
// 队列初始化时创建虚拟头节点
if (compareAndSetHead(new Node()))
tail = head;
优势:
- 简化了队列操作逻辑
- 避免了空队列的特殊处理
- 头节点不存储线程信息,只作为标记
4.4.2 从尾部向前遍历
// 在unparkSuccessor中从尾部向前查找
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
原因:
node.next = null的操作不是原子的- 从前向后遍历可能会遗漏节点
- 从后向前遍历保证了遍历的完整性
4.5 内存屏障与可见性
// 使用volatile保证可见性
private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;
// Node中的关键字段也使用volatile
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
五、应用实例
5.1 ReentrantLock的实现
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
// 非公平锁的获取逻辑
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 释放锁的逻辑
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
// 非公平锁实现
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 公平锁实现
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
5.2 Semaphore的实现
public class Semaphore implements java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
// 非公平获取
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 释放许可
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
// 非公平信号量
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
// 公平信号量
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
5.3 CountDownLatch的实现
public class CountDownLatch {
private final Sync sync;
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 尝试获取(等待计数为0)
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 倒计时(减少计数)
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 等待计数为0
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 减少计数
public void countDown() {
sync.releaseShared(1);
}
}
5.4 自定义同步器示例
/**
* 自定义的互斥锁,最多允许两个线程同时访问
*/
public class TwoThreadMutex {
private final Sync sync = new Sync();
private static final class Sync extends AbstractQueuedSynchronizer {
// 尝试获取锁
protected boolean tryAcquire(int arg) {
int state = getState();
if (state < 2) {
if (compareAndSetState(state, state + 1)) {
return true;
}
}
return false;
}
// 尝试释放锁
protected boolean tryRelease(int arg) {
int state = getState();
if (state > 0) {
if (compareAndSetState(state, state - 1)) {
return state == 1; // 当状态变为0时返回true
}
}
return false;
}
}
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
}
六、常见面试问题与总结
6.1 核心面试题
Q1: AQS的核心原理是什么?
答案: AQS基于以下核心原理:
- 状态管理:使用volatile int state表示同步状态
- FIFO队列:维护等待线程的先进先出队列
- 模板方法:定义获取/释放的框架,子类实现具体逻辑
- CAS操作:保证状态更新的原子性
- LockSupport:实现线程的阻塞和唤醒
Q2: AQS如何实现公平锁和非公平锁?
答案:
// 公平锁:检查队列中是否有前驱节点
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
// 非公平锁:直接尝试CAS获取
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
Q3: 为什么AQS要从尾部向前遍历?
答案: 因为节点入队时的操作顺序:
node.prev = pred; // 1. 设置prev指针
compareAndSetTail(pred, node); // 2. CAS设置tail
pred.next = node; // 3. 设置next指针
步骤3不是原子的,可能在设置next指针前被中断,导致从前向后遍历丢失节点。而prev指针在CAS成功前就已设置,从后向前遍历保证完整性。
Q4: AQS中的waitStatus有哪些状态?
答案:
- SIGNAL(-1):后继节点需要被唤醒
- CANCELLED(1):节点被取消,不再等待
- CONDITION(-2):节点在条件队列中等待
- PROPAGATE(-3):共享模式下的释放需要传播给后续节点
- 0:初始状态
Q5: AQS如何处理中断?
答案: AQS提供了三种中断处理方式:
- 不响应中断:
acquire()方法,获取锁后恢复中断状态 - 抛出异常:
acquireInterruptibly()方法,立即抛出InterruptedException - 超时中断:
tryAcquireNanos()方法,超时或中断时返回
6.2 性能优化要点
6.2.1 减少CAS竞争
// 快速路径:先检查tail是否为null
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 慢速路径:完整的入队逻辑
enq(node);
6.2.2 自旋优化
// 只有前驱是头节点才尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
6.2.3 内存布局优化
// 使用Unsafe直接操作内存
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
6.3 最佳实践
6.3.1 正确实现tryAcquire
protected boolean tryAcquire(int arg) {
// 1. 获取当前状态
int state = getState();
// 2. 判断是否可以获取
if (canAcquire(state, arg)) {
// 3. CAS更新状态
if (compareAndSetState(state, newState)) {
// 4. 设置独占线程(如果需要)
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
return false;
}
6.3.2 正确实现tryRelease
protected boolean tryRelease(int arg) {
// 1. 检查当前线程是否持有锁
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
// 2. 计算新状态
int newState = getState() - arg;
boolean free = (newState == 0);
// 3. 如果完全释放,清除独占线程
if (free) {
setExclusiveOwnerThread(null);
}
// 4. 更新状态(不需要CAS,因为是独占的)
setState(newState);
return free;
}
6.4 常见陷阱
6.4.1 重入性处理
// 错误:没有处理重入
if (getState() == 0 && compareAndSetState(0, 1)) {
return true;
}
// 正确:处理重入
if (getState() == 0) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
} else if (Thread.currentThread() == getExclusiveOwnerThread()) {
setState(getState() + 1);
return true;
}
6.4.2 状态溢出检查
// 检查重入次数溢出
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
附录:源码调试与图示
A.1 调试AQS的方法
/**
* 调试工具类
*/
public class AQSDebugger {
public static void printQueueState(AbstractQueuedSynchronizer aqs) {
try {
Field headField = AbstractQueuedSynchronizer.class.getDeclaredField("head");
Field tailField = AbstractQueuedSynchronizer.class.getDeclaredField("tail");
headField.setAccessible(true);
tailField.setAccessible(true);
Object head = headField.get(aqs);
Object tail = tailField.get(aqs);
System.out.println("Head: " + head);
System.out.println("Tail: " + tail);
System.out.println("State: " + aqs.getState());
// 遍历队列
if (head != null) {
printNode(head, "HEAD");
Object current = getNext(head);
int index = 1;
while (current != null) {
printNode(current, "Node-" + index);
current = getNext(current);
index++;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static void printNode(Object node, String label) throws Exception {
if (node == null) return;
Class<?> nodeClass = node.getClass();
Field threadField = nodeClass.getDeclaredField("thread");
Field waitStatusField = nodeClass.getDeclaredField("waitStatus");
threadField.setAccessible(true);
waitStatusField.setAccessible(true);
Thread thread = (Thread) threadField.get(node);
int waitStatus = waitStatusField.getInt(node);
System.out.printf("%s: thread=%s, waitStatus=%d%n",
label,
thread != null ? thread.getName() : "null",
waitStatus);
}
private static Object getNext(Object node) throws Exception {
Field nextField = node.getClass().getDeclaredField("next");
nextField.setAccessible(true);
return nextField.get(node);
}
}
A.2 完整的执行流程图
AQS获取锁的完整流程:
1. acquire(1)
↓
2. tryAcquire(1) ——→ 成功 ——→ 返回
↓ 失败
3. addWaiter(EXCLUSIVE)
↓
4. acquireQueued(node, 1)
↓
5. 循环:
- predecessor() == head?
↓ 是
- tryAcquire(1) ——→ 成功 ——→ setHead(node) ——→ 返回
↓ 失败
- shouldParkAfterFailedAcquire()
↓
- parkAndCheckInterrupt()
↓
- 被唤醒,继续循环
AQS释放锁的完整流程:
1. release(1)
↓
2. tryRelease(1) ——→ 失败 ——→ 返回false
↓ 成功
3. head != null && head.waitStatus != 0?
↓ 是
4. unparkSuccessor(head)
↓
5. 找到后继节点并唤醒
↓
6. 返回true
A.3 内存模型分析
/**
* AQS的内存可见性保证
*/
public class MemoryModelAnalysis {
/*
* 1. volatile state 保证状态的可见性
* 2. volatile head/tail 保证队列结构的可见性
* 3. CAS操作提供内存屏障
* 4. LockSupport.park/unpark 提供happens-before关系
*
* 内存屏障效果:
* - CAS操作前的写入对后续读取可见
* - unpark操作前的写入对park后的读取可见
* - volatile写入对后续volatile读取可见
*/
}
A.4 性能测试代码
/**
* AQS性能测试
*/
public class AQSPerformanceTest {
private static final int THREAD_COUNT = 10;
private static final int ITERATIONS = 100000;
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
try {
startLatch.await();
for (int j = 0; j < ITERATIONS; j++) {
lock.lock();
try {
// 临界区操作
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
}).start();
}
long startTime = System.nanoTime();
startLatch.countDown();
endLatch.await();
long endTime = System.nanoTime();
System.out.printf("Total time: %.2f ms%n",
(endTime - startTime) / 1_000_000.0);
System.out.printf("Operations per second: %.0f%n",
(THREAD_COUNT * ITERATIONS * 1_000_000_000.0) / (endTime - startTime));
}
}
总结
AQS作为Java并发包的核心基础设施,其设计体现了以下重要思想:
- 分层设计:将同步状态管理和线程排队分离
- 模板方法:提供框架,子类实现具体策略
- 无锁编程:大量使用CAS避免重量级锁
- 性能优化:从队列结构到内存布局的全方位优化
- 可扩展性:支持独占、共享、条件等多种同步模式
理解AQS的原理和实现细节,对于深入掌握Java并发编程具有重要意义。通过学习AQS,我们不仅能更好地使用现有的同步工具,还能设计出高效的自定义同步器。
文章标签
冬眠
博主专注于技术、阅读与思考。在这里记录学习、思考与生活。
第 2 篇,共 6 篇
