冬眠的笔记
首页文章分类书单项目关于
冬眠
X

© 2026 冬眠的笔记 · 用文字记录思考,用思考改变生活

首页>文章>Java
JavaJUCCountDownLatch并发

CountDownLatch 源码分析及面试题

CountDownLatch 基于 AQS 共享模式的实现原理、典型用法和使用注意点

冬眠
冬眠
专注于技术、阅读与思考
2025-11-19
发布日期
20 min read
阅读时长
浏览量
CountDownLatch 源码分析及面试题

概述和核心特性

CountDownLatch(倒计时门闩)是Java并发包(java.util.concurrent)中的一个同步工具类,它允许一个或多个线程等待其他线程完成操作。

主要特性

  1. 一次性使用:CountDownLatch是一次性的,计数器不能被重置
  2. 共享等待:多个线程可以同时等待同一个CountDownLatch
  3. 灵活的等待方式:支持无限等待、超时等待和可中断等待
  4. 基于AQS实现:底层使用AbstractQueuedSynchronizer的共享锁模式
  5. 线程安全:所有操作都是线程安全的

核心概念

// CountDownLatch的基本结构
public class CountDownLatch {
    private final Sync sync;
    
    // 构造函数:设置初始计数值
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    
    // 等待计数归零
    public void await() throws InterruptedException
    
    // 带超时的等待
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException
    
    // 递减计数
    public void countDown()
    
    // 获取当前计数值
    public long getCount()
}

基础概念和使用方式

构造函数

// 创建一个初始计数为5的CountDownLatch
CountDownLatch latch = new CountDownLatch(5);
  • count参数:初始计数值,必须为非负数
  • 一次性设置:构造后无法修改初始计数值

核心方法

1. await() - 等待计数归零

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

2. countDown() - 递减计数

public void countDown() {
    sync.releaseShared(1);
}

3. getCount() - 获取当前计数

public long getCount() {
    return sync.getCount();
}

基本使用示例

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 3;
        CountDownLatch latch = new CountDownLatch(threadCount);
        
        // 启动多个工作线程
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Worker(latch, "Worker-" + i)).start();
        }
        
        System.out.println("主线程等待所有工作线程完成...");
        latch.await(); // 等待所有线程完成
        System.out.println("所有工作线程已完成,主线程继续执行");
    }
    
    static class Worker implements Runnable {
        private final CountDownLatch latch;
        private final String name;
        
        Worker(CountDownLatch latch, String name) {
            this.latch = latch;
            this.name = name;
        }
        
        @Override
        public void run() {
            try {
                System.out.println(name + " 开始工作");
                Thread.sleep(2000); // 模拟工作
                System.out.println(name + " 完成工作");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                latch.countDown(); // 完成工作后递减计数
            }
        }
    }
}

AQS共享锁实现原理

CountDownLatch基于AbstractQueuedSynchronizer(AQS)的共享锁模式实现,这与ReentrantLock的独占锁模式不同。

AQS共享锁模式特点

  1. 多个线程可以同时获取锁:所有等待的线程在条件满足时会同时被唤醒
  2. 状态共享:多个线程共享同一个同步状态
  3. 传播性唤醒:一个线程的释放操作可能唤醒多个等待线程

CountDownLatch中的AQS实现

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    
    // 构造函数:设置初始状态
    Sync(int count) {
        setState(count);
    }
    
    // 获取当前计数
    int getCount() {
        return getState();
    }
    
    // 尝试获取共享锁(用于await)
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    // 尝试释放共享锁(用于countDown)
    protected boolean tryReleaseShared(int releases) {
        // 循环CAS递减计数
        for (;;) {
            int c = getState();
            if (c == 0)
                return false; // 已经为0,无需释放
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0; // 返回是否需要唤醒等待线程
        }
    }
}

状态变化流程

// 初始状态:state = count(如3)
// ┌─────────────────────────────────────┐
// │ state = 3 (初始计数)                 │
// └─────────────────────────────────────┘
//                    ↓
// 第一次countDown(): state = 2
// ┌─────────────────────────────────────┐
// │ state = 2 (还有2个任务未完成)        │
// └─────────────────────────────────────┘
//                    ↓
// 第二次countDown(): state = 1
// ┌─────────────────────────────────────┐
// │ state = 1 (还有1个任务未完成)        │
// └─────────────────────────────────────┘
//                    ↓
// 第三次countDown(): state = 0
// ┌─────────────────────────────────────┐
// │ state = 0 (所有任务完成,唤醒等待线程) │
// └─────────────────────────────────────┘

源码深度分析

构造函数分析

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

// Sync构造函数
Sync(int count) {
    setState(count); // 设置AQS的state为初始计数值
}

关键点:

  • 参数校验:count必须非负
  • 直接设置AQS的state为初始计数值
  • 创建内部Sync对象

await()方法分析

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// AQS中的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// CountDownLatch中的tryAcquireShared实现
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

执行流程:

  1. 检查线程中断状态
  2. 调用tryAcquireShared检查state是否为0
  3. 如果state为0,直接返回(不需要等待)
  4. 如果state不为0,进入doAcquireSharedInterruptibly等待

countDown()方法分析

public void countDown() {
    sync.releaseShared(1);
}

// AQS中的releaseShared方法
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared(); // 唤醒等待线程
        return true;
    }
    return false;
}

// CountDownLatch中的tryReleaseShared实现
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false; // 已经为0,无需释放
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0; // 只有当计数变为0时才返回true
    }
}

执行流程:

  1. 使用CAS循环递减state
  2. 如果state已经为0,直接返回false
  3. 如果递减后state变为0,返回true并触发doReleaseShared
  4. doReleaseShared会唤醒所有等待的线程

带超时的await()方法

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// AQS中的tryAcquireSharedNanos方法
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
           doAcquireSharedNanos(arg, nanosTimeout);
}

特点:

  • 支持超时等待
  • 超时后返回false,正常完成返回true
  • 同样支持中断响应

实际应用场景

1. 主线程等待多个子线程完成

public class ParallelTaskExample {
    public static void main(String[] args) throws InterruptedException {
        int taskCount = 5;
        CountDownLatch latch = new CountDownLatch(taskCount);
        List<Future<String>> futures = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
        
        // 提交多个任务
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            Future<String> future = executor.submit(() -> {
                try {
                    // 模拟任务执行
                    Thread.sleep(1000 + taskId * 500);
                    return "Task " + taskId + " completed";
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return "Task " + taskId + " interrupted";
                } finally {
                    latch.countDown(); // 任务完成后递减计数
                }
            });
            futures.add(future);
        }
        
        // 等待所有任务完成
        System.out.println("等待所有任务完成...");
        latch.await();
        System.out.println("所有任务已完成!");
        
        // 收集结果
        for (Future<String> future : futures) {
            try {
                System.out.println(future.get());
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        
        executor.shutdown();
    }
}

2. 服务启动协调

public class ServiceStartupCoordinator {
    private final CountDownLatch startupLatch;
    private final List<Service> services;
    
    public ServiceStartupCoordinator(List<Service> services) {
        this.services = services;
        this.startupLatch = new CountDownLatch(services.size());
    }
    
    public void startAllServices() throws InterruptedException {
        // 启动所有服务
        for (Service service : services) {
            new Thread(() -> {
                try {
                    service.start();
                    System.out.println(service.getName() + " 启动完成");
                } catch (Exception e) {
                    System.err.println(service.getName() + " 启动失败: " + e.getMessage());
                } finally {
                    startupLatch.countDown();
                }
            }).start();
        }
        
        // 等待所有服务启动完成
        System.out.println("等待所有服务启动...");
        startupLatch.await();
        System.out.println("所有服务启动完成,系统就绪!");
    }
    
    interface Service {
        void start() throws Exception;
        String getName();
    }
    
    static class DatabaseService implements Service {
        @Override
        public void start() throws Exception {
            Thread.sleep(2000); // 模拟数据库连接初始化
        }
        
        @Override
        public String getName() {
            return "Database Service";
        }
    }
    
    static class CacheService implements Service {
        @Override
        public void start() throws Exception {
            Thread.sleep(1000); // 模拟缓存初始化
        }
        
        @Override
        public String getName() {
            return "Cache Service";
        }
    }
}

3. 并发测试场景

public class ConcurrentTester {
    public void testConcurrentAccess() throws InterruptedException {
        int threadCount = 100;
        CountDownLatch startLatch = new CountDownLatch(1); // 控制所有线程同时开始
        CountDownLatch endLatch = new CountDownLatch(threadCount); // 等待所有线程完成
        
        AtomicInteger counter = new AtomicInteger(0);
        List<Thread> threads = new ArrayList<>();
        
        // 创建测试线程
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(() -> {
                try {
                    startLatch.await(); // 等待开始信号
                    
                    // 执行并发操作
                    for (int j = 0; j < 1000; j++) {
                        counter.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endLatch.countDown();
                }
            });
            threads.add(thread);
            thread.start();
        }
        
        System.out.println("所有线程已准备就绪,开始并发测试...");
        long startTime = System.currentTimeMillis();
        
        startLatch.countDown(); // 发出开始信号
        endLatch.await(); // 等待所有线程完成
        
        long endTime = System.currentTimeMillis();
        System.out.println("并发测试完成!");
        System.out.println("执行时间: " + (endTime - startTime) + "ms");
        System.out.println("最终计数: " + counter.get());
        System.out.println("期望计数: " + (threadCount * 1000));
    }
}

与其他同步工具对比

CountDownLatch vs CyclicBarrier

特性 CountDownLatch CyclicBarrier
可重用性 一次性使用 可重复使用
等待方式 一个或多个线程等待其他线程 所有线程相互等待
计数方向 递减(countDown) 递增(await)
触发条件 计数归零 所有线程到达屏障点
回调支持 不支持 支持barrierAction
异常处理 不影响其他线程 一个线程异常会影响所有线程
// CountDownLatch示例:主线程等待工作线程
CountDownLatch latch = new CountDownLatch(3);
// 工作线程完成后调用latch.countDown()
// 主线程调用latch.await()等待

// CyclicBarrier示例:所有线程相互等待
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("所有线程都到达了屏障点!");
});
// 每个线程都调用barrier.await()等待其他线程

CountDownLatch vs Semaphore

特性 CountDownLatch Semaphore
用途 等待事件完成 控制资源访问
计数含义 剩余任务数 可用许可数
操作方式 只能递减 可增可减
阻塞条件 计数不为0时阻塞 无可用许可时阻塞
典型场景 任务协调 资源池管理
// CountDownLatch:等待任务完成
CountDownLatch latch = new CountDownLatch(5);
latch.await(); // 等待5个任务完成
latch.countDown(); // 完成一个任务

// Semaphore:控制资源访问
Semaphore semaphore = new Semaphore(3);
semaphore.acquire(); // 获取一个许可
semaphore.release(); // 释放一个许可

性能分析和优化

性能特点

  1. 轻量级同步:基于AQS实现,性能优秀
  2. 无锁竞争:countDown操作使用CAS,避免重量级锁
  3. 批量唤醒:所有等待线程同时被唤醒,效率高
  4. 内存开销小:只维护一个int状态和AQS队列

性能测试

public class CountDownLatchPerformanceTest {
    public static void main(String[] args) throws InterruptedException {
        testPerformance(10, 1000);   // 10个线程,1000次操作
        testPerformance(100, 1000);  // 100个线程,1000次操作
        testPerformance(1000, 100);  // 1000个线程,100次操作
    }
    
    private static void testPerformance(int threadCount, int operationsPerThread) 
            throws InterruptedException {
        System.out.println(String.format("\n=== 性能测试:%d个线程,每个线程%d次操作 ===", 
                                        threadCount, operationsPerThread));
        
        long startTime = System.currentTimeMillis();
        
        for (int round = 0; round < 10; round++) {
            CountDownLatch latch = new CountDownLatch(threadCount);
            
            // 启动工作线程
            for (int i = 0; i < threadCount; i++) {
                new Thread(() -> {
                    try {
                        // 模拟工作
                        for (int j = 0; j < operationsPerThread; j++) {
                            // 简单的计算操作
                            Math.sqrt(j);
                        }
                    } finally {
                        latch.countDown();
                    }
                }).start();
            }
            
            latch.await(); // 等待所有线程完成
        }
        
        long endTime = System.currentTimeMillis();
        long totalOperations = threadCount * operationsPerThread * 10;
        double avgTime = (endTime - startTime) / 10.0;
        
        System.out.println("平均执行时间: " + avgTime + "ms");
        System.out.println("总操作数: " + totalOperations);
        System.out.println("吞吐量: " + (totalOperations / (endTime - startTime) * 1000) + " ops/sec");
    }
}

常见面试题

1. CountDownLatch的工作原理是什么?

答案要点:

  • 基于AQS的共享锁模式实现
  • 使用state字段存储计数值
  • await()方法检查state是否为0,不为0则阻塞等待
  • countDown()方法使用CAS递减state,为0时唤醒所有等待线程
  • 支持多个线程同时等待,一次性唤醒所有等待线程

2. CountDownLatch与CyclicBarrier有什么区别?

答案要点:

  • 可重用性:CountDownLatch一次性,CyclicBarrier可重用
  • 等待模式:CountDownLatch是一个或多个线程等待其他线程,CyclicBarrier是所有线程相互等待
  • 计数方向:CountDownLatch递减,CyclicBarrier递增
  • 异常处理:CyclicBarrier中一个线程异常会影响所有线程
  • 回调支持:CyclicBarrier支持barrierAction回调

3. CountDownLatch的使用场景有哪些?

答案要点:

  1. 主线程等待子线程:主线程等待多个工作线程完成任务
  2. 服务启动协调:等待多个服务组件初始化完成
  3. 并发测试:控制多个线程同时开始执行
  4. 分阶段执行:确保前一阶段完成后再执行下一阶段
  5. 资源预加载:等待多个资源加载完成

4. 如何避免CountDownLatch的常见陷阱?

答案要点:

  • 忘记countDown:使用try-finally确保countDown被调用
  • 计数不匹配:确保countDown调用次数等于初始计数
  • 无限等待:使用带超时的await方法
  • 重复使用:CountDownLatch是一次性的,不能重置
  • 异常处理:在finally块中调用countDown

5. CountDownLatch的性能特点是什么?

答案要点:

  • 轻量级:基于AQS实现,性能优秀
  • 无锁操作:countDown使用CAS,避免重量级锁
  • 批量唤醒:所有等待线程同时被唤醒
  • 内存效率:只维护一个int状态和队列结构
  • 扩展性好:支持大量线程同时等待

最佳实践

1. 正确的使用模式

public class BestPracticeExample {
    
    // ✅ 推荐的使用模式
    public void correctUsage() throws InterruptedException {
        int taskCount = 5;
        CountDownLatch latch = new CountDownLatch(taskCount);
        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
        
        try {
            // 提交任务
            for (int i = 0; i < taskCount; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    try {
                        // 执行任务逻辑
                        performTask(taskId);
                    } catch (Exception e) {
                        System.err.println("Task " + taskId + " failed: " + e.getMessage());
                    } finally {
                        latch.countDown(); // 确保总是调用
                    }
                });
            }
            
            // 等待完成(带超时)
            if (!latch.await(30, TimeUnit.SECONDS)) {
                System.err.println("Tasks did not complete within timeout");
            }
            
        } finally {
            executor.shutdown();
        }
    }
    
    private void performTask(int taskId) throws InterruptedException {
        // 模拟任务执行
        Thread.sleep(1000);
        System.out.println("Task " + taskId + " completed");
    }
}

2. 异常处理最佳实践

public class ExceptionHandlingExample {
    
    public void robustTaskExecution() throws InterruptedException {
        int taskCount = 3;
        CountDownLatch latch = new CountDownLatch(taskCount);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger failureCount = new AtomicInteger(0);
        
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    // 可能抛出异常的任务
                    if (taskId == 1) {
                        throw new RuntimeException("Simulated failure");
                    }
                    
                    Thread.sleep(1000);
                    successCount.incrementAndGet();
                    System.out.println("Task " + taskId + " succeeded");
                    
                } catch (Exception e) {
                    failureCount.incrementAndGet();
                    System.err.println("Task " + taskId + " failed: " + e.getMessage());
                } finally {
                    latch.countDown(); // 无论成功失败都要countDown
                }
            }).start();
        }
        
        latch.await();
        
        System.out.println("Execution completed:");
        System.out.println("Success: " + successCount.get());
        System.out.println("Failure: " + failureCount.get());
    }
}

常见陷阱和解决方案

1. 忘记调用countDown()

// ❌ 问题代码
public class ProblematicCode {
    public void riskyTask(CountDownLatch latch) {
        try {
            // 执行任务
            if (someErrorCondition()) {
                return; // 直接返回,忘记countDown
            }
            // 正常逻辑
        } catch (Exception e) {
            // 异常处理,但忘记countDown
            handleException(e);
            return;
        }
        latch.countDown(); // 只有在正常情况下才会执行
    }
}

// ✅ 解决方案
public class SafeCode {
    public void safeTask(CountDownLatch latch) {
        try {
            // 执行任务
            if (someErrorCondition()) {
                return;
            }
            // 正常逻辑
        } catch (Exception e) {
            handleException(e);
        } finally {
            latch.countDown(); // 确保总是被调用
        }
    }
}

2. 计数不匹配

// ❌ 问题:计数与实际任务数不匹配
public class CountMismatchProblem {
    public void incorrectCount() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5); // 设置了5
        
        // 但只启动了3个任务
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                // 执行任务
                latch.countDown();
            }).start();
        }
        
        latch.await(); // 永远等待,因为计数永远不会归零
    }
}

// ✅ 解决方案:动态计算或验证
public class CountMatchSolution {
    public void correctCount() throws InterruptedException {
        List<Runnable> tasks = prepareTasks();
        int actualTaskCount = tasks.size();
        CountDownLatch latch = new CountDownLatch(actualTaskCount);
        
        for (Runnable task : tasks) {
            new Thread(() -> {
                try {
                    task.run();
                } finally {
                    latch.countDown();
                }
            }).start();
        }
        
        latch.await();
    }
    
    private List<Runnable> prepareTasks() {
        // 准备任务列表
        return Arrays.asList(
            () -> System.out.println("Task 1"),
            () -> System.out.println("Task 2"),
            () -> System.out.println("Task 3")
        );
    }
}

总结

CountDownLatch是Java并发编程中的重要同步工具,它基于AQS的共享锁模式实现,为多线程协调提供了简洁而强大的解决方案。

核心特性总结

  1. 一次性同步工具:计数器不可重置,适合一次性的任务协调
  2. 共享等待机制:多个线程可以同时等待,所有等待线程会同时被唤醒
  3. 灵活的等待方式:支持无限等待、超时等待和中断响应
  4. 高性能实现:基于AQS和CAS操作,避免重量级锁
  5. 线程安全:所有操作都是线程安全的

技术实现要点

  • AQS共享锁模式:与ReentrantLock的独占锁不同,支持多个线程同时获取
  • 状态管理:使用state字段存储计数值,通过CAS操作保证原子性
  • 等待机制:await()检查state是否为0,不为0则进入等待队列
  • 唤醒机制:countDown()递减state,为0时唤醒所有等待线程

适用场景

  1. 主线程等待子线程:等待多个工作线程完成任务
  2. 服务启动协调:等待多个组件初始化完成
  3. 并发测试:控制多个线程同时开始执行
  4. 分阶段执行:确保前置任务完成后再执行后续任务
  5. 批量处理:等待批量任务全部完成

最佳实践建议

  1. 正确使用模式:始终在finally块中调用countDown()
  2. 合理设置计数:确保计数值与实际任务数匹配
  3. 使用超时等待:避免无限等待导致的死锁
  4. 异常处理:无论任务成功失败都要countDown()
  5. 资源管理:及时释放引用,避免内存泄漏
  6. 监控调试:添加适当的日志和监控

常见陷阱避免

  • 忘记countDown():使用try-finally确保调用
  • 计数不匹配:动态计算任务数量
  • 重复使用:每次使用创建新的CountDownLatch
  • 无限等待:使用超时机制和异常处理
  • 内存泄漏:及时清理引用和设置清理机制

CountDownLatch虽然概念简单,但在实际使用中需要注意很多细节。掌握其原理和最佳实践,能够帮助我们写出更安全、高效的并发代码,也是Java并发编程面试中的重要考点。

文章标签

JavaJUCCountDownLatch并发源码分析
Semaphore 源码分析及面试题
上一篇

Semaphore 源码分析及面试题

2025-11-19

独占锁和共享锁
下一篇

独占锁和共享锁

2025-11-19

冬眠

冬眠

博主

专注于技术、阅读与思考。在这里记录学习、思考与生活。

116
文章
3
分类
关注我
系列:Java 锁

第 5 篇,共 6 篇

上一篇

独占锁和共享锁

下一篇

Semaphore 源码分析及面试题

文章目录

目录

  • 概述和核心特性
  • 基础概念和使用方式
  • AQS共享锁实现原理
  • 源码深度分析
  • 实际应用场景
  • 与其他同步工具对比
  • 性能分析和优化
  • 常见面试题
  • 最佳实践
  • 常见陷阱和解决方案
  • 总结

相关文章

查看更多
AbstractQueuedSynchronizer (AQS) 详解

AbstractQueuedSynchronizer (AQS) 详解

2025-11-19 · 23 min read

JUC 中的锁

JUC 中的锁

2025-11-19 · 11 min read

独占锁和共享锁

独占锁和共享锁

2025-11-19 · 32 min read