1. 算法概述
基于数量的滑动窗口限流算法是滑动窗口算法的一种特殊实现,它专注于控制固定时间窗口内的请求数量。与基于时间的滑动窗口不同,这种算法更注重精确的数量控制和高效的内存使用。
2. 核心特点
2.1 设计理念
- 精确计数:严格控制窗口内的请求数量
- 内存优化:使用固定大小的数据结构
- 高性能:每次判断只需遍历固定数量的桶,耗时与窗口内的请求数无关(相对请求数为O(1),相对桶数为O(桶数))
- 线程安全:支持高并发环境
2.2 与传统滑动窗口的区别
| 特性 | 传统滑动窗口 | 基于数量的滑动窗口 |
|---|---|---|
| 存储方式 | 时间戳队列 | 固定大小数组 |
| 内存使用 | 动态变化 | 固定大小 |
| 时间复杂度 | O(n),n为窗口内请求数 | O(b),b为固定的桶数量,与请求数无关 |
| 精确度 | 高 | 中等 |
| 适用场景 | 精确限流 | 高性能限流 |
3. 算法实现
3.1 基础实现
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Bucketed sliding-window rate limiter.
 *
 * <p>The window is divided into {@code bucketCount} time slices of {@code bucketSizeMs}
 * milliseconds each, and every slice owns one counter. A request is admitted while the sum of all
 * counters is below {@code maxRequests}; counters whose slice has rotated out of the window are
 * zeroed before each decision. Because expiry happens a whole bucket at a time, the limit is
 * enforced at bucket granularity rather than per request timestamp.
 *
 * <p>Every method that touches the counters is synchronized on the instance.
 */
public class CountBasedSlidingWindow {
    private final int maxRequests;          // maximum number of admitted requests per window
    private final long windowSizeMs;        // requested window length in milliseconds
    private final int bucketCount;          // number of time slices in the window
    private final long bucketSizeMs;        // length of one time slice in milliseconds
    private final AtomicInteger[] buckets;  // one counter per time slice, used as a ring
    // Absolute bucket number (time / bucketSizeMs) the ring was last rotated to.
    private final AtomicLong lastBucketTick;

    /**
     * Creates a limiter.
     *
     * @param maxRequests  maximum number of requests admitted per window; must be &gt;= 0
     * @param windowSizeMs window length in milliseconds; must be &gt;= {@code bucketCount} so that
     *                     each bucket spans at least one millisecond. When it is not a multiple of
     *                     {@code bucketCount} the bucket length is rounded down.
     * @param bucketCount  number of buckets the window is split into; must be &gt; 0
     * @throws IllegalArgumentException if an argument is out of range
     */
    public CountBasedSlidingWindow(int maxRequests, long windowSizeMs, int bucketCount) {
        if (maxRequests < 0) {
            throw new IllegalArgumentException("maxRequests must be >= 0: " + maxRequests);
        }
        if (bucketCount <= 0) {
            throw new IllegalArgumentException("bucketCount must be > 0: " + bucketCount);
        }
        if (windowSizeMs < bucketCount) {
            // Otherwise windowSizeMs / bucketCount is 0 and every later division by it fails.
            throw new IllegalArgumentException(
                    "windowSizeMs (" + windowSizeMs + ") must be >= bucketCount (" + bucketCount + ")");
        }
        this.maxRequests = maxRequests;
        this.windowSizeMs = windowSizeMs;
        this.bucketCount = bucketCount;
        this.bucketSizeMs = windowSizeMs / bucketCount;
        this.buckets = new AtomicInteger[bucketCount];
        for (int i = 0; i < bucketCount; i++) {
            buckets[i] = new AtomicInteger(0);
        }
        this.lastBucketTick = new AtomicLong(tickOf(System.currentTimeMillis()));
    }

    /**
     * Tries to admit one request at the current system time.
     *
     * @return {@code true} if the request was counted and may proceed, {@code false} if the window
     *         is already full
     */
    public synchronized boolean allowRequest() {
        return allowRequest(System.currentTimeMillis());
    }

    /**
     * Same as {@link #allowRequest()} but with an explicit clock reading, which makes the rotation
     * logic testable without sleeping.
     *
     * @param currentTime clock reading in milliseconds
     * @return {@code true} if the request was admitted
     */
    synchronized boolean allowRequest(long currentTime) {
        updateExpiredBuckets(currentTime);
        if (getTotalRequests() >= maxRequests) {
            return false;
        }
        buckets[getCurrentBucketIndex(currentTime)].incrementAndGet();
        return true;
    }

    /**
     * Zeroes every bucket whose time slice started after the last rotation, up to and including
     * the slice that contains {@code currentTime}.
     *
     * <p>Rotation is driven by absolute bucket numbers rather than by elapsed milliseconds since
     * the previous call, so the bucket about to be written is always cleared when its slice
     * begins, no matter where inside a slice the previous call happened.
     */
    private void updateExpiredBuckets(long currentTime) {
        long currentTick = tickOf(currentTime);
        long previousTick = lastBucketTick.get();
        long elapsedTicks = currentTick - previousTick;
        if (elapsedTicks == 0) {
            return;
        }
        if (elapsedTicks < 0 && elapsedTicks > -bucketCount) {
            // Clock stepped back by less than one window: keep the counts and wait for it to
            // catch up instead of handing out a fresh quota.
            return;
        }
        if (elapsedTicks < 0 || elapsedTicks >= bucketCount) {
            // A full window has passed (or the clock jumped back by more than a window):
            // nothing recorded so far is still relevant.
            for (AtomicInteger bucket : buckets) {
                bucket.set(0);
            }
        } else {
            for (long tick = previousTick + 1; tick <= currentTick; tick++) {
                buckets[indexOfTick(tick)].set(0);
            }
        }
        lastBucketTick.set(currentTick);
    }

    /** Returns the ring index of the bucket that contains {@code currentTime}. */
    private int getCurrentBucketIndex(long currentTime) {
        return indexOfTick(tickOf(currentTime));
    }

    /** Converts a clock reading to an absolute bucket number. */
    private long tickOf(long timeMs) {
        return Math.floorDiv(timeMs, bucketSizeMs);
    }

    /** Maps an absolute bucket number onto the ring. */
    private int indexOfTick(long tick) {
        return (int) Math.floorMod(tick, (long) bucketCount);
    }

    /** Sums all bucket counters. */
    private int getTotalRequests() {
        int total = 0;
        for (AtomicInteger bucket : buckets) {
            total += bucket.get();
        }
        return total;
    }

    /**
     * Returns the number of requests currently counted in the window, after expiring stale
     * buckets against the system clock.
     */
    public synchronized int getCurrentCount() {
        return getCurrentCount(System.currentTimeMillis());
    }

    /** Same as {@link #getCurrentCount()} but with an explicit clock reading. */
    synchronized int getCurrentCount(long currentTime) {
        updateExpiredBuckets(currentTime);
        return getTotalRequests();
    }

    /**
     * Returns the current count divided by {@code maxRequests}. The result is not a finite number
     * when {@code maxRequests} is 0.
     */
    public double getCurrentRate() {
        return (double) getCurrentCount() / maxRequests;
    }
}
3.2 优化版本实现
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Bucketed sliding-window rate limiter whose admission path does not take a lock.
 *
 * <p>The window is divided into {@code bucketCount} time slices, each with its own counter in an
 * {@link AtomicIntegerArray}. A separate running total mirrors the sum of the counters; admission
 * reserves a slot on that total with a compare-and-set loop, so concurrent callers can never
 * admit more than {@code maxRequests} between two rotations. Only bucket rotation is
 * synchronized.
 */
public class OptimizedCountBasedSlidingWindow {
    private final int maxRequests;            // maximum number of admitted requests per window
    private final long windowSizeMs;          // requested window length in milliseconds
    private final int bucketCount;            // number of time slices in the window
    private final long bucketSizeMs;          // length of one time slice in milliseconds
    private final AtomicIntegerArray buckets; // one counter per time slice, used as a ring
    // Absolute bucket number (time / bucketSizeMs) the ring was last rotated to.
    private final AtomicLong lastBucketTick;
    // Running total of the bucket counters; the gate for admission decisions.
    private final AtomicLong admittedInWindow;

    /**
     * Creates a limiter.
     *
     * @param maxRequests  maximum number of requests admitted per window; must be &gt;= 0
     * @param windowSizeMs window length in milliseconds; must be &gt;= {@code bucketCount} so that
     *                     each bucket spans at least one millisecond
     * @param bucketCount  number of buckets the window is split into; must be &gt; 0
     * @throws IllegalArgumentException if an argument is out of range
     */
    public OptimizedCountBasedSlidingWindow(int maxRequests, long windowSizeMs, int bucketCount) {
        if (maxRequests < 0) {
            throw new IllegalArgumentException("maxRequests must be >= 0: " + maxRequests);
        }
        if (bucketCount <= 0) {
            throw new IllegalArgumentException("bucketCount must be > 0: " + bucketCount);
        }
        if (windowSizeMs < bucketCount) {
            // Otherwise windowSizeMs / bucketCount is 0 and every later division by it fails.
            throw new IllegalArgumentException(
                    "windowSizeMs (" + windowSizeMs + ") must be >= bucketCount (" + bucketCount + ")");
        }
        this.maxRequests = maxRequests;
        this.windowSizeMs = windowSizeMs;
        this.bucketCount = bucketCount;
        this.bucketSizeMs = windowSizeMs / bucketCount;
        this.buckets = new AtomicIntegerArray(bucketCount);
        this.lastBucketTick = new AtomicLong(tickOf(System.currentTimeMillis()));
        this.admittedInWindow = new AtomicLong(0);
    }

    /**
     * Tries to admit one request at the current system time.
     *
     * @return {@code true} if the request was counted and may proceed, {@code false} if the window
     *         is already full
     */
    public boolean allowRequest() {
        return allowRequest(System.currentTimeMillis());
    }

    /**
     * Same as {@link #allowRequest()} but with an explicit clock reading, which makes the rotation
     * logic testable without sleeping.
     *
     * @param currentTime clock reading in milliseconds
     * @return {@code true} if the request was admitted
     */
    boolean allowRequest(long currentTime) {
        // Fast path: rotation is only attempted when the bucket number has changed.
        if (shouldResetBuckets(currentTime)) {
            resetExpiredBuckets(currentTime);
        }
        // Reserve a slot atomically; a plain "sum, compare, increment" would let several threads
        // pass the check at once and overshoot the limit.
        long inWindow;
        do {
            inWindow = admittedInWindow.get();
            if (inWindow >= maxRequests) {
                return false;
            }
        } while (!admittedInWindow.compareAndSet(inWindow, inWindow + 1));
        buckets.incrementAndGet(getBucketIndex(currentTime));
        return true;
    }

    /** Returns whether {@code currentTime} falls in a different bucket than the last rotation. */
    private boolean shouldResetBuckets(long currentTime) {
        return tickOf(currentTime) != lastBucketTick.get();
    }

    /**
     * Zeroes every bucket whose time slice started after the last rotation, up to and including
     * the slice that contains {@code currentTime}, and removes their counts from the running
     * total. The state is re-read under the lock because another thread may already have rotated.
     */
    private synchronized void resetExpiredBuckets(long currentTime) {
        long currentTick = tickOf(currentTime);
        long previousTick = lastBucketTick.get();
        long elapsedTicks = currentTick - previousTick;
        if (elapsedTicks == 0) {
            return;
        }
        if (elapsedTicks < 0 && elapsedTicks > -bucketCount) {
            // A slightly stale reading (or a small backward clock step): keep the counts.
            return;
        }
        if (elapsedTicks < 0 || elapsedTicks >= bucketCount) {
            // A full window has passed (or the clock jumped back by more than a window).
            for (int i = 0; i < bucketCount; i++) {
                clearBucket(i);
            }
        } else {
            for (long tick = previousTick + 1; tick <= currentTick; tick++) {
                clearBucket(indexOfTick(tick));
            }
        }
        lastBucketTick.set(currentTick);
    }

    /** Empties one bucket and subtracts what it held from the running total. */
    private void clearBucket(int index) {
        int expired = buckets.getAndSet(index, 0);
        if (expired != 0) {
            admittedInWindow.addAndGet(-expired);
        }
    }

    /** Returns the ring index of the bucket that contains {@code currentTime}. */
    private int getBucketIndex(long currentTime) {
        return indexOfTick(tickOf(currentTime));
    }

    /** Converts a clock reading to an absolute bucket number. */
    private long tickOf(long timeMs) {
        return Math.floorDiv(timeMs, bucketSizeMs);
    }

    /** Maps an absolute bucket number onto the ring. */
    private int indexOfTick(long tick) {
        return (int) Math.floorMod(tick, (long) bucketCount);
    }

    /** Sums all bucket counters. */
    private int calculateTotalRequests() {
        int total = 0;
        for (int i = 0; i < bucketCount; i++) {
            total += buckets.get(i);
        }
        return total;
    }

    /**
     * Returns a snapshot of the window taken at the current system time.
     *
     * @return request count, configured maximum, their ratio and the snapshot time
     */
    public WindowStats getStats() {
        return getStats(System.currentTimeMillis());
    }

    /** Same as {@link #getStats()} but with an explicit clock reading. */
    WindowStats getStats(long currentTime) {
        if (shouldResetBuckets(currentTime)) {
            resetExpiredBuckets(currentTime);
        }
        int totalRequests = calculateTotalRequests();
        double utilizationRate = (double) totalRequests / maxRequests;
        return new WindowStats(totalRequests, maxRequests, utilizationRate, currentTime);
    }

    /** Immutable snapshot of a window: current count, limit, utilization ratio and timestamp. */
    public static class WindowStats {
        private final int currentRequests;
        private final int maxRequests;
        private final double utilizationRate;
        private final long timestamp;

        /**
         * @param currentRequests requests counted in the window when the snapshot was taken
         * @param maxRequests     configured limit
         * @param utilizationRate {@code currentRequests / maxRequests} as a fraction
         * @param timestamp       clock reading, in milliseconds, the snapshot refers to
         */
        public WindowStats(int currentRequests, int maxRequests, double utilizationRate, long timestamp) {
            this.currentRequests = currentRequests;
            this.maxRequests = maxRequests;
            this.utilizationRate = utilizationRate;
            this.timestamp = timestamp;
        }

        public int getCurrentRequests() { return currentRequests; }

        public int getMaxRequests() { return maxRequests; }

        public double getUtilizationRate() { return utilizationRate; }

        public long getTimestamp() { return timestamp; }

        @Override
        public String toString() {
            // utilizationRate is a fraction; it is printed as a percentage.
            return String.format("WindowStats{current=%d, max=%d, rate=%.2f%%, time=%d}",
                    currentRequests, maxRequests, utilizationRate * 100, timestamp);
        }
    }
}
4. 高级特性
4.1 自适应桶数量
/**
 * Bucketed sliding-window rate limiter that periodically re-sizes its bucket ring.
 *
 * <p>The number of buckets is chosen from a small table: initially from {@code maxRequests}, and
 * afterwards, at most once per {@link #ADAPT_INTERVAL_MS}, from the number of requests currently
 * counted in the window. When the ring is re-sized the counted requests are carried over into the
 * new ring so that re-sizing never hands out a fresh quota.
 *
 * <p>All state changes happen while holding the instance lock.
 */
public class AdaptiveCountBasedSlidingWindow {
    /** Minimum time between two bucket-count re-evaluations, in milliseconds. */
    private static final long ADAPT_INTERVAL_MS = 60000;

    private final int maxRequests;               // maximum number of admitted requests per window
    private final long windowSizeMs;             // window length in milliseconds
    private volatile int bucketCount;            // current number of buckets
    private volatile long bucketSizeMs;          // current length of one bucket in milliseconds
    private volatile AtomicIntegerArray buckets; // current bucket ring
    private final AtomicLong lastAdaptTime;      // clock reading of the last re-evaluation
    // Absolute bucket number the ring was last rotated to; guarded by the instance lock.
    private long lastBucketTick;

    /**
     * Creates a limiter.
     *
     * @param maxRequests  maximum number of requests admitted per window; must be &gt;= 0
     * @param windowSizeMs window length in milliseconds; must be &gt; 0
     * @throws IllegalArgumentException if an argument is out of range
     */
    public AdaptiveCountBasedSlidingWindow(int maxRequests, long windowSizeMs) {
        if (maxRequests < 0) {
            throw new IllegalArgumentException("maxRequests must be >= 0: " + maxRequests);
        }
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("windowSizeMs must be > 0: " + windowSizeMs);
        }
        long now = System.currentTimeMillis();
        this.maxRequests = maxRequests;
        this.windowSizeMs = windowSizeMs;
        this.bucketCount = clampBucketCount(calculateOptimalBucketCount(maxRequests));
        this.bucketSizeMs = windowSizeMs / bucketCount;
        this.buckets = new AtomicIntegerArray(bucketCount);
        this.lastAdaptTime = new AtomicLong(now);
        this.lastBucketTick = Math.floorDiv(now, bucketSizeMs);
    }

    /** Picks a bucket count for the given request volume from a fixed table. */
    private int calculateOptimalBucketCount(int maxRequests) {
        if (maxRequests <= 10) return 5;
        if (maxRequests <= 100) return 10;
        if (maxRequests <= 1000) return 20;
        return 50;
    }

    /** Caps the bucket count so that every bucket spans at least one millisecond. */
    private int clampBucketCount(int desired) {
        return (int) Math.min((long) desired, windowSizeMs);
    }

    /**
     * Tries to admit one request at the current system time.
     *
     * @return {@code true} if the request was counted and may proceed, {@code false} if the window
     *         is already full
     */
    public boolean allowRequest() {
        return allowRequest(System.currentTimeMillis());
    }

    /**
     * Same as {@link #allowRequest()} but with an explicit clock reading, which makes the limiter
     * testable without sleeping.
     *
     * @param currentTime clock reading in milliseconds
     * @return {@code true} if the request was admitted
     */
    synchronized boolean allowRequest(long currentTime) {
        // The interval check runs under the lock, so only one caller re-sizes per interval.
        if (currentTime - lastAdaptTime.get() > ADAPT_INTERVAL_MS) {
            adaptBucketCount(currentTime);
        }
        return allowRequestInternal(currentTime);
    }

    /**
     * Re-evaluates the bucket count from the load currently in the window and swaps in a new ring
     * when the count changes.
     */
    private synchronized void adaptBucketCount(long currentTime) {
        // Expire stale buckets first so the load reflects only the live window.
        rollExpiredBuckets(currentTime);
        int currentLoad = calculateCurrentLoad();
        int optimalBucketCount = clampBucketCount(calculateOptimalBucketCount(currentLoad));
        if (optimalBucketCount != bucketCount) {
            long resizedBucketMs = windowSizeMs / optimalBucketCount;
            long resizedTick = Math.floorDiv(currentTime, resizedBucketMs);
            AtomicIntegerArray resized = new AtomicIntegerArray(optimalBucketCount);
            // Carry the live count into the current bucket of the new ring. This is conservative:
            // the carried requests expire one full window from now.
            resized.set((int) Math.floorMod(resizedTick, (long) optimalBucketCount), currentLoad);
            bucketCount = optimalBucketCount;
            bucketSizeMs = resizedBucketMs;
            buckets = resized;
            lastBucketTick = resizedTick;
        }
        lastAdaptTime.set(currentTime);
    }

    /** Sums all counters of the current ring. */
    private int calculateCurrentLoad() {
        int total = 0;
        for (int i = 0; i < buckets.length(); i++) {
            total += buckets.get(i);
        }
        return total;
    }

    /**
     * Zeroes every bucket whose time slice started after the last rotation, up to and including
     * the slice that contains {@code currentTime}. Must be called with the instance lock held.
     */
    private void rollExpiredBuckets(long currentTime) {
        long currentTick = Math.floorDiv(currentTime, bucketSizeMs);
        long elapsedTicks = currentTick - lastBucketTick;
        if (elapsedTicks == 0) {
            return;
        }
        if (elapsedTicks < 0 && elapsedTicks > -bucketCount) {
            // Clock stepped back by less than one window: keep the counts.
            return;
        }
        if (elapsedTicks < 0 || elapsedTicks >= bucketCount) {
            // A full window has passed (or the clock jumped back by more than a window).
            for (int i = 0; i < buckets.length(); i++) {
                buckets.set(i, 0);
            }
        } else {
            for (long tick = lastBucketTick + 1; tick <= currentTick; tick++) {
                buckets.set((int) Math.floorMod(tick, (long) bucketCount), 0);
            }
        }
        lastBucketTick = currentTick;
    }

    /**
     * Core admission decision: expire stale buckets, reject when the window is full, otherwise
     * count the request in the bucket for {@code currentTime}. Must be called with the instance
     * lock held.
     */
    private boolean allowRequestInternal(long currentTime) {
        rollExpiredBuckets(currentTime);
        if (calculateCurrentLoad() >= maxRequests) {
            return false;
        }
        long currentTick = Math.floorDiv(currentTime, bucketSizeMs);
        buckets.incrementAndGet((int) Math.floorMod(currentTick, (long) bucketCount));
        return true;
    }
}
4.2 多级限流
/**
 * Chains several {@link CountBasedSlidingWindow} limiters (per second, per minute, per hour) and
 * admits a request only if every level admits it.
 *
 * <p>Levels are consulted in order and each {@code allowRequest()} call on a level counts the
 * request there. A request rejected by a later level has therefore already been counted by the
 * earlier levels.
 */
public class MultiLevelCountBasedLimiter {
    private final CountBasedSlidingWindow[] limiters;
    private final String[] levelNames;
    // Configured limit of each level, kept here because the limiter does not expose it.
    private final int[] levelLimits;

    /** Builds the three fixed levels: 10 per second, 100 per minute and 1000 per hour. */
    public MultiLevelCountBasedLimiter() {
        levelLimits = new int[]{10, 100, 1000};
        limiters = new CountBasedSlidingWindow[]{
            new CountBasedSlidingWindow(levelLimits[0], 1000, 5),     // 10 per second
            new CountBasedSlidingWindow(levelLimits[1], 60000, 10),   // 100 per minute
            new CountBasedSlidingWindow(levelLimits[2], 3600000, 20)  // 1000 per hour
        };
        levelNames = new String[]{"秒级", "分钟级", "小时级"};
    }

    /**
     * Tries to admit one request through every level.
     *
     * @return an allowed result when all levels admit the request; otherwise a rejected result
     *         naming the first level that refused, with that level's current count and limit
     */
    public LimitResult allowRequest() {
        for (int i = 0; i < limiters.length; i++) {
            if (!limiters[i].allowRequest()) {
                return new LimitResult(false, levelNames[i],
                        limiters[i].getCurrentCount(), levelLimits[i]);
            }
        }
        return new LimitResult(true, null, 0, 0);
    }

    /** Outcome of a multi-level check. */
    public static class LimitResult {
        private final boolean allowed;
        private final String limitLevel;
        private final int currentCount;
        private final int maxCount;

        /**
         * @param allowed      whether the request may proceed
         * @param limitLevel   name of the level that rejected the request, or {@code null} when
         *                     allowed
         * @param currentCount request count of the rejecting level (0 when allowed)
         * @param maxCount     limit of the rejecting level (0 when allowed)
         */
        public LimitResult(boolean allowed, String limitLevel, int currentCount, int maxCount) {
            this.allowed = allowed;
            this.limitLevel = limitLevel;
            this.currentCount = currentCount;
            this.maxCount = maxCount;
        }

        public boolean isAllowed() { return allowed; }

        public String getLimitLevel() { return limitLevel; }

        public int getCurrentCount() { return currentCount; }

        public int getMaxCount() { return maxCount; }

        @Override
        public String toString() {
            if (allowed) {
                return "请求通过";
            } else {
                return String.format("请求被%s限流阻止 (%d/%d)", limitLevel, currentCount, maxCount);
            }
        }
    }
}
5. 性能测试与对比
5.1 性能测试代码
/**
 * Rough multi-threaded throughput check for {@link CountBasedSlidingWindow}: several threads call
 * {@code allowRequest()} in a loop and the total elapsed time is printed.
 */
public class PerformanceTest {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 10;
        int requestsPerThread = 10000;
        CountBasedSlidingWindow countBased = new CountBasedSlidingWindow(1000, 1000, 10);
        long countBasedTime = testPerformance(countBased, threadCount, requestsPerThread);
        System.out.println("基于数量的滑动窗口性能: " + countBasedTime + "ms");
    }

    /**
     * Runs {@code threadCount} threads that each issue {@code requestsPerThread} requests against
     * {@code limiter}, prints how many were admitted, and returns the elapsed time.
     *
     * @param limiter           limiter under test
     * @param threadCount       number of worker threads to start
     * @param requestsPerThread number of {@code allowRequest()} calls per thread
     * @return elapsed time in milliseconds, from before the first thread is started until the
     *         last one has finished
     * @throws InterruptedException if interrupted while waiting for the workers
     */
    private static long testPerformance(CountBasedSlidingWindow limiter,
                                        int threadCount, int requestsPerThread)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicInteger successCount = new AtomicInteger(0);
        // nanoTime is monotonic; the wall clock can be adjusted while the test is running.
        long startNanos = System.nanoTime();
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < requestsPerThread; j++) {
                        if (limiter.allowRequest()) {
                            successCount.incrementAndGet();
                        }
                    }
                } finally {
                    // Count down even if the loop throws, so await() cannot hang.
                    latch.countDown();
                }
            }).start();
        }
        latch.await();
        long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000L;
        System.out.println("成功请求数: " + successCount.get());
        return elapsedMs;
    }
}
6. 应用场景与最佳实践
6.1 API网关限流
/**
 * Keeps one {@link CountBasedSlidingWindow} per (API path, client id) pair and checks requests
 * against it. Limiters are created on first use and kept in the map for the lifetime of this
 * object.
 */
@Component
public class ApiGatewayRateLimiter {
    private final Map<String, CountBasedSlidingWindow> apiLimiters = new ConcurrentHashMap<>();

    /**
     * Counts one request for the given client on the given API path.
     *
     * @param apiPath  request path; also selects the limit that applies
     * @param clientId identifier of the caller
     * @return {@code true} if the request is within the limit
     */
    public boolean checkApiLimit(String apiPath, String clientId) {
        return apiLimiters
                .computeIfAbsent(apiPath + ":" + clientId, unusedKey -> createLimiterForApi(apiPath))
                .allowRequest();
    }

    /** Builds a one-minute limiter whose quota and bucket count depend on the path prefix. */
    private CountBasedSlidingWindow createLimiterForApi(String apiPath) {
        final long oneMinuteMs = 60000;
        if (apiPath.startsWith("/api/public")) {
            // Public API: 100 requests per minute.
            return new CountBasedSlidingWindow(100, oneMinuteMs, 10);
        }
        if (apiPath.startsWith("/api/premium")) {
            // Premium API: 1000 requests per minute.
            return new CountBasedSlidingWindow(1000, oneMinuteMs, 20);
        }
        // Everything else: 50 requests per minute.
        return new CountBasedSlidingWindow(50, oneMinuteMs, 5);
    }
}
6.2 微服务间调用限流
/**
 * Around-advice that rate-limits methods annotated with {@code @RateLimit}.
 *
 * <p>One {@link CountBasedSlidingWindow} is kept per "TargetClass.methodName" key. The limiter for
 * a key is created from the annotation values seen on the first intercepted call and reused
 * afterwards.
 */
@Aspect
@Service
public class ServiceCallLimiter {
    private final Map<String, CountBasedSlidingWindow> serviceLimiters = new ConcurrentHashMap<>();

    /**
     * Counts the call against the method's limiter and proceeds only when it is admitted.
     *
     * <p>The pointcut uses the parameter name {@code rateLimit}, not the annotation type name, so
     * that the matched annotation instance is bound to the {@code rateLimit} argument.
     *
     * @param joinPoint the intercepted invocation
     * @param rateLimit the annotation found on the intercepted method
     * @return whatever the intercepted method returns
     * @throws RateLimitExceededException if the limiter rejects the call
     * @throws Throwable anything thrown by the intercepted method
     */
    @Around("@annotation(rateLimit)")
    public Object rateLimit(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
        String serviceKey = getServiceKey(joinPoint);
        CountBasedSlidingWindow limiter = serviceLimiters.computeIfAbsent(serviceKey,
                k -> new CountBasedSlidingWindow(rateLimit.value(), rateLimit.window(), rateLimit.buckets()));
        if (!limiter.allowRequest()) {
            throw new RateLimitExceededException("服务调用频率超限: " + serviceKey);
        }
        return joinPoint.proceed();
    }

    /** Builds the limiter key as "SimpleClassNameOfTarget.methodName". */
    private String getServiceKey(ProceedingJoinPoint joinPoint) {
        return joinPoint.getTarget().getClass().getSimpleName() + "."
                + joinPoint.getSignature().getName();
    }
}
/**
 * Marks a method as rate-limited and carries the limiter settings: the request quota, the window
 * length and the number of buckets. Retained at runtime and applicable to methods only.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
int value() default 100; // maximum number of requests
long window() default 60000; // time window in milliseconds
int buckets() default 10; // number of buckets
}
7. 监控与告警
7.1 监控指标收集
/**
 * Exposes the state of registered {@link CountBasedSlidingWindow} limiters as Micrometer gauges.
 *
 * <p>Each limiter gets two gauges, tagged with its key: {@code rate_limiter.current_requests}
 * and {@code rate_limiter.utilization_rate}. The gauges are bound to the limiter itself, so the
 * registry reads a live value whenever it samples them.
 */
@Component
public class RateLimiterMonitor {
    private final MeterRegistry meterRegistry;
    private final Map<String, CountBasedSlidingWindow> limiters;

    /**
     * @param meterRegistry registry the gauges are registered with
     */
    public RateLimiterMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.limiters = new ConcurrentHashMap<>();
    }

    /**
     * Runs every 5 seconds and makes sure every known limiter has its gauges registered.
     * Registering an already-registered gauge id is expected to be a no-op in Micrometer, so
     * repeating this is harmless.
     */
    @Scheduled(fixedRate = 5000)
    public void collectMetrics() {
        limiters.forEach(this::bindGauges);
    }

    /**
     * Starts monitoring a limiter under the given key.
     *
     * @param key     value of the {@code limiter} tag on the gauges
     * @param limiter limiter to observe
     */
    public void registerLimiter(String key, CountBasedSlidingWindow limiter) {
        limiters.put(key, limiter);
        bindGauges(key, limiter);
    }

    /**
     * Registers the two gauges for one limiter.
     *
     * NOTE(review): if a key is re-registered with a different limiter, the existing gauges
     * presumably stay bound to the first one; confirm against the Micrometer version in use.
     */
    private void bindGauges(String key, CountBasedSlidingWindow limiter) {
        // Current number of requests counted in the window.
        Gauge.builder("rate_limiter.current_requests", limiter, CountBasedSlidingWindow::getCurrentCount)
                .tag("limiter", key)
                .register(meterRegistry);
        // Fraction of the limit currently in use.
        Gauge.builder("rate_limiter.utilization_rate", limiter, CountBasedSlidingWindow::getCurrentRate)
                .tag("limiter", key)
                .register(meterRegistry);
    }
}
8. 总结与建议
8.1 算法优势
- 高性能:每次判断的开销只与固定的桶数量有关,与请求数无关,适合高并发场景
- 内存可控:固定大小的内存占用
- 配置灵活:可根据需求调整桶数量和窗口大小
- 监控友好:提供丰富的统计信息
8.2 使用建议
- 桶数量选择:建议单个桶的时长取窗口时长的1/10到1/5(即约5~10个桶);桶越多精度越高,但每次判断需要遍历的桶也越多
- 内存考虑:大量限流器实例时注意内存使用
- 精度权衡:在性能和精度之间找到平衡点
- 监控配置:建立完善的监控和告警机制
8.3 适用场景
- 高并发API限流:需要高性能的场景
- 微服务治理:服务间调用频率控制
- 资源保护:数据库、缓存等资源访问控制
- 用户行为限制:防止恶意请求和爬虫
基于数量的滑动窗口限流算法在保证限流效果的同时,提供了优秀的性能表现,是现代分布式系统中重要的流量控制工具。
文章标签
冬眠
博主专注于技术、阅读与思考。在这里记录学习、思考与生活。
系列:限流算法
第 2 篇,共 2 篇
