冬眠的笔记
首页文章分类书单项目关于
冬眠
X

© 2026 冬眠的笔记 · 用文字记录思考,用思考改变生活

首页>文章>Java
JavaJUCAQS并发

AbstractQueuedSynchronizer (AQS) 详解

AQS 同步器框架的核心设计:state 状态、CLH 队列、独占/共享模式以及自定义同步器的实现

冬眠
冬眠
专注于技术、阅读与思考
2025-11-19
发布日期
23 min read
阅读时长
浏览量
AbstractQueuedSynchronizer (AQS) 详解

一、概述

1.1 什么是AQS

AbstractQueuedSynchronizer(AQS)是Java并发包java.util.concurrent的核心基础框架,由Doug Lea设计并实现。它为实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件等)提供了一个框架。

AQS 采用模板方法设计模式,将同步器的核心逻辑抽象为对​​同步状态​​的管理和​​线程排队​​机制。

核心特点:

  • 基于FIFO队列的同步框架
  • 使用int类型的state变量表示同步状态
  • 提供独占和共享两种模式
  • 支持条件队列(Condition)
  • 高性能、可扩展的设计

1.2 AQS的重要性

AQS是Java并发编程的基石,以下重要的同步工具都基于AQS实现:

  • ReentrantLock:可重入锁
  • Semaphore:信号量
  • CountDownLatch:倒计时门闩
  • CyclicBarrier:循环屏障
  • ReentrantReadWriteLock:读写锁

1.3 设计理念

// AQS的核心设计理念
public abstract class AbstractQueuedSynchronizer {
    // 同步状态
    private volatile int state;
    
    // 等待队列的头节点
    private transient volatile Node head;
    
    // 等待队列的尾节点
    private transient volatile Node tail;
}

二、核心数据结构

状态管理机制 state

AQS 使用一个 volatile 修饰的 int 类型变量 state 表示同步状态,提供三种原子操作方法:

/**
 * 同步状态,使用volatile保证可见性
 * 不同的同步器对state有不同的语义:
 * - ReentrantLock: 0表示未锁定,>0表示锁定次数
 * - Semaphore: 表示可用许可数
 * - CountDownLatch: 表示剩余计数
 */
private volatile int state;

// 获取同步状态
protected final int getState() {
    return state;
}

// 设置同步状态
protected final void setState(int newState) {
    state = newState;
}

// CAS操作更新状态
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

不同同步器对 state 的解释不同:

  • ReentrantLock:state=0 表示未锁定,>0 表示重入次数
  • Semaphore:state 表示可用许可数量
  • CountDownLatch:state 表示计数器值
  • ReentrantReadWriteLock:高16位表示读锁持有数,低16位表示写锁重入次数

等待队列节点 Node

AQS 内部维护一个 CLH 变体的 FIFO 双向队列(虚拟队列),用于管理等待线程。队列中的节点是 Node 内部类实例,包含:

  • waitStatus:节点状态(CANCELLED、SIGNAL、CONDITION、PROPAGATE、0)
  • prev/next:前后节点指针
  • thread:关联的线程
  • nextWaiter:标记共享/独占模式或条件队列的下个节点
static final class Node {
    // 共享模式标记
    static final Node SHARED = new Node();
    // 独占模式标记
    static final Node EXCLUSIVE = null;
    
    // 等待状态常量
    static final int CANCELLED =  1;  // 节点被取消
    static final int SIGNAL    = -1;  // 后继节点需要被唤醒
    static final int CONDITION = -2;  // 节点在条件队列中等待
    static final int PROPAGATE = -3;  // 共享模式下的释放操作需要传播
    
    // 等待状态
    volatile int waitStatus;
    
    // 前驱节点
    volatile Node prev;
    
    // 后继节点
    volatile Node next;
    
    // 等待的线程
    volatile Thread thread;
    
    // 下一个等待条件的节点,或者共享模式标记
    Node nextWaiter;
    
    // 判断是否为共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    
    // 获取前驱节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
}

2.3 队列结构图示

等待队列结构:
     head                tail
      ↓                   ↓
   [Node] ←→ [Node] ←→ [Node] ←→ [Node]
     ↑                           ↑
  虚拟头节点                   最后加入的节点

