冬眠的笔记
首页文章分类书单项目关于
冬眠
X

© 2026 冬眠的笔记 · 用文字记录思考,用思考改变生活

首页>文章>面试
算法限流滑动窗口高并发

基于数量的滑动窗口

以请求数量为维度的滑动窗口限流实现及其应用场景

冬眠
冬眠
专注于技术、阅读与思考
2025-11-17
发布日期
10 min read
阅读时长
浏览量
基于数量的滑动窗口

1. 算法概述

基于数量的滑动窗口限流算法是滑动窗口算法的一种特殊实现,它专注于控制固定时间窗口内的请求数量。与基于时间的滑动窗口不同,这种算法更注重精确的数量控制和高效的内存使用。

2. 核心特点

2.1 设计理念

  • 精确计数:严格控制窗口内的请求数量
  • 内存优化:使用固定大小的数据结构
  • 高性能:O(1)时间复杂度的操作
  • 线程安全:支持高并发环境

2.2 与传统滑动窗口的区别

特性 传统滑动窗口 基于数量的滑动窗口
存储方式 时间戳队列 固定大小数组
内存使用 动态变化 固定大小
时间复杂度 O(n) O(1)
精确度 高 中等
适用场景 精确限流 高性能限流

3. 算法实现

3.1 基础实现

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class CountBasedSlidingWindow {
    private final int maxRequests;          // 最大请求数
    private final long windowSizeMs;        // 窗口大小(毫秒)
    private final int bucketCount;          // 桶的数量
    private final long bucketSizeMs;        // 每个桶的时间长度
    
    private final AtomicInteger[] buckets;  // 计数桶数组
    private final AtomicLong lastUpdateTime; // 最后更新时间
    
    public CountBasedSlidingWindow(int maxRequests, long windowSizeMs, int bucketCount) {
        this.maxRequests = maxRequests;
        this.windowSizeMs = windowSizeMs;
        this.bucketCount = bucketCount;
        this.bucketSizeMs = windowSizeMs / bucketCount;
        this.buckets = new AtomicInteger[bucketCount];
        this.lastUpdateTime = new AtomicLong(System.currentTimeMillis());
        
        // 初始化桶
        for (int i = 0; i < bucketCount; i++) {
            buckets[i] = new AtomicInteger(0);
        }
    }
    
    public synchronized boolean allowRequest() {
        long currentTime = System.currentTimeMillis();
        
        // 更新过期的桶
        updateExpiredBuckets(currentTime);
        
        // 计算当前总请求数
        int totalRequests = getTotalRequests();
        
        if (totalRequests >= maxRequests) {
            return false;
        }
        
        // 增加当前桶的计数
        int currentBucketIndex = getCurrentBucketIndex(currentTime);
        buckets[currentBucketIndex].incrementAndGet();
        
        return true;
    }
    
    private void updateExpiredBuckets(long currentTime) {
        long lastUpdate = lastUpdateTime.get();
        long timeDiff = currentTime - lastUpdate;
        
        if (timeDiff >= bucketSizeMs) {
            // 计算需要重置的桶数量
            int bucketsToReset = (int) Math.min(timeDiff / bucketSizeMs, bucketCount);
            
            // 重置过期的桶
            for (int i = 0; i < bucketsToReset; i++) {
                int bucketIndex = (int) (((lastUpdate / bucketSizeMs) + i + 1) % bucketCount);
                buckets[bucketIndex].set(0);
            }
            
            lastUpdateTime.set(currentTime);
        }
    }
    
    private int getCurrentBucketIndex(long currentTime) {
        return (int) ((currentTime / bucketSizeMs) % bucketCount);
    }
    
    private int getTotalRequests() {
        int total = 0;
        for (AtomicInteger bucket : buckets) {
            total += bucket.get();
        }
        return total;
    }
    
    public int getCurrentCount() {
        updateExpiredBuckets(System.currentTimeMillis());
        return getTotalRequests();
    }
    
    public double getCurrentRate() {
        return (double) getCurrentCount() / maxRequests;
    }
}

3.2 优化版本实现

import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

public class OptimizedCountBasedSlidingWindow {
    private final int maxRequests;
    private final long windowSizeMs;
    private final int bucketCount;
    private final long bucketSizeMs;
    
    private final AtomicIntegerArray buckets;
    private final AtomicLong lastResetTime;
    private volatile int currentBucketIndex;
    
    public OptimizedCountBasedSlidingWindow(int maxRequests, long windowSizeMs, int bucketCount) {
        this.maxRequests = maxRequests;
        this.windowSizeMs = windowSizeMs;
        this.bucketCount = bucketCount;
        this.bucketSizeMs = windowSizeMs / bucketCount;
        this.buckets = new AtomicIntegerArray(bucketCount);
        this.lastResetTime = new AtomicLong(System.currentTimeMillis());
        this.currentBucketIndex = 0;
    }
    
    public boolean allowRequest() {
        long currentTime = System.currentTimeMillis();
        
        // 快速路径:检查是否需要重置桶
        if (shouldResetBuckets(currentTime)) {
            resetExpiredBuckets(currentTime);
        }
        
        // 计算当前请求总数
        int totalRequests = calculateTotalRequests();
        
        if (totalRequests >= maxRequests) {
            return false;
        }
        
        // 原子性增加当前桶计数
        int bucketIndex = getBucketIndex(currentTime);
        buckets.incrementAndGet(bucketIndex);
        
        return true;
    }
    
    private boolean shouldResetBuckets(long currentTime) {
        return currentTime - lastResetTime.get() >= bucketSizeMs;
    }
    
    private synchronized void resetExpiredBuckets(long currentTime) {
        long lastReset = lastResetTime.get();
        long timeDiff = currentTime - lastReset;
        
        if (timeDiff >= bucketSizeMs) {
            int bucketsToReset = (int) Math.min(timeDiff / bucketSizeMs, bucketCount);
            
            for (int i = 0; i < bucketsToReset; i++) {
                int resetIndex = (currentBucketIndex + i + 1) % bucketCount;
                buckets.set(resetIndex, 0);
            }
            
            currentBucketIndex = getBucketIndex(currentTime);
            lastResetTime.set(currentTime);
        }
    }
    
    private int getBucketIndex(long currentTime) {
        return (int) ((currentTime / bucketSizeMs) % bucketCount);
    }
    
    private int calculateTotalRequests() {
        int total = 0;
        for (int i = 0; i < bucketCount; i++) {
            total += buckets.get(i);
        }
        return total;
    }
    
    // 获取详细统计信息
    public WindowStats getStats() {
        long currentTime = System.currentTimeMillis();
        if (shouldResetBuckets(currentTime)) {
            resetExpiredBuckets(currentTime);
        }
        
        int totalRequests = calculateTotalRequests();
        double utilizationRate = (double) totalRequests / maxRequests;
        
        return new WindowStats(totalRequests, maxRequests, utilizationRate, currentTime);
    }
    
    public static class WindowStats {
        private final int currentRequests;
        private final int maxRequests;
        private final double utilizationRate;
        private final long timestamp;
        
        public WindowStats(int currentRequests, int maxRequests, double utilizationRate, long timestamp) {
            this.currentRequests = currentRequests;
            this.maxRequests = maxRequests;
            this.utilizationRate = utilizationRate;
            this.timestamp = timestamp;
        }
        
        // Getters
        public int getCurrentRequests() { return currentRequests; }
        public int getMaxRequests() { return maxRequests; }
        public double getUtilizationRate() { return utilizationRate; }
        public long getTimestamp() { return timestamp; }
        
        @Override
        public String toString() {
            return String.format("WindowStats{current=%d, max=%d, rate=%.2f%%, time=%d}", 
                currentRequests, maxRequests, utilizationRate * 100, timestamp);
        }
    }
}

4. 高级特性

4.1 自适应桶数量

public class AdaptiveCountBasedSlidingWindow {
    private final int maxRequests;
    private final long windowSizeMs;
    private volatile int bucketCount;
    private volatile long bucketSizeMs;
    
    private volatile AtomicIntegerArray buckets;
    private final AtomicLong lastAdaptTime;
    
    public AdaptiveCountBasedSlidingWindow(int maxRequests, long windowSizeMs) {
        this.maxRequests = maxRequests;
        this.windowSizeMs = windowSizeMs;
        this.bucketCount = calculateOptimalBucketCount(maxRequests);
        this.bucketSizeMs = windowSizeMs / bucketCount;
        this.buckets = new AtomicIntegerArray(bucketCount);
        this.lastAdaptTime = new AtomicLong(System.currentTimeMillis());
    }
    
    private int calculateOptimalBucketCount(int maxRequests) {
        // 根据请求量自适应调整桶数量
        if (maxRequests <= 10) return 5;
        if (maxRequests <= 100) return 10;
        if (maxRequests <= 1000) return 20;
        return 50;
    }
    
    public boolean allowRequest() {
        // 定期检查是否需要调整桶数量
        long currentTime = System.currentTimeMillis();
        if (currentTime - lastAdaptTime.get() > 60000) { // 每分钟检查一次
            adaptBucketCount();
        }
        
        return allowRequestInternal(currentTime);
    }
    
    private synchronized void adaptBucketCount() {
        // 根据当前负载情况调整桶数量
        int currentLoad = calculateCurrentLoad();
        int optimalBucketCount = calculateOptimalBucketCount(currentLoad);
        
        if (optimalBucketCount != bucketCount) {
            // 重新初始化桶数组
            bucketCount = optimalBucketCount;
            bucketSizeMs = windowSizeMs / bucketCount;
            buckets = new AtomicIntegerArray(bucketCount);
        }
        
        lastAdaptTime.set(System.currentTimeMillis());
    }
    
    private int calculateCurrentLoad() {
        int total = 0;
        for (int i = 0; i < buckets.length(); i++) {
            total += buckets.get(i);
        }
        return total;
    }
    
    private boolean allowRequestInternal(long currentTime) {
        // 实现基本的限流逻辑
        // ... (类似前面的实现)
        return true; // 简化实现
    }
}

4.2 多级限流

public class MultiLevelCountBasedLimiter {
    private final CountBasedSlidingWindow[] limiters;
    private final String[] levelNames;
    
    public MultiLevelCountBasedLimiter() {
        // 定义多个级别的限流器
        limiters = new CountBasedSlidingWindow[]{
            new CountBasedSlidingWindow(10, 1000, 5),    // 1秒10次
            new CountBasedSlidingWindow(100, 60000, 10), // 1分钟100次
            new CountBasedSlidingWindow(1000, 3600000, 20) // 1小时1000次
        };
        
        levelNames = new String[]{"秒级", "分钟级", "小时级"};
    }
    
    public LimitResult allowRequest() {
        for (int i = 0; i < limiters.length; i++) {
            if (!limiters[i].allowRequest()) {
                return new LimitResult(false, levelNames[i], 
                    limiters[i].getCurrentCount(), limiters[i].maxRequests);
            }
        }
        
        return new LimitResult(true, null, 0, 0);
    }
    
    public static class LimitResult {
        private final boolean allowed;
        private final String limitLevel;
        private final int currentCount;
        private final int maxCount;
        
        public LimitResult(boolean allowed, String limitLevel, int currentCount, int maxCount) {
            this.allowed = allowed;
            this.limitLevel = limitLevel;
            this.currentCount = currentCount;
            this.maxCount = maxCount;
        }
        
        // Getters
        public boolean isAllowed() { return allowed; }
        public String getLimitLevel() { return limitLevel; }
        public int getCurrentCount() { return currentCount; }
        public int getMaxCount() { return maxCount; }
        
        @Override
        public String toString() {
            if (allowed) {
                return "请求通过";
            } else {
                return String.format("请求被%s限流阻止 (%d/%d)", limitLevel, currentCount, maxCount);
            }
        }
    }
}

5. 性能测试与对比

5.1 性能测试代码

public class PerformanceTest {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 10;
        int requestsPerThread = 10000;
        
        // 测试基于数量的滑动窗口
        CountBasedSlidingWindow countBased = new CountBasedSlidingWindow(1000, 1000, 10);
        long countBasedTime = testPerformance(countBased, threadCount, requestsPerThread);
        
        System.out.println("基于数量的滑动窗口性能: " + countBasedTime + "ms");
    }
    
    private static long testPerformance(CountBasedSlidingWindow limiter, 
                                       int threadCount, int requestsPerThread) 
                                       throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicInteger successCount = new AtomicInteger(0);
        
        long startTime = System.currentTimeMillis();
        
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < requestsPerThread; j++) {
                        if (limiter.allowRequest()) {
                            successCount.incrementAndGet();
                        }
                    }
                } finally {
                    latch.countDown();
                }
            }).start();
        }
        
        latch.await();
        long endTime = System.currentTimeMillis();
        
        System.out.println("成功请求数: " + successCount.get());
        return endTime - startTime;
    }
}

6. 应用场景与最佳实践

6.1 API网关限流

@Component
public class ApiGatewayRateLimiter {
    private final Map<String, CountBasedSlidingWindow> apiLimiters = new ConcurrentHashMap<>();
    
    public boolean checkApiLimit(String apiPath, String clientId) {
        String key = apiPath + ":" + clientId;
        CountBasedSlidingWindow limiter = apiLimiters.computeIfAbsent(key, 
            k -> createLimiterForApi(apiPath));
        
        return limiter.allowRequest();
    }
    
    private CountBasedSlidingWindow createLimiterForApi(String apiPath) {
        // 根据API路径配置不同的限流参数
        if (apiPath.startsWith("/api/public")) {
            return new CountBasedSlidingWindow(100, 60000, 10); // 公共API:1分钟100次
        } else if (apiPath.startsWith("/api/premium")) {
            return new CountBasedSlidingWindow(1000, 60000, 20); // 高级API:1分钟1000次
        } else {
            return new CountBasedSlidingWindow(50, 60000, 5); // 默认:1分钟50次
        }
    }
}

6.2 微服务间调用限流

@Service
public class ServiceCallLimiter {
    private final Map<String, CountBasedSlidingWindow> serviceLimiters = new ConcurrentHashMap<>();
    
    @Around("@annotation(RateLimit)")
    public Object rateLimit(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
        String serviceKey = getServiceKey(joinPoint);
        CountBasedSlidingWindow limiter = serviceLimiters.computeIfAbsent(serviceKey,
            k -> new CountBasedSlidingWindow(rateLimit.value(), rateLimit.window(), rateLimit.buckets()));
        
        if (!limiter.allowRequest()) {
            throw new RateLimitExceededException("服务调用频率超限: " + serviceKey);
        }
        
        return joinPoint.proceed();
    }
    
    private String getServiceKey(ProceedingJoinPoint joinPoint) {
        return joinPoint.getTarget().getClass().getSimpleName() + "." + 
               joinPoint.getSignature().getName();
    }
}

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
    int value() default 100;        // 最大请求数
    long window() default 60000;    // 时间窗口(毫秒)
    int buckets() default 10;       // 桶数量
}

7. 监控与告警

7.1 监控指标收集

@Component
public class RateLimiterMonitor {
    private final MeterRegistry meterRegistry;
    private final Map<String, CountBasedSlidingWindow> limiters;
    
    public RateLimiterMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.limiters = new ConcurrentHashMap<>();
    }
    
    @Scheduled(fixedRate = 5000) // 每5秒收集一次指标
    public void collectMetrics() {
        limiters.forEach((key, limiter) -> {
            OptimizedCountBasedSlidingWindow.WindowStats stats = 
                ((OptimizedCountBasedSlidingWindow) limiter).getStats();
            
            // 记录当前请求数
            Gauge.builder("rate_limiter.current_requests")
                .tag("limiter", key)
                .register(meterRegistry, stats, WindowStats::getCurrentRequests);
            
            // 记录利用率
            Gauge.builder("rate_limiter.utilization_rate")
                .tag("limiter", key)
                .register(meterRegistry, stats, WindowStats::getUtilizationRate);
        });
    }
    
    public void registerLimiter(String key, CountBasedSlidingWindow limiter) {
        limiters.put(key, limiter);
    }
}

8. 总结与建议

8.1 算法优势

  1. 高性能:O(1)时间复杂度,适合高并发场景
  2. 内存可控:固定大小的内存占用
  3. 配置灵活:可根据需求调整桶数量和窗口大小
  4. 监控友好:提供丰富的统计信息

8.2 使用建议

  1. 桶数量选择:建议桶数量为窗口时间的1/10到1/5
  2. 内存考虑:大量限流器实例时注意内存使用
  3. 精度权衡:在性能和精度之间找到平衡点
  4. 监控配置:建立完善的监控和告警机制

8.3 适用场景

  • 高并发API限流:需要高性能的场景
  • 微服务治理:服务间调用频率控制
  • 资源保护:数据库、缓存等资源访问控制
  • 用户行为限制:防止恶意请求和爬虫

基于数量的滑动窗口限流算法在保证限流效果的同时,提供了优秀的性能表现,是现代分布式系统中重要的流量控制工具。

文章标签

算法限流滑动窗口高并发
注册中心的设计
上一篇

注册中心的设计

2025-01-19

滑动窗口限流算法
下一篇

滑动窗口限流算法

2025-11-17

冬眠

冬眠

博主

专注于技术、阅读与思考。在这里记录学习、思考与生活。

116
文章
3
分类
关注我
系列:限流算法

第 2 篇,共 2 篇

上一篇

滑动窗口限流算法

已是最后一篇

文章目录

目录

  • 1. 算法概述
  • 2. 核心特点
  • 3. 算法实现
  • 4. 高级特性
  • 5. 性能测试与对比
  • 6. 应用场景与最佳实践
  • 7. 监控与告警
  • 8. 总结与建议

相关文章

查看更多
滑动窗口限流算法

滑动窗口限流算法

2025-11-17 · 6 min read

算法分类与刷题指南

算法分类与刷题指南

2024-10-23 · 1 min read

二分查找

二分查找

2025-11-17 · 10 min read