概述
Semaphore是什么?
Semaphore(信号量)是Java并发包中的一个同步工具,用于控制同时访问特定资源的线程数量。它维护了一组许可证(permits),线程在访问资源前必须先获取许可证,使用完毕后释放许可证。
核心特性
- 许可证管理:维护固定数量的许可证,控制并发访问
- 资源池控制:限制同时使用资源的线程数量
- 公平性支持:支持公平和非公平两种模式
- 批量操作:支持一次获取/释放多个许可证
- 中断支持:支持可中断的许可证获取
- 超时机制:支持带超时的许可证获取
基本使用示例
public class SemaphoreExample {
// 创建一个允许3个线程同时访问的信号量
private static final Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
// 获取许可证
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 获得许可证");
// 模拟业务处理
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " 释放许可证");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 释放许可证
semaphore.release();
}
}, "Thread-" + i).start();
}
}
}
基础概念和使用方式
构造函数
// 创建具有指定许可数的非公平信号量
public Semaphore(int permits)
// 创建具有指定许可数和公平性设置的信号量
public Semaphore(int permits, boolean fair)
示例:
// 非公平模式(默认)
Semaphore semaphore1 = new Semaphore(5);
// 公平模式
Semaphore semaphore2 = new Semaphore(5, true);
核心方法
1. 获取许可证
// 阻塞获取一个许可证
public void acquire() throws InterruptedException
// 阻塞获取指定数量的许可证
public void acquire(int permits) throws InterruptedException
// 不可中断地获取一个许可证
public void acquireUninterruptibly()
// 尝试获取一个许可证(非阻塞)
public boolean tryAcquire()
// 尝试在指定时间内获取许可证
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
2. 释放许可证
// 释放一个许可证
public void release()
// 释放指定数量的许可证
public void release(int permits)
3. 查询方法
// 获取可用许可证数量
public int availablePermits()
// 获取等待许可证的线程数(估计值)
public int getQueueLength()
// 是否有线程在等待许可证
public boolean hasQueuedThreads()
公平性模式
非公平模式(默认)
- 性能更好
- 可能导致线程饥饿
- 新来的线程可能插队获取许可证
公平模式
- 严格按照FIFO顺序分配许可证
- 避免线程饥饿
- 性能相对较差
public class FairnessComparison {
public static void main(String[] args) throws InterruptedException {
// 非公平模式测试
testSemaphore(new Semaphore(2, false), "非公平模式");
Thread.sleep(3000);
// 公平模式测试
testSemaphore(new Semaphore(2, true), "公平模式");
}
private static void testSemaphore(Semaphore semaphore, String mode) {
System.out.println("\n=== " + mode + " ===");
for (int i = 0; i < 5; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread-" + threadId + " 尝试获取许可证");
semaphore.acquire();
System.out.println("Thread-" + threadId + " 获得许可证");
Thread.sleep(1000);
System.out.println("Thread-" + threadId + " 释放许可证");
semaphore.release();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 让线程按顺序启动
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
AQS实现原理
AQS共享锁模式
Semaphore基于AbstractQueuedSynchronizer(AQS)的共享锁模式实现:
- state字段:表示可用许可证数量
- 共享模式:多个线程可以同时获取许可证
- 同步队列:管理等待许可证的线程
核心实现机制
1. 状态管理
// AQS中的state字段表示可用许可证数量
private volatile int state;
// 获取许可证时递减state
protected int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 释放许可证时递增state
protected boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
2. 公平性实现
非公平模式(NonfairSync):
static final class NonfairSync extends Sync {
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
公平模式(FairSync):
static final class FairSync extends Sync {
protected int tryAcquireShared(int acquires) {
for (;;) {
// 检查是否有前驱节点在等待
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
同步队列管理
// AQS同步队列节点结构
static final class Node {
static final int SHARED = 1; // 共享模式标记
volatile Thread thread; // 等待的线程
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
volatile int waitStatus; // 等待状态
}
// 入队操作
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
源码深度分析
Semaphore类结构
public class Semaphore implements java.io.Serializable {
private final Sync sync;
// 内部同步器抽象类
abstract static class Sync extends AbstractQueuedSynchronizer {
// 公共方法实现
}
// 非公平同步器
static final class NonfairSync extends Sync {
// 非公平获取实现
}
// 公平同步器
static final class FairSync extends Sync {
// 公平获取实现
}
}
核心方法源码分析
1. acquire()方法
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS中的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 可中断的共享获取
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2. release()方法
public void release() {
sync.releaseShared(1);
}
// AQS中的releaseShared方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 释放共享锁并唤醒后继节点
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
3. tryAcquire()方法
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// AQS中的tryAcquireSharedNanos方法
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
批量操作实现
// 批量获取许可证
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
// 批量释放许可证
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
// 批量尝试获取
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
实际应用场景
1. 数据库连接池管理
public class DatabaseConnectionPool {
private final Semaphore semaphore;
private final List<Connection> connections;
public DatabaseConnectionPool(int maxConnections) {
this.semaphore = new Semaphore(maxConnections);
this.connections = new ArrayList<>(maxConnections);
// 初始化连接池
for (int i = 0; i < maxConnections; i++) {
connections.add(createConnection());
}
}
public Connection getConnection() throws InterruptedException {
// 获取许可证
semaphore.acquire();
synchronized (connections) {
if (!connections.isEmpty()) {
return connections.remove(connections.size() - 1);
}
}
// 如果没有可用连接,创建新连接
return createConnection();
}
public void returnConnection(Connection connection) {
synchronized (connections) {
connections.add(connection);
}
// 释放许可证
semaphore.release();
}
private Connection createConnection() {
// 创建数据库连接的逻辑
return null; // 示例代码
}
}
2. HTTP请求限流
public class RateLimiter {
private final Semaphore semaphore;
private final ScheduledExecutorService scheduler;
public RateLimiter(int maxRequests, long refillPeriod, TimeUnit unit) {
this.semaphore = new Semaphore(maxRequests);
this.scheduler = Executors.newScheduledThreadPool(1);
// 定期补充许可证
scheduler.scheduleAtFixedRate(() -> {
int permits = maxRequests - semaphore.availablePermits();
if (permits > 0) {
semaphore.release(permits);
}
}, refillPeriod, refillPeriod, unit);
}
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return semaphore.tryAcquire(timeout, unit);
}
public void shutdown() {
scheduler.shutdown();
}
}
3. 停车场管理系统
public class ParkingLot {
private final Semaphore parkingSpaces;
private final AtomicInteger occupiedSpaces;
public ParkingLot(int totalSpaces) {
this.parkingSpaces = new Semaphore(totalSpaces, true); // 使用公平模式
this.occupiedSpaces = new AtomicInteger(0);
}
public boolean parkCar(String carId) {
try {
// 尝试获取停车位(等待最多30秒)
if (parkingSpaces.tryAcquire(30, TimeUnit.SECONDS)) {
int occupied = occupiedSpaces.incrementAndGet();
System.out.println(carId + " 停车成功,当前占用: " + occupied);
return true;
} else {
System.out.println(carId + " 停车失败,停车场已满");
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
public void leaveCar(String carId) {
int occupied = occupiedSpaces.decrementAndGet();
parkingSpaces.release();
System.out.println(carId + " 离开停车场,当前占用: " + occupied);
}
public int getAvailableSpaces() {
return parkingSpaces.availablePermits();
}
public int getOccupiedSpaces() {
return occupiedSpaces.get();
}
}
与其他同步工具对比
对比表格
| 特性 | Semaphore | CountDownLatch | CyclicBarrier | ReentrantLock |
|---|---|---|---|---|
| 用途 | 资源访问控制 | 等待多个任务完成 | 多线程同步点 | 互斥锁 |
| 许可证数量 | 可配置多个 | 固定倒计时 | 固定参与者数量 | 1个(独占) |
| 可重用性 | 可重用 | 一次性 | 可重用 | 可重用 |
| 公平性 | 支持 | 不适用 | 支持 | 支持 |
| 中断支持 | 支持 | 支持 | 支持 | 支持 |
| 超时机制 | 支持 | 支持 | 支持 | 支持 |
| 批量操作 | 支持 | 不支持 | 不支持 | 不支持 |
| 锁模式 | 共享锁 | 共享锁 | 共享锁 | 独占锁 |
选择指南
使用Semaphore的场景:
- 需要控制同时访问资源的线程数量
- 实现资源池管理
- 需要限流控制
- 需要批量获取/释放许可证
使用CountDownLatch的场景:
- 主线程等待多个子线程完成
- 服务启动时等待多个组件初始化
- 一次性同步场景
使用CyclicBarrier的场景:
- 多线程分阶段执行
- 需要重复使用的同步点
- 所有线程必须同时到达某个点
使用ReentrantLock的场景:
- 需要独占访问
- 需要可重入锁
- 需要条件变量(Condition)
性能分析和测试
公平 vs 非公平性能对比
public class SemaphorePerformanceTest {
private static final int THREAD_COUNT = 100;
private static final int ITERATIONS = 10000;
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Semaphore性能测试 ===");
// 测试非公平模式
testSemaphore(new Semaphore(10, false), "非公平模式");
// 测试公平模式
testSemaphore(new Semaphore(10, true), "公平模式");
// 测试不同许可证数量的影响
testDifferentPermits();
}
private static void testSemaphore(Semaphore semaphore, String mode)
throws InterruptedException {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
long startTime = System.nanoTime();
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
try {
startLatch.await();
for (int j = 0; j < ITERATIONS; j++) {
semaphore.acquire();
// 模拟短暂工作
Thread.yield();
semaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
}).start();
}
startLatch.countDown(); // 开始测试
endLatch.await(); // 等待所有线程完成
long endTime = System.nanoTime();
long duration = (endTime - startTime) / 1_000_000; // 转换为毫秒
System.out.printf("%s: %d线程 x %d次操作 = 总耗时 %d ms%n",
mode, THREAD_COUNT, ITERATIONS, duration);
}
private static void testDifferentPermits() throws InterruptedException {
System.out.println("\n=== 不同许可证数量性能测试 ===");
int[] permits = {1, 5, 10, 20, 50};
for (int permit : permits) {
Semaphore semaphore = new Semaphore(permit, false);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(50);
long startTime = System.nanoTime();
for (int i = 0; i < 50; i++) {
new Thread(() -> {
try {
startLatch.await();
for (int j = 0; j < 1000; j++) {
semaphore.acquire();
Thread.sleep(1); // 模拟工作
semaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
}).start();
}
startLatch.countDown();
endLatch.await();
long endTime = System.nanoTime();
long duration = (endTime - startTime) / 1_000_000;
System.out.printf("许可证数量: %2d, 耗时: %d ms%n", permit, duration);
}
}
}
常见面试题
1. Semaphore的工作原理是什么?
答案要点:
- 基于AQS共享锁模式实现
- 使用state字段表示可用许可证数量
- acquire()时递减state,release()时递增state
- 支持公平和非公平两种模式
- 通过同步队列管理等待线程
2. Semaphore与CountDownLatch有什么区别?
答案要点:
- 用途不同:Semaphore控制资源访问,CountDownLatch等待任务完成
- 可重用性:Semaphore可重用,CountDownLatch一次性
- 操作方向:Semaphore可增可减,CountDownLatch只能递减
- 许可证管理:Semaphore支持批量操作,CountDownLatch不支持
3. 如何选择公平和非公平模式?
答案要点:
- 非公平模式:性能更好,适合高并发场景,可能导致饥饿
- 公平模式:严格FIFO,避免饥饿,性能相对较差
- 选择依据:根据业务需求权衡性能和公平性
4. Semaphore如何避免许可证泄漏?
答案要点:
public void safeOperation() {
try {
semaphore.acquire();
// 业务逻辑
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} finally {
semaphore.release(); // 确保释放
}
}
5. Semaphore的性能特点是什么?
答案要点:
- 非公平模式性能优于公平模式
- 许可证数量影响并发度和性能
- 批量操作比单个操作效率更高
- 内存使用相对较少
6. 如何实现一个简单的限流器?
答案示例:
public class SimpleRateLimiter {
private final Semaphore semaphore;
private final ScheduledExecutorService scheduler;
public SimpleRateLimiter(int maxRequests, long period, TimeUnit unit) {
this.semaphore = new Semaphore(maxRequests);
this.scheduler = Executors.newScheduledThreadPool(1);
// 定期补充许可证
scheduler.scheduleAtFixedRate(() -> {
int toRelease = maxRequests - semaphore.availablePermits();
if (toRelease > 0) {
semaphore.release(toRelease);
}
}, period, period, unit);
}
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
}
7. Semaphore在什么情况下会发生死锁?
答案要点:
- 循环等待许可证
- 获取许可证后忘记释放
- 多个Semaphore之间的循环依赖
8. 如何监控Semaphore的使用情况?
答案示例:
public class SemaphoreMonitor {
private final Semaphore semaphore;
private final AtomicLong acquireCount = new AtomicLong();
private final AtomicLong releaseCount = new AtomicLong();
public SemaphoreMonitor(int permits) {
this.semaphore = new Semaphore(permits);
}
public void acquire() throws InterruptedException {
semaphore.acquire();
acquireCount.incrementAndGet();
}
public void release() {
semaphore.release();
releaseCount.incrementAndGet();
}
public void printStats() {
System.out.println("可用许可证: " + semaphore.availablePermits());
System.out.println("等待线程数: " + semaphore.getQueueLength());
System.out.println("获取次数: " + acquireCount.get());
System.out.println("释放次数: " + releaseCount.get());
}
}
最佳实践和常见陷阱
最佳实践
1. 使用try-finally确保许可证释放
// ✅ 正确做法
public void correctUsage() {
try {
semaphore.acquire();
// 业务逻辑
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} finally {
semaphore.release(); // 确保释放
}
}
// ❌ 错误做法
public void incorrectUsage() throws InterruptedException {
semaphore.acquire();
// 业务逻辑
semaphore.release(); // 异常时可能不会执行
}
2. 合理选择公平性模式
// 高性能场景使用非公平模式
Semaphore highPerformance = new Semaphore(10, false);
// 需要公平性的场景使用公平模式
Semaphore fairAccess = new Semaphore(10, true);
3. 批量操作优化
// ✅ 批量获取
public void batchOperation(int count) {
try {
semaphore.acquire(count);
// 批量处理
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(count);
}
}
常见陷阱
1. 许可证泄漏
// ❌ 许可证泄漏示例
public class PermitLeak {
private final Semaphore semaphore = new Semaphore(5);
public void leakyMethod() throws InterruptedException {
semaphore.acquire();
if (someCondition()) {
return; // 忘记释放许可证!
}
// 正常逻辑
semaphore.release();
}
// ✅ 修复版本
public void fixedMethod() {
try {
semaphore.acquire();
if (someCondition()) {
return;
}
// 正常逻辑
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 确保释放
}
}
private boolean someCondition() {
return Math.random() > 0.5;
}
}
2. 许可证数量不匹配
// ❌ 获取和释放数量不匹配
public void mismatchedOperation() {
try {
semaphore.acquire(3); // 获取3个许可证
// 业务逻辑
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(2); // 只释放2个许可证!
}
}
// ✅ 正确做法
public void correctOperation() {
int permits = 3;
try {
semaphore.acquire(permits);
// 业务逻辑
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(permits); // 释放相同数量
}
}
代码示例和实战案例
完整的资源池管理系统
public class ResourcePoolManager<T> {
private final Semaphore semaphore;
private final Queue<T> resources;
private final Object lock = new Object();
private final ResourceFactory<T> factory;
private final int maxSize;
private volatile boolean shutdown = false;
public interface ResourceFactory<T> {
T create();
void destroy(T resource);
boolean validate(T resource);
}
public ResourcePoolManager(int maxSize, ResourceFactory<T> factory) {
this.maxSize = maxSize;
this.factory = factory;
this.semaphore = new Semaphore(maxSize, true); // 使用公平模式
this.resources = new LinkedList<>();
// 预创建资源
for (int i = 0; i < maxSize; i++) {
resources.offer(factory.create());
}
}
public T acquire() throws InterruptedException {
if (shutdown) {
throw new IllegalStateException("Resource pool is shutdown");
}
semaphore.acquire();
synchronized (lock) {
T resource = resources.poll();
if (resource != null && factory.validate(resource)) {
return resource;
}
// 创建新资源或重新验证
return factory.create();
}
}
public void release(T resource) {
if (resource == null) {
return;
}
synchronized (lock) {
if (factory.validate(resource) && resources.size() < maxSize) {
resources.offer(resource);
} else {
factory.destroy(resource);
}
}
semaphore.release();
}
public void shutdown() {
shutdown = true;
synchronized (lock) {
while (!resources.isEmpty()) {
factory.destroy(resources.poll());
}
}
}
public int getAvailableResources() {
return semaphore.availablePermits();
}
public int getWaitingThreads() {
return semaphore.getQueueLength();
}
}
总结和学习建议
核心知识点总结
-
基本概念
- Semaphore是基于许可证的同步工具
- 控制同时访问资源的线程数量
- 支持公平和非公平两种模式
-
实现原理
- 基于AQS共享锁模式
- state字段表示可用许可证数量
- 支持批量获取和释放操作
-
应用场景
- 资源池管理(连接池、线程池等)
- 限流控制
- 并发访问控制
-
性能特点
- 非公平模式性能更好
- 公平模式避免饥饿
- 批量操作效率更高
学习建议
-
理论学习
- 深入理解AQS共享锁模式
- 掌握公平性和非公平性的区别
- 了解与其他同步工具的对比
-
实践练习
- 实现简单的资源池管理
- 编写限流器
- 测试不同模式的性能差异
-
源码阅读
- 阅读Semaphore源码
- 理解AQS的实现机制
- 分析公平和非公平的实现差异
-
面试准备
- 掌握常见面试题的答案要点
- 准备实际应用场景的案例
- 理解最佳实践和常见陷阱
进阶学习方向
-
并发工具深入
- 学习其他JUC工具类
- 理解并发编程模式
- 掌握性能优化技巧
-
实际项目应用
- 在项目中合理使用Semaphore
- 结合业务场景选择合适的同步工具
- 监控和调优并发性能
通过系统学习和实践,你将能够熟练掌握Semaphore的使用,并在实际项目中合理应用这个强大的同步工具。
文章标签
冬眠
博主专注于技术、阅读与思考。在这里记录学习、思考与生活。
系列:Java 锁
第 6 篇,共 6 篇