每个Node包含:
- waitStatus: 等待状态
- prev: 前驱节点
- next: 后继节点  
- thread: 等待的线程
- nextWaiter: 模式标记或条件队列下一节点

三、源码解析

独占模式获取锁

acquire方法

/**
 * 独占模式获取锁的入口方法
 * @param arg 获取锁的参数
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

执行流程:

  1. 调用 tryAcquire(arg) 尝试获取锁,该方法由子类实现
  2. 如果获取失败,调用 addWaiter(Node.EXCLUSIVE) 将当前线程加入等待队列(独占模式)
  3. 调用acquireQueued()在队列中自旋尝试获取锁
  4. 如果在等待过程中被中断,调用selfInterrupt()重新设置中断状态

3.1.2 tryAcquire方法(模板方法)

由子类实现。

/**
 * 尝试获取锁的模板方法,由子类实现具体逻辑
 * @param arg 获取参数
 * @return true表示获取成功,false表示获取失败
 */
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// ReentrantLock中的实现示例
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 锁未被占用,尝试CAS获取
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 重入锁逻辑
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

3.1.3 addWaiter方法

/**
 * 将当前线程包装成Node并加入等待队列
 * @param mode 模式(独占或共享)
 * @return 新创建的节点
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    
    // 快速尝试在队尾添加
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 快速添加失败,使用完整的入队逻辑
    enq(node);
    return node;
}

/**
 * 完整的入队逻辑,包含队列初始化
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 队列为空,需要初始化
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

3.1.4 acquireQueued方法

/**
 * 在队列中等待获取锁
 * @param node 当前线程对应的节点
 * @param arg 获取参数
 * @return 是否被中断
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果前驱是头节点,尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            
            // 检查是否需要阻塞,并阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3.1.5 shouldParkAfterFailedAcquire方法

/**
 * 检查获取锁失败后是否需要阻塞
 * @param pred 前驱节点
 * @param node 当前节点
 * @return 是否需要阻塞
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前驱节点已经设置了SIGNAL状态,可以安全阻塞
        return true;
    if (ws > 0) {
        // 前驱节点被取消,需要跳过
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将前驱节点状态设置为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

3.2 独占模式释放锁

3.2.1 release方法

/**
 * 独占模式释放锁
 * @param arg 释放参数
 * @return 是否释放成功
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

3.2.2 tryRelease方法(模板方法)

/**
 * 尝试释放锁的模板方法
 */
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

// ReentrantLock中的实现
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

3.2.3 unparkSuccessor方法

/**
 * 唤醒后继节点
 * @param node 当前节点(通常是头节点)
 */
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾部向前查找最前面的有效节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

3.3 共享模式

3.3.1 acquireShared方法

/**
 * 共享模式获取锁
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

/**
 * 共享模式的队列等待逻辑
 */
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null;
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3.3.2 releaseShared方法

/**
 * 共享模式释放锁
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

/**
 * 共享模式的释放逻辑
 */
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)
            break;
    }
}

3.4 条件队列(Condition)

public class ConditionObject implements Condition, java.io.Serializable {
    // 条件队列的第一个节点
    private transient Node firstWaiter;
    // 条件队列的最后一个节点
    private transient Node lastWaiter;
    
    /**
     * 等待条件
     */
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    
    /**
     * 唤醒等待的线程
     */
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
}

四、关键设计思想与优化

4.1 模板方法模式

AQS使用模板方法模式,定义了同步器的骨架,子类只需实现特定的方法:

// 独占模式需要实现的方法
protected boolean tryAcquire(int arg)        // 尝试获取锁
protected boolean tryRelease(int arg)        // 尝试释放锁
protected boolean isHeldExclusively()        // 是否独占持有

// 共享模式需要实现的方法
protected int tryAcquireShared(int arg)      // 尝试共享获取
protected boolean tryReleaseShared(int arg)  // 尝试共享释放

4.2 CAS无锁编程

AQS大量使用CAS操作来保证线程安全,避免了重量级锁的开销:

// 状态更新
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

// 队列操作
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

4.3 LockSupport的使用

AQS使用LockSupport.park()和LockSupport.unpark()来实现线程的阻塞和唤醒:

/**
 * 阻塞当前线程并检查中断
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

LockSupport的优势:

  • 不需要获取对象监视器
  • 可以在任意位置阻塞和唤醒线程
  • 支持超时等待
  • 不会抛出InterruptedException

4.4 FIFO队列的优化

4.4.1 虚拟头节点

// 队列初始化时创建虚拟头节点
if (compareAndSetHead(new Node()))
    tail = head;

优势:

  • 简化了队列操作逻辑
  • 避免了空队列的特殊处理
  • 头节点不存储线程信息,只作为标记

4.4.2 从尾部向前遍历

// 在unparkSuccessor中从尾部向前查找
for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
        s = t;

原因:

  • node.next = null的操作不是原子的
  • 从前向后遍历可能会遗漏节点
  • 从后向前遍历保证了遍历的完整性

4.5 内存屏障与可见性

// 使用volatile保证可见性
private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;

// Node中的关键字段也使用volatile
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;

五、应用实例

5.1 ReentrantLock的实现

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 非公平锁的获取逻辑
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        
        // 释放锁的逻辑
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }
    
    // 非公平锁实现
    static final class NonfairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    // 公平锁实现
    static final class FairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}

5.2 Semaphore的实现

public class Semaphore implements java.io.Serializable {
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }
        
        final int getPermits() {
            return getState();
        }
        
        // 非公平获取
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        
        // 释放许可
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    }
    
    // 非公平信号量
    static final class NonfairSync extends Sync {
        NonfairSync(int permits) {
            super(permits);
        }
        
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    
    // 公平信号量
    static final class FairSync extends Sync {
        FairSync(int permits) {
            super(permits);
        }
        
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

5.3 CountDownLatch的实现

public class CountDownLatch {
    private final Sync sync;
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }
        
        int getCount() {
            return getState();
        }
        
        // 尝试获取(等待计数为0)
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        
        // 倒计时(减少计数)
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    
    // 等待计数为0
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    // 减少计数
    public void countDown() {
        sync.releaseShared(1);
    }
}

5.4 自定义同步器示例

/**
 * 自定义的互斥锁,最多允许两个线程同时访问
 */
public class TwoThreadMutex {
    private final Sync sync = new Sync();
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        // 尝试获取锁
        protected boolean tryAcquire(int arg) {
            int state = getState();
            if (state < 2) {
                if (compareAndSetState(state, state + 1)) {
                    return true;
                }
            }
            return false;
        }
        
        // 尝试释放锁
        protected boolean tryRelease(int arg) {
            int state = getState();
            if (state > 0) {
                if (compareAndSetState(state, state - 1)) {
                    return state == 1; // 当状态变为0时返回true
                }
            }
            return false;
        }
    }
    
    public void lock() {
        sync.acquire(1);
    }
    
    public void unlock() {
        sync.release(1);
    }
    
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
}

六、常见面试问题与总结

6.1 核心面试题

Q1: AQS的核心原理是什么?

答案: AQS基于以下核心原理:

  1. 状态管理:使用volatile int state表示同步状态
  2. FIFO队列:维护等待线程的先进先出队列
  3. 模板方法:定义获取/释放的框架,子类实现具体逻辑
  4. CAS操作:保证状态更新的原子性
  5. LockSupport:实现线程的阻塞和唤醒

Q2: AQS如何实现公平锁和非公平锁?

答案:

// 公平锁:检查队列中是否有前驱节点
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
}

// 非公平锁:直接尝试CAS获取
if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
}

Q3: 为什么AQS要从尾部向前遍历?

答案: 因为节点入队时的操作顺序:

node.prev = pred;           // 1. 设置prev指针
compareAndSetTail(pred, node); // 2. CAS设置tail
pred.next = node;           // 3. 设置next指针

步骤3不是原子的,可能在设置next指针前被中断,导致从前向后遍历丢失节点。而prev指针在CAS成功前就已设置,从后向前遍历保证完整性。

Q4: AQS中的waitStatus有哪些状态?

答案:

  • SIGNAL(-1):后继节点需要被唤醒
  • CANCELLED(1):节点被取消,不再等待
  • CONDITION(-2):节点在条件队列中等待
  • PROPAGATE(-3):共享模式下的释放需要传播给后续节点
  • 0:初始状态

Q5: AQS如何处理中断?

答案: AQS提供了三种中断处理方式:

  1. 不响应中断:acquire()方法,获取锁后恢复中断状态
  2. 抛出异常:acquireInterruptibly()方法,立即抛出InterruptedException
  3. 超时中断:tryAcquireNanos()方法,超时或中断时返回

6.2 性能优化要点

6.2.1 减少CAS竞争

// 快速路径:先检查tail是否为null
Node pred = tail;
if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
    }
}
// 慢速路径:完整的入队逻辑
enq(node);

6.2.2 自旋优化

// 只有前驱是头节点才尝试获取锁
if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null;
    failed = false;
    return interrupted;
}

6.2.3 内存布局优化

// 使用Unsafe直接操作内存
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;

6.3 最佳实践

6.3.1 正确实现tryAcquire

protected boolean tryAcquire(int arg) {
    // 1. 获取当前状态
    int state = getState();
    
    // 2. 判断是否可以获取
    if (canAcquire(state, arg)) {
        // 3. CAS更新状态
        if (compareAndSetState(state, newState)) {
            // 4. 设置独占线程(如果需要)
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
    }
    return false;
}

6.3.2 正确实现tryRelease

protected boolean tryRelease(int arg) {
    // 1. 检查当前线程是否持有锁
    if (Thread.currentThread() != getExclusiveOwnerThread()) {
        throw new IllegalMonitorStateException();
    }
    
    // 2. 计算新状态
    int newState = getState() - arg;
    boolean free = (newState == 0);
    
    // 3. 如果完全释放,清除独占线程
    if (free) {
        setExclusiveOwnerThread(null);
    }
    
    // 4. 更新状态(不需要CAS,因为是独占的)
    setState(newState);
    return free;
}

6.4 常见陷阱

6.4.1 重入性处理

// 错误:没有处理重入
if (getState() == 0 && compareAndSetState(0, 1)) {
    return true;
}

// 正确:处理重入
if (getState() == 0) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
} else if (Thread.currentThread() == getExclusiveOwnerThread()) {
    setState(getState() + 1);
    return true;
}

6.4.2 状态溢出检查

// 检查重入次数溢出
int nextc = c + acquires;
if (nextc < 0) {
    throw new Error("Maximum lock count exceeded");
}

附录:源码调试与图示

A.1 调试AQS的方法

/**
 * 调试工具类
 */
public class AQSDebugger {
    public static void printQueueState(AbstractQueuedSynchronizer aqs) {
        try {
            Field headField = AbstractQueuedSynchronizer.class.getDeclaredField("head");
            Field tailField = AbstractQueuedSynchronizer.class.getDeclaredField("tail");
            headField.setAccessible(true);
            tailField.setAccessible(true);
            
            Object head = headField.get(aqs);
            Object tail = tailField.get(aqs);
            
            System.out.println("Head: " + head);
            System.out.println("Tail: " + tail);
            System.out.println("State: " + aqs.getState());
            
            // 遍历队列
            if (head != null) {
                printNode(head, "HEAD");
                Object current = getNext(head);
                int index = 1;
                while (current != null) {
                    printNode(current, "Node-" + index);
                    current = getNext(current);
                    index++;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private static void printNode(Object node, String label) throws Exception {
        if (node == null) return;
        
        Class<?> nodeClass = node.getClass();
        Field threadField = nodeClass.getDeclaredField("thread");
        Field waitStatusField = nodeClass.getDeclaredField("waitStatus");
        threadField.setAccessible(true);
        waitStatusField.setAccessible(true);
        
        Thread thread = (Thread) threadField.get(node);
        int waitStatus = waitStatusField.getInt(node);
        
        System.out.printf("%s: thread=%s, waitStatus=%d%n", 
                         label, 
                         thread != null ? thread.getName() : "null", 
                         waitStatus);
    }
    
    private static Object getNext(Object node) throws Exception {
        Field nextField = node.getClass().getDeclaredField("next");
        nextField.setAccessible(true);
        return nextField.get(node);
    }
}

A.2 完整的执行流程图

AQS获取锁的完整流程:

1. acquire(1)
   ↓
2. tryAcquire(1) ——→ 成功 ——→ 返回
   ↓ 失败
3. addWaiter(EXCLUSIVE)
   ↓
4. acquireQueued(node, 1)
   ↓
5. 循环:
   - predecessor() == head?
     ↓ 是
   - tryAcquire(1) ——→ 成功 ——→ setHead(node) ——→ 返回
     ↓ 失败
   - shouldParkAfterFailedAcquire()
     ↓
   - parkAndCheckInterrupt()
     ↓
   - 被唤醒,继续循环

AQS释放锁的完整流程:

1. release(1)
   ↓
2. tryRelease(1) ——→ 失败 ——→ 返回false
   ↓ 成功
3. head != null && head.waitStatus != 0?
   ↓ 是
4. unparkSuccessor(head)
   ↓
5. 找到后继节点并唤醒
   ↓
6. 返回true

A.3 内存模型分析

/**
 * AQS的内存可见性保证
 */
public class MemoryModelAnalysis {
    /*
     * 1. volatile state 保证状态的可见性
     * 2. volatile head/tail 保证队列结构的可见性
     * 3. CAS操作提供内存屏障
     * 4. LockSupport.park/unpark 提供happens-before关系
     * 
     * 内存屏障效果:
     * - CAS操作前的写入对后续读取可见
     * - unpark操作前的写入对park后的读取可见
     * - volatile写入对后续volatile读取可见
     */
}

A.4 性能测试代码

/**
 * AQS性能测试
 */
public class AQSPerformanceTest {
    private static final int THREAD_COUNT = 10;
    private static final int ITERATIONS = 100000;
    
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
        
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                try {
                    startLatch.await();
                    for (int j = 0; j < ITERATIONS; j++) {
                        lock.lock();
                        try {
                            // 临界区操作
                        } finally {
                            lock.unlock();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endLatch.countDown();
                }
            }).start();
        }
        
        long startTime = System.nanoTime();
        startLatch.countDown();
        endLatch.await();
        long endTime = System.nanoTime();
        
        System.out.printf("Total time: %.2f ms%n", 
                         (endTime - startTime) / 1_000_000.0);
        System.out.printf("Operations per second: %.0f%n", 
                         (THREAD_COUNT * ITERATIONS * 1_000_000_000.0) / (endTime - startTime));
    }
}

总结

AQS作为Java并发包的核心基础设施,其设计体现了以下重要思想:

  1. 分层设计:将同步状态管理和线程排队分离
  2. 模板方法:提供框架,子类实现具体策略
  3. 无锁编程:大量使用CAS避免重量级锁
  4. 性能优化:从队列结构到内存布局的全方位优化
  5. 可扩展性:支持独占、共享、条件等多种同步模式

理解AQS的原理和实现细节,对于深入掌握Java并发编程具有重要意义。通过学习AQS,我们不仅能更好地使用现有的同步工具,还能设计出高效的自定义同步器。

文章标签

JavaJUCAQS并发源码分析
ReentrantLock 源码解析及面试题
上一篇

ReentrantLock 源码解析及面试题

2025-11-19

JUC 中的锁
下一篇

JUC 中的锁

2025-11-19

冬眠

冬眠

博主

专注于技术、阅读与思考。在这里记录学习、思考与生活。

116
文章
3
分类
关注我
系列:Java 锁

第 2 篇,共 6 篇

上一篇

JUC 中的锁

下一篇

ReentrantLock 源码解析及面试题

文章目录

目录

  • 一、概述
  • 二、核心数据结构
  • 三、源码解析
  • 四、关键设计思想与优化
  • 五、应用实例
  • 六、常见面试问题与总结
  • 附录:源码调试与图示
  • 总结

相关文章

查看更多
CountDownLatch 源码分析及面试题

CountDownLatch 源码分析及面试题

2025-11-19 · 20 min read

JUC 中的锁

JUC 中的锁

2025-11-19 · 11 min read

独占锁和共享锁

独占锁和共享锁

2025-11-19 · 32 min read