概述和核心特性
CountDownLatch(倒计时门闩)是Java并发包(java.util.concurrent)中的一个同步工具类,它允许一个或多个线程等待其他线程完成操作。
主要特性
- 一次性使用:CountDownLatch是一次性的,计数器不能被重置
- 共享等待:多个线程可以同时等待同一个CountDownLatch
- 灵活的等待方式:支持无限等待、超时等待和可中断等待
- 基于AQS实现:底层使用AbstractQueuedSynchronizer的共享锁模式
- 线程安全:所有操作都是线程安全的
核心概念
// CountDownLatch的基本结构
public class CountDownLatch {
private final Sync sync;
// 构造函数:设置初始计数值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 等待计数归零
public void await() throws InterruptedException
// 带超时的等待
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
// 递减计数
public void countDown()
// 获取当前计数值
public long getCount()
}
基础概念和使用方式
构造函数
// 创建一个初始计数为5的CountDownLatch
CountDownLatch latch = new CountDownLatch(5);
- count参数:初始计数值,必须为非负数
- 一次性设置:构造后无法修改初始计数值
核心方法
1. await() - 等待计数归零
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
2. countDown() - 递减计数
public void countDown() {
sync.releaseShared(1);
}
3. getCount() - 获取当前计数
public long getCount() {
return sync.getCount();
}
基本使用示例
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 3;
CountDownLatch latch = new CountDownLatch(threadCount);
// 启动多个工作线程
for (int i = 0; i < threadCount; i++) {
new Thread(new Worker(latch, "Worker-" + i)).start();
}
System.out.println("主线程等待所有工作线程完成...");
latch.await(); // 等待所有线程完成
System.out.println("所有工作线程已完成,主线程继续执行");
}
static class Worker implements Runnable {
private final CountDownLatch latch;
private final String name;
Worker(CountDownLatch latch, String name) {
this.latch = latch;
this.name = name;
}
@Override
public void run() {
try {
System.out.println(name + " 开始工作");
Thread.sleep(2000); // 模拟工作
System.out.println(name + " 完成工作");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 完成工作后递减计数
}
}
}
}
AQS共享锁实现原理
CountDownLatch基于AbstractQueuedSynchronizer(AQS)的共享锁模式实现,这与ReentrantLock的独占锁模式不同。
AQS共享锁模式特点
- 多个线程可以同时获取锁:所有等待的线程在条件满足时会同时被唤醒
- 状态共享:多个线程共享同一个同步状态
- 传播性唤醒:一个线程的释放操作可能唤醒多个等待线程
CountDownLatch中的AQS实现
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 构造函数:设置初始状态
Sync(int count) {
setState(count);
}
// 获取当前计数
int getCount() {
return getState();
}
// 尝试获取共享锁(用于await)
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 尝试释放共享锁(用于countDown)
protected boolean tryReleaseShared(int releases) {
// 循环CAS递减计数
for (;;) {
int c = getState();
if (c == 0)
return false; // 已经为0,无需释放
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0; // 返回是否需要唤醒等待线程
}
}
}
状态变化流程
// 初始状态:state = count(如3)
// ┌─────────────────────────────────────┐
// │ state = 3 (初始计数) │
// └─────────────────────────────────────┘
// ↓
// 第一次countDown(): state = 2
// ┌─────────────────────────────────────┐
// │ state = 2 (还有2个任务未完成) │
// └─────────────────────────────────────┘
// ↓
// 第二次countDown(): state = 1
// ┌─────────────────────────────────────┐
// │ state = 1 (还有1个任务未完成) │
// └─────────────────────────────────────┘
// ↓
// 第三次countDown(): state = 0
// ┌─────────────────────────────────────┐
// │ state = 0 (所有任务完成,唤醒等待线程) │
// └─────────────────────────────────────┘
源码深度分析
构造函数分析
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// Sync构造函数
Sync(int count) {
setState(count); // 设置AQS的state为初始计数值
}
关键点:
- 参数校验:count必须非负
- 直接设置AQS的state为初始计数值
- 创建内部Sync对象
await()方法分析
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS中的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// CountDownLatch中的tryAcquireShared实现
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
执行流程:
- 检查线程中断状态
- 调用tryAcquireShared检查state是否为0
- 如果state为0,直接返回(不需要等待)
- 如果state不为0,进入doAcquireSharedInterruptibly等待
countDown()方法分析
public void countDown() {
sync.releaseShared(1);
}
// AQS中的releaseShared方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared(); // 唤醒等待线程
return true;
}
return false;
}
// CountDownLatch中的tryReleaseShared实现
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false; // 已经为0,无需释放
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0; // 只有当计数变为0时才返回true
}
}
执行流程:
- 使用CAS循环递减state
- 如果state已经为0,直接返回false
- 如果递减后state变为0,返回true并触发doReleaseShared
- doReleaseShared会唤醒所有等待的线程
带超时的await()方法
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// AQS中的tryAcquireSharedNanos方法
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
特点:
- 支持超时等待
- 超时后返回false,正常完成返回true
- 同样支持中断响应
实际应用场景
1. 主线程等待多个子线程完成
public class ParallelTaskExample {
public static void main(String[] args) throws InterruptedException {
int taskCount = 5;
CountDownLatch latch = new CountDownLatch(taskCount);
List<Future<String>> futures = new ArrayList<>();
ExecutorService executor = Executors.newFixedThreadPool(taskCount);
// 提交多个任务
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
Future<String> future = executor.submit(() -> {
try {
// 模拟任务执行
Thread.sleep(1000 + taskId * 500);
return "Task " + taskId + " completed";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Task " + taskId + " interrupted";
} finally {
latch.countDown(); // 任务完成后递减计数
}
});
futures.add(future);
}
// 等待所有任务完成
System.out.println("等待所有任务完成...");
latch.await();
System.out.println("所有任务已完成!");
// 收集结果
for (Future<String> future : futures) {
try {
System.out.println(future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
}
}
2. 服务启动协调
public class ServiceStartupCoordinator {
private final CountDownLatch startupLatch;
private final List<Service> services;
public ServiceStartupCoordinator(List<Service> services) {
this.services = services;
this.startupLatch = new CountDownLatch(services.size());
}
public void startAllServices() throws InterruptedException {
// 启动所有服务
for (Service service : services) {
new Thread(() -> {
try {
service.start();
System.out.println(service.getName() + " 启动完成");
} catch (Exception e) {
System.err.println(service.getName() + " 启动失败: " + e.getMessage());
} finally {
startupLatch.countDown();
}
}).start();
}
// 等待所有服务启动完成
System.out.println("等待所有服务启动...");
startupLatch.await();
System.out.println("所有服务启动完成,系统就绪!");
}
interface Service {
void start() throws Exception;
String getName();
}
static class DatabaseService implements Service {
@Override
public void start() throws Exception {
Thread.sleep(2000); // 模拟数据库连接初始化
}
@Override
public String getName() {
return "Database Service";
}
}
static class CacheService implements Service {
@Override
public void start() throws Exception {
Thread.sleep(1000); // 模拟缓存初始化
}
@Override
public String getName() {
return "Cache Service";
}
}
}
3. 并发测试场景
public class ConcurrentTester {
public void testConcurrentAccess() throws InterruptedException {
int threadCount = 100;
CountDownLatch startLatch = new CountDownLatch(1); // 控制所有线程同时开始
CountDownLatch endLatch = new CountDownLatch(threadCount); // 等待所有线程完成
AtomicInteger counter = new AtomicInteger(0);
List<Thread> threads = new ArrayList<>();
// 创建测试线程
for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(() -> {
try {
startLatch.await(); // 等待开始信号
// 执行并发操作
for (int j = 0; j < 1000; j++) {
counter.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
});
threads.add(thread);
thread.start();
}
System.out.println("所有线程已准备就绪,开始并发测试...");
long startTime = System.currentTimeMillis();
startLatch.countDown(); // 发出开始信号
endLatch.await(); // 等待所有线程完成
long endTime = System.currentTimeMillis();
System.out.println("并发测试完成!");
System.out.println("执行时间: " + (endTime - startTime) + "ms");
System.out.println("最终计数: " + counter.get());
System.out.println("期望计数: " + (threadCount * 1000));
}
}
与其他同步工具对比
CountDownLatch vs CyclicBarrier
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 可重用性 | 一次性使用 | 可重复使用 |
| 等待方式 | 一个或多个线程等待其他线程 | 所有线程相互等待 |
| 计数方向 | 递减(countDown) | 递增(await) |
| 触发条件 | 计数归零 | 所有线程到达屏障点 |
| 回调支持 | 不支持 | 支持barrierAction |
| 异常处理 | 不影响其他线程 | 一个线程异常会影响所有线程 |
// CountDownLatch示例:主线程等待工作线程
CountDownLatch latch = new CountDownLatch(3);
// 工作线程完成后调用latch.countDown()
// 主线程调用latch.await()等待
// CyclicBarrier示例:所有线程相互等待
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程都到达了屏障点!");
});
// 每个线程都调用barrier.await()等待其他线程
CountDownLatch vs Semaphore
| 特性 | CountDownLatch | Semaphore |
|---|---|---|
| 用途 | 等待事件完成 | 控制资源访问 |
| 计数含义 | 剩余任务数 | 可用许可数 |
| 操作方式 | 只能递减 | 可增可减 |
| 阻塞条件 | 计数不为0时阻塞 | 无可用许可时阻塞 |
| 典型场景 | 任务协调 | 资源池管理 |
// CountDownLatch:等待任务完成
CountDownLatch latch = new CountDownLatch(5);
latch.await(); // 等待5个任务完成
latch.countDown(); // 完成一个任务
// Semaphore:控制资源访问
Semaphore semaphore = new Semaphore(3);
semaphore.acquire(); // 获取一个许可
semaphore.release(); // 释放一个许可
性能分析和优化
性能特点
- 轻量级同步:基于AQS实现,性能优秀
- 无锁竞争:countDown操作使用CAS,避免重量级锁
- 批量唤醒:所有等待线程同时被唤醒,效率高
- 内存开销小:只维护一个int状态和AQS队列
性能测试
public class CountDownLatchPerformanceTest {
public static void main(String[] args) throws InterruptedException {
testPerformance(10, 1000); // 10个线程,1000次操作
testPerformance(100, 1000); // 100个线程,1000次操作
testPerformance(1000, 100); // 1000个线程,100次操作
}
private static void testPerformance(int threadCount, int operationsPerThread)
throws InterruptedException {
System.out.println(String.format("\n=== 性能测试:%d个线程,每个线程%d次操作 ===",
threadCount, operationsPerThread));
long startTime = System.currentTimeMillis();
for (int round = 0; round < 10; round++) {
CountDownLatch latch = new CountDownLatch(threadCount);
// 启动工作线程
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
// 模拟工作
for (int j = 0; j < operationsPerThread; j++) {
// 简单的计算操作
Math.sqrt(j);
}
} finally {
latch.countDown();
}
}).start();
}
latch.await(); // 等待所有线程完成
}
long endTime = System.currentTimeMillis();
long totalOperations = threadCount * operationsPerThread * 10;
double avgTime = (endTime - startTime) / 10.0;
System.out.println("平均执行时间: " + avgTime + "ms");
System.out.println("总操作数: " + totalOperations);
System.out.println("吞吐量: " + (totalOperations / (endTime - startTime) * 1000) + " ops/sec");
}
}
常见面试题
1. CountDownLatch的工作原理是什么?
答案要点:
- 基于AQS的共享锁模式实现
- 使用state字段存储计数值
- await()方法检查state是否为0,不为0则阻塞等待
- countDown()方法使用CAS递减state,为0时唤醒所有等待线程
- 支持多个线程同时等待,一次性唤醒所有等待线程
2. CountDownLatch与CyclicBarrier有什么区别?
答案要点:
- 可重用性:CountDownLatch一次性,CyclicBarrier可重用
- 等待模式:CountDownLatch是一个或多个线程等待其他线程,CyclicBarrier是所有线程相互等待
- 计数方向:CountDownLatch递减,CyclicBarrier递增
- 异常处理:CyclicBarrier中一个线程异常会影响所有线程
- 回调支持:CyclicBarrier支持barrierAction回调
3. CountDownLatch的使用场景有哪些?
答案要点:
- 主线程等待子线程:主线程等待多个工作线程完成任务
- 服务启动协调:等待多个服务组件初始化完成
- 并发测试:控制多个线程同时开始执行
- 分阶段执行:确保前一阶段完成后再执行下一阶段
- 资源预加载:等待多个资源加载完成
4. 如何避免CountDownLatch的常见陷阱?
答案要点:
- 忘记countDown:使用try-finally确保countDown被调用
- 计数不匹配:确保countDown调用次数等于初始计数
- 无限等待:使用带超时的await方法
- 重复使用:CountDownLatch是一次性的,不能重置
- 异常处理:在finally块中调用countDown
5. CountDownLatch的性能特点是什么?
答案要点:
- 轻量级:基于AQS实现,性能优秀
- 无锁操作:countDown使用CAS,避免重量级锁
- 批量唤醒:所有等待线程同时被唤醒
- 内存效率:只维护一个int状态和队列结构
- 扩展性好:支持大量线程同时等待
最佳实践
1. 正确的使用模式
public class BestPracticeExample {
// ✅ 推荐的使用模式
public void correctUsage() throws InterruptedException {
int taskCount = 5;
CountDownLatch latch = new CountDownLatch(taskCount);
ExecutorService executor = Executors.newFixedThreadPool(taskCount);
try {
// 提交任务
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
executor.submit(() -> {
try {
// 执行任务逻辑
performTask(taskId);
} catch (Exception e) {
System.err.println("Task " + taskId + " failed: " + e.getMessage());
} finally {
latch.countDown(); // 确保总是调用
}
});
}
// 等待完成(带超时)
if (!latch.await(30, TimeUnit.SECONDS)) {
System.err.println("Tasks did not complete within timeout");
}
} finally {
executor.shutdown();
}
}
private void performTask(int taskId) throws InterruptedException {
// 模拟任务执行
Thread.sleep(1000);
System.out.println("Task " + taskId + " completed");
}
}
2. 异常处理最佳实践
public class ExceptionHandlingExample {
public void robustTaskExecution() throws InterruptedException {
int taskCount = 3;
CountDownLatch latch = new CountDownLatch(taskCount);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failureCount = new AtomicInteger(0);
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
// 可能抛出异常的任务
if (taskId == 1) {
throw new RuntimeException("Simulated failure");
}
Thread.sleep(1000);
successCount.incrementAndGet();
System.out.println("Task " + taskId + " succeeded");
} catch (Exception e) {
failureCount.incrementAndGet();
System.err.println("Task " + taskId + " failed: " + e.getMessage());
} finally {
latch.countDown(); // 无论成功失败都要countDown
}
}).start();
}
latch.await();
System.out.println("Execution completed:");
System.out.println("Success: " + successCount.get());
System.out.println("Failure: " + failureCount.get());
}
}
常见陷阱和解决方案
1. 忘记调用countDown()
// ❌ 问题代码
public class ProblematicCode {
public void riskyTask(CountDownLatch latch) {
try {
// 执行任务
if (someErrorCondition()) {
return; // 直接返回,忘记countDown
}
// 正常逻辑
} catch (Exception e) {
// 异常处理,但忘记countDown
handleException(e);
return;
}
latch.countDown(); // 只有在正常情况下才会执行
}
}
// ✅ 解决方案
public class SafeCode {
public void safeTask(CountDownLatch latch) {
try {
// 执行任务
if (someErrorCondition()) {
return;
}
// 正常逻辑
} catch (Exception e) {
handleException(e);
} finally {
latch.countDown(); // 确保总是被调用
}
}
}
2. 计数不匹配
// ❌ 问题:计数与实际任务数不匹配
public class CountMismatchProblem {
public void incorrectCount() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5); // 设置了5
// 但只启动了3个任务
for (int i = 0; i < 3; i++) {
new Thread(() -> {
// 执行任务
latch.countDown();
}).start();
}
latch.await(); // 永远等待,因为计数永远不会归零
}
}
// ✅ 解决方案:动态计算或验证
public class CountMatchSolution {
public void correctCount() throws InterruptedException {
List<Runnable> tasks = prepareTasks();
int actualTaskCount = tasks.size();
CountDownLatch latch = new CountDownLatch(actualTaskCount);
for (Runnable task : tasks) {
new Thread(() -> {
try {
task.run();
} finally {
latch.countDown();
}
}).start();
}
latch.await();
}
private List<Runnable> prepareTasks() {
// 准备任务列表
return Arrays.asList(
() -> System.out.println("Task 1"),
() -> System.out.println("Task 2"),
() -> System.out.println("Task 3")
);
}
}
总结
CountDownLatch是Java并发编程中的重要同步工具,它基于AQS的共享锁模式实现,为多线程协调提供了简洁而强大的解决方案。
核心特性总结
- 一次性同步工具:计数器不可重置,适合一次性的任务协调
- 共享等待机制:多个线程可以同时等待,所有等待线程会同时被唤醒
- 灵活的等待方式:支持无限等待、超时等待和中断响应
- 高性能实现:基于AQS和CAS操作,避免重量级锁
- 线程安全:所有操作都是线程安全的
技术实现要点
- AQS共享锁模式:与ReentrantLock的独占锁不同,支持多个线程同时获取
- 状态管理:使用state字段存储计数值,通过CAS操作保证原子性
- 等待机制:await()检查state是否为0,不为0则进入等待队列
- 唤醒机制:countDown()递减state,为0时唤醒所有等待线程
适用场景
- 主线程等待子线程:等待多个工作线程完成任务
- 服务启动协调:等待多个组件初始化完成
- 并发测试:控制多个线程同时开始执行
- 分阶段执行:确保前置任务完成后再执行后续任务
- 批量处理:等待批量任务全部完成
最佳实践建议
- 正确使用模式:始终在finally块中调用countDown()
- 合理设置计数:确保计数值与实际任务数匹配
- 使用超时等待:避免无限等待导致的死锁
- 异常处理:无论任务成功失败都要countDown()
- 资源管理:及时释放引用,避免内存泄漏
- 监控调试:添加适当的日志和监控
常见陷阱避免
- 忘记countDown():使用try-finally确保调用
- 计数不匹配:动态计算任务数量
- 重复使用:每次使用创建新的CountDownLatch
- 无限等待:使用超时机制和异常处理
- 内存泄漏:及时清理引用和设置清理机制
CountDownLatch虽然概念简单,但在实际使用中需要注意很多细节。掌握其原理和最佳实践,能够帮助我们写出更安全、高效的并发代码,也是Java并发编程面试中的重要考点。
文章标签
冬眠
博主专注于技术、阅读与思考。在这里记录学习、思考与生活。
系列:Java 锁
第 5 篇,共 6 篇
