冬眠的笔记
首页文章分类书单项目关于
冬眠
X

© 2026 冬眠的笔记 · 用文字记录思考,用思考改变生活

首页>文章>Java
JavaJUC并发队列

JUC 中的队列

深入分析 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue 等并发队列的实现原理与适用场景

冬眠
冬眠
专注于技术、阅读与思考
2025-11-19
发布日期
25 min read
阅读时长
浏览量
JUC 中的队列

概述

Java并发包(java.util.concurrent)中的队列是构建高性能并发系统的核心组件。作为架构师,深入理解这些队列的设计原理、性能特征和适用场景,对于构建可扩展的分布式系统至关重要。本文将从架构师的视角,全面解析JUC队列的设计哲学和实践应用。

队列分类体系

1. 按阻塞特性分类

1.1 阻塞队列(BlockingQueue)

阻塞队列是JUC中最重要的队列类型,提供了线程安全的生产者-消费者模式实现。

核心特性:

  • 队列满时,生产者线程阻塞
  • 队列空时,消费者线程阻塞
  • 提供超时机制和中断响应

接口定义:

public interface BlockingQueue<E> extends Queue<E> {
    // 阻塞插入
    void put(E e) throws InterruptedException;
    
    // 阻塞获取
    E take() throws InterruptedException;
    
    // 超时插入
    boolean offer(E e, long timeout, TimeUnit unit) 
        throws InterruptedException;
    
    // 超时获取
    E poll(long timeout, TimeUnit unit) 
        throws InterruptedException;
    
    // 剩余容量
    int remainingCapacity();
}

1.2 非阻塞队列

非阻塞队列基于CAS操作实现,提供高性能的无锁并发访问。

代表实现:

  • ConcurrentLinkedQueue
  • ConcurrentLinkedDeque

2. 按容量特性分类

2.1 有界队列

  • ArrayBlockingQueue
  • LinkedBlockingQueue(可选有界)
  • PriorityBlockingQueue(可选有界)

2.2 无界队列

  • LinkedBlockingQueue(默认无界)
  • ConcurrentLinkedQueue
  • DelayQueue

3. 按数据结构分类

3.1 数组实现

  • ArrayBlockingQueue

3.2 链表实现

  • LinkedBlockingQueue
  • ConcurrentLinkedQueue

3.3 堆实现

  • PriorityBlockingQueue

核心队列实现详解

1. ArrayBlockingQueue

1.1 设计特点

  • 基于数组的有界阻塞队列
  • FIFO(先进先出)顺序
  • 支持公平/非公平访问策略
  • 固定容量,创建后不可改变

1.2 实现原理

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
    // 存储元素的数组
    final Object[] items;
    
    // 取元素的索引
    int takeIndex;
    
    // 放元素的索引
    int putIndex;
    
    // 队列中的元素数量
    int count;
    
    // 主锁
    final ReentrantLock lock;
    
    // 等待取元素的条件
    private final Condition notEmpty;
    
    // 等待放元素的条件
    private final Condition notFull;
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }
    
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
    
    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        notFull.signal();
        return x;
    }
}

1.3 性能特征

  • 时间复杂度:O(1)的入队和出队操作
  • 空间复杂度:O(n),n为队列容量
  • 并发性能:单锁设计,读写互斥
  • 内存占用:预分配数组,内存占用固定

1.4 适用场景

// 生产者-消费者模式示例
public class ProducerConsumerExample {
    private final ArrayBlockingQueue<Task> taskQueue = 
        new ArrayBlockingQueue<>(1000, true); // 公平队列
    
    // 生产者
    public class Producer implements Runnable {
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Task task = generateTask();
                    taskQueue.put(task); // 阻塞插入
                    System.out.println("Produced: " + task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        private Task generateTask() {
            return new Task("Task-" + System.currentTimeMillis());
        }
    }
    
    // 消费者
    public class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Task task = taskQueue.take(); // 阻塞获取
                    processTask(task);
                    System.out.println("Consumed: " + task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        private void processTask(Task task) {
            // 处理任务逻辑
            try {
                Thread.sleep(100); // 模拟处理时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    static class Task {
        private final String name;
        
        public Task(String name) {
            this.name = name;
        }
        
        @Override
        public String toString() {
            return name;
        }
    }
}

2. LinkedBlockingQueue

2.1 设计特点

  • 基于链表的可选有界阻塞队列
  • FIFO顺序
  • 默认容量为Integer.MAX_VALUE(近似无界)
  • 读写分离锁设计

2.2 实现原理

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
    // 节点定义
    static class Node<E> {
        E item;
        Node<E> next;
        
        Node(E x) { item = x; }
    }
    
    // 队列容量
    private final int capacity;
    
    // 当前元素数量
    private final AtomicInteger count = new AtomicInteger();
    
    // 头节点
    transient Node<E> head;
    
    // 尾节点
    private transient Node<E> last;
    
    // 取元素锁
    private final ReentrantLock takeLock = new ReentrantLock();
    
    // 非空条件
    private final Condition notEmpty = takeLock.newCondition();
    
    // 放元素锁
    private final ReentrantLock putLock = new ReentrantLock();
    
    // 非满条件
    private final Condition notFull = putLock.newCondition();
    
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
    
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }
    
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
}

2.3 性能优势

  • 读写分离:put和take操作使用不同的锁
  • 更高并发:读写可以并行进行
  • 动态扩容:基于链表,按需分配内存

3. ConcurrentLinkedQueue

3.1 设计特点

  • 基于链表的无界非阻塞队列
  • 使用CAS操作实现无锁并发
  • FIFO顺序
  • 高性能,适合高并发场景

3.2 实现原理

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    
    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
        
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
        
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
        
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
        
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
        
        // Unsafe机制
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
        
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    
    private transient volatile Node<E> head;
    private transient volatile Node<E> tail;
    
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }
    
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);
        
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p是最后一个节点
                if (p.casNext(null, newNode)) {
                    // CAS成功,更新tail
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // CAS失败,重试
            }
            else if (p == q)
                // 遇到哨兵节点,重新开始
                p = (t != (t = tail)) ? t : head;
            else
                // 检查tail是否落后,如果是则推进
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
    
    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                
                if (item != null && p.casItem(item, null)) {
                    // CAS成功获取元素
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
    
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }
    
    private boolean casTail(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }
    
    private boolean casHead(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    }
    
    // Unsafe机制
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
    
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

3.3 性能特征

  • 无锁设计:基于CAS操作,避免线程阻塞
  • 高并发:支持大量并发读写
  • 内存效率:按需分配,无预分配开销
  • 弱一致性:size()方法不保证实时准确性

4. PriorityBlockingQueue

4.1 设计特点

  • 基于优先级堆的无界阻塞队列
  • 元素按优先级排序
  • 支持自然排序或自定义Comparator
  • 线程安全的优先级队列

4.2 实现原理

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    
    // 默认初始容量
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
    // 最大数组大小
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    
    // 存储元素的数组(二叉堆)
    private transient Object[] queue;
    
    // 队列中的元素数量
    private transient int size;
    
    // 比较器
    private transient Comparator<? super E> comparator;
    
    // 主锁
    private final ReentrantLock lock;
    
    // 非空条件
    private final Condition notEmpty;
    
    // 扩容时的自旋锁
    private transient volatile int allocationSpinLock;
    
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
    
    public void put(E e) {
        offer(e); // 永不阻塞
    }
    
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
    
    private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }
    
    // 上浮操作(插入时维护堆性质)
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
    
    // 下沉操作(删除时维护堆性质)
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>) x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }
    
    // 扩容操作
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // 释放锁,允许其他操作
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
}

4.3 应用场景

// 任务调度系统示例
public class TaskScheduler {
    private final PriorityBlockingQueue<ScheduledTask> taskQueue = 
        new PriorityBlockingQueue<>();
    
    public static class ScheduledTask implements Comparable<ScheduledTask> {
        private final String name;
        private final int priority;
        private final long executeTime;
        
        public ScheduledTask(String name, int priority, long executeTime) {
            this.name = name;
            this.priority = priority;
            this.executeTime = executeTime;
        }
        
        @Override
        public int compareTo(ScheduledTask other) {
            // 优先级高的先执行,优先级相同则按执行时间排序
            int result = Integer.compare(other.priority, this.priority);
            if (result == 0) {
                result = Long.compare(this.executeTime, other.executeTime);
            }
            return result;
        }
        
        public void execute() {
            System.out.println("Executing task: " + name + 
                             ", priority: " + priority + 
                             ", time: " + executeTime);
        }
        
        // getters...
    }
    
    public void scheduleTask(String name, int priority, long delay) {
        long executeTime = System.currentTimeMillis() + delay;
        ScheduledTask task = new ScheduledTask(name, priority, executeTime);
        taskQueue.offer(task);
    }
    
    public void startScheduler() {
        Thread scheduler = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    ScheduledTask task = taskQueue.take();
                    
                    // 检查是否到了执行时间
                    long currentTime = System.currentTimeMillis();
                    if (task.executeTime > currentTime) {
                        // 还没到执行时间,重新放回队列
                        taskQueue.offer(task);
                        Thread.sleep(100); // 短暂等待
                        continue;
                    }
                    
                    // 执行任务
                    task.execute();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        scheduler.setDaemon(true);
        scheduler.start();
    }
}

5. DelayQueue

5.1 设计特点

  • 基于PriorityQueue的延迟队列
  • 元素必须实现Delayed接口
  • 只有到期的元素才能被取出
  • 适用于定时任务和缓存过期

5.2 实现原理

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    
    // 等待队列头元素的线程
    private Thread leader = null;
    
    // 有新元素到达队列头时的条件
    private final Condition available = lock.newCondition();
    
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
    
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
}

5.3 应用示例

// 缓存过期管理
public class CacheManager<K, V> {
    private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
    private final DelayQueue<DelayedCacheEntry<K>> delayQueue = new DelayQueue<>();
    
    public static class DelayedCacheEntry<K> implements Delayed {
        private final K key;
        private final long expireTime;
        
        public DelayedCacheEntry(K key, long ttl, TimeUnit unit) {
            this.key = key;
            this.expireTime = System.nanoTime() + unit.toNanos(ttl);
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireTime - System.nanoTime(), NANOSECONDS);
        }
        
        @Override
        public int compareTo(Delayed other) {
            if (other == this) return 0;
            if (other instanceof DelayedCacheEntry) {
                DelayedCacheEntry<?> otherEntry = (DelayedCacheEntry<?>) other;
                return Long.compare(this.expireTime, otherEntry.expireTime);
            }
            return Long.compare(getDelay(NANOSECONDS), other.getDelay(NANOSECONDS));
        }
        
        public K getKey() {
            return key;
        }
    }
    
    public CacheManager() {
        // 启动清理线程
        Thread cleanupThread = new Thread(this::cleanup);
        cleanupThread.setDaemon(true);
        cleanupThread.start();
    }
    
    public void put(K key, V value, long ttl, TimeUnit unit) {
        cache.put(key, value);
        delayQueue.offer(new DelayedCacheEntry<>(key, ttl, unit));
    }
    
    public V get(K key) {
        return cache.get(key);
    }
    
    private void cleanup() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                DelayedCacheEntry<K> expiredEntry = delayQueue.take();
                cache.remove(expiredEntry.getKey());
                System.out.println("Removed expired cache entry: " + expiredEntry.getKey());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    public int size() {
        return cache.size();
    }
}

性能对比与选择指南

1. 性能测试数据

队列类型 吞吐量(ops/sec) 延迟(μs) 内存占用 适用场景
ArrayBlockingQueue 500万 10-50 固定 有界生产消费
LinkedBlockingQueue 300万 20-80 动态 大容量缓冲
ConcurrentLinkedQueue 1000万 5-20 动态 高并发无阻塞
PriorityBlockingQueue 200万 50-200 动态 优先级调度
DelayQueue 100万 100-500 动态 延迟处理

2. 选择决策树

需要阻塞语义?
├─ 是
│  ├─ 需要优先级?
│  │  ├─ 是 → PriorityBlockingQueue
│  │  └─ 否
│  │     ├─ 需要延迟?
│  │     │  ├─ 是 → DelayQueue
│  │     │  └─ 否
│  │     │     ├─ 固定容量?
│  │     │     │  ├─ 是 → ArrayBlockingQueue
│  │     │     │  └─ 否 → LinkedBlockingQueue
└─ 否 → ConcurrentLinkedQueue

3. 架构设计原则

3.1 容量规划

// 容量计算公式
public class QueueCapacityCalculator {
    
    /**
     * 计算队列容量
     * @param producerRate 生产速率 (items/second)
     * @param consumerRate 消费速率 (items/second)
     * @param maxLatency 最大延迟 (seconds)
     * @param safetyFactor 安全系数 (1.2-2.0)
     * @return 建议容量
     */
    public static int calculateCapacity(double producerRate, 
                                       double consumerRate,
                                       double maxLatency,
                                       double safetyFactor) {
        if (consumerRate >= producerRate) {
            // 消费能力充足,基于延迟计算
            return (int) Math.ceil(producerRate * maxLatency * safetyFactor);
        } else {
            // 消费能力不足,需要更大容量
            double backlogRate = producerRate - consumerRate;
            double baseCapacity = producerRate * maxLatency;
            double backlogCapacity = backlogRate * maxLatency * 2; // 额外缓冲
            return (int) Math.ceil((baseCapacity + backlogCapacity) * safetyFactor);
        }
    }
    
    // 使用示例
    public static void main(String[] args) {
        // 生产速率: 1000 items/sec
        // 消费速率: 800 items/sec  
        // 最大延迟: 5 seconds
        // 安全系数: 1.5
        int capacity = calculateCapacity(1000, 800, 5, 1.5);
        System.out.println("Recommended capacity: " + capacity);
    }
}

3.2 监控指标

public class QueueMonitor {
    private final BlockingQueue<?> queue;
    private final String queueName;
    private final ScheduledExecutorService scheduler;
    
    // 监控指标
    private volatile long totalOffered = 0;
    private volatile long totalPolled = 0;
    private volatile long totalRejected = 0;
    private volatile long maxSize = 0;
    
    public QueueMonitor(BlockingQueue<?> queue, String queueName) {
        this.queue = queue;
        this.queueName = queueName;
        this.scheduler = Executors.newScheduledThreadPool(1);
        startMonitoring();
    }
    
    private void startMonitoring() {
        scheduler.scheduleAtFixedRate(this::collectMetrics, 0, 10, TimeUnit.SECONDS);
    }
    
    private void collectMetrics() {
        int currentSize = queue.size();
        maxSize = Math.max(maxSize, currentSize);
        
        // 计算利用率
        double utilization = 0.0;
        if (queue instanceof ArrayBlockingQueue) {
            int capacity = queue.remainingCapacity() + currentSize;
            utilization = (double) currentSize / capacity;
        }
        
        // 输出监控信息
        System.out.printf("Queue[%s] - Size: %d, Max: %d, Utilization: %.2f%%, " +
                         "Offered: %d, Polled: %d, Rejected: %d%n",
                         queueName, currentSize, maxSize, utilization * 100,
                         totalOffered, totalPolled, totalRejected);
        
        // 告警检查
        if (utilization > 0.8) {
            System.err.println("WARNING: Queue " + queueName + " utilization is high: " + 
                             String.format("%.2f%%", utilization * 100));
        }
    }
    
    public void recordOffer() {
        totalOffered++;
    }
    
    public void recordPoll() {
        totalPolled++;
    }
    
    public void recordReject() {
        totalRejected++;
    }
    
    public void shutdown() {
        scheduler.shutdown();
    }
}

高级应用模式

1. 多级队列架构

// 多级优先级队列系统
public class MultiLevelQueueSystem {
    private final Map<Priority, BlockingQueue<Task>> queues;
    private final ExecutorService executorService;
    private volatile boolean running = true;
    
    public enum Priority {
        HIGH(1), MEDIUM(2), LOW(3);
        
        private final int level;
        
        Priority(int level) {
            this.level = level;
        }
        
        public int getLevel() {
            return level;
        }
    }
    
    public MultiLevelQueueSystem(int workerThreads) {
        this.queues = new EnumMap<>(Priority.class);
        this.queues.put(Priority.HIGH, new ArrayBlockingQueue<>(1000));
        this.queues.put(Priority.MEDIUM, new ArrayBlockingQueue<>(2000));
        this.queues.put(Priority.LOW, new LinkedBlockingQueue<>());
        
        this.executorService = Executors.newFixedThreadPool(workerThreads);
        startWorkers();
    }
    
    private void startWorkers() {
        for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
            executorService.submit(this::workerLoop);
        }
    }
    
    private void workerLoop() {
        while (running) {
            try {
                Task task = getNextTask();
                if (task != null) {
                    processTask(task);
                } else {
                    Thread.sleep(10); // 短暂休眠
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    private Task getNextTask() throws InterruptedException {
        // 按优先级顺序检查队列
        for (Priority priority : Priority.values()) {
            BlockingQueue<Task> queue = queues.get(priority);
            Task task = queue.poll(100, TimeUnit.MILLISECONDS);
            if (task != null) {
                return task;
            }
        }
        return null;
    }
    
    public boolean submitTask(Task task) {
        BlockingQueue<Task> queue = queues.get(task.getPriority());
        return queue.offer(task);
    }
    
    private void processTask(Task task) {
        try {
            task.execute();
        } catch (Exception e) {
            System.err.println("Task execution failed: " + e.getMessage());
        }
    }
    
    public void shutdown() {
        running = false;
        executorService.shutdown();
    }
    
    public static class Task {
        private final String name;
        private final Priority priority;
        private final Runnable action;
        
        public Task(String name, Priority priority, Runnable action) {
            this.name = name;
            this.priority = priority;
            this.action = action;
        }
        
        public void execute() {
            System.out.println("Executing task: " + name + " (" + priority + ")");
            action.run();
        }
        
        public Priority getPriority() {
            return priority;
        }
    }
}

2. 背压控制机制

// 带背压控制的队列包装器
public class BackpressureQueue<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong pendingCount = new AtomicLong(0);
    private final long maxPending;
    private final BackpressureStrategy strategy;
    
    public enum BackpressureStrategy {
        DROP_OLDEST,    // 丢弃最旧的元素
        DROP_NEWEST,    // 丢弃最新的元素
        BLOCK,          // 阻塞生产者
        REJECT          // 拒绝新元素
    }
    
    public BackpressureQueue(BlockingQueue<T> queue, 
                           long maxPending, 
                           BackpressureStrategy strategy) {
        this.queue = queue;
        this.maxPending = maxPending;
        this.strategy = strategy;
    }
    
    public boolean offer(T element) throws InterruptedException {
        long current = pendingCount.get();
        
        if (current >= maxPending) {
            switch (strategy) {
                case DROP_OLDEST:
                    queue.poll(); // 移除最旧元素
                    pendingCount.decrementAndGet();
                    break;
                    
                case DROP_NEWEST:
                    return false; // 拒绝新元素
                    
                case BLOCK:
                    // 等待直到有空间
                    while (pendingCount.get() >= maxPending) {
                        Thread.sleep(1);
                    }
                    break;
                    
                case REJECT:
                    throw new IllegalStateException("Queue is full, rejecting element");
            }
        }
        
        boolean offered = queue.offer(element);
        if (offered) {
            pendingCount.incrementAndGet();
        }
        return offered;
    }
    
    public T take() throws InterruptedException {
        T element = queue.take();
        pendingCount.decrementAndGet();
        return element;
    }
    
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        T element = queue.poll(timeout, unit);
        if (element != null) {
            pendingCount.decrementAndGet();
        }
        return element;
    }
    
    public long getPendingCount() {
        return pendingCount.get();
    }
    
    public double getUtilization() {
        return (double) pendingCount.get() / maxPending;
    }
}

3. 队列分片技术

// 分片队列实现
public class ShardedQueue<T> {
    private final List<BlockingQueue<T>> shards;
    private final int shardCount;
    private final AtomicInteger roundRobinIndex = new AtomicInteger(0);
    
    public ShardedQueue(int shardCount, int shardCapacity) {
        this.shardCount = shardCount;
        this.shards = new ArrayList<>(shardCount);
        
        for (int i = 0; i < shardCount; i++) {
            shards.add(new ArrayBlockingQueue<>(shardCapacity));
        }
    }
    
    // 基于哈希的分片策略
    public boolean offer(T element) {
        int shardIndex = Math.abs(element.hashCode()) % shardCount;
        return shards.get(shardIndex).offer(element);
    }
    
    // 轮询分片策略
    public boolean offerRoundRobin(T element) {
        int shardIndex = roundRobinIndex.getAndIncrement() % shardCount;
        return shards.get(shardIndex).offer(element);
    }
    
    // 负载均衡的分片策略
    public boolean offerBalanced(T element) {
        // 找到最空闲的分片
        BlockingQueue<T> leastLoadedShard = shards.stream()
            .min(Comparator.comparingInt(Queue::size))
            .orElse(shards.get(0));
        
        return leastLoadedShard.offer(element);
    }
    
    // 从所有分片中获取元素
    public T poll() {
        for (BlockingQueue<T> shard : shards) {
            T element = shard.poll();
            if (element != null) {
                return element;
            }
        }
        return null;
    }
    
    // 阻塞获取(轮询所有分片)
    public T take() throws InterruptedException {
        while (true) {
            for (BlockingQueue<T> shard : shards) {
                T element = shard.poll(10, TimeUnit.MILLISECONDS);
                if (element != null) {
                    return element;
                }
            }
        }
    }
    
    public int size() {
        return shards.stream().mapToInt(Queue::size).sum();
    }
    
    public boolean isEmpty() {
        return shards.stream().allMatch(Queue::isEmpty);
    }
}

故障处理与恢复

1. 队列溢出处理

public class OverflowHandler<T> {
    private final BlockingQueue<T> primaryQueue;
    private final BlockingQueue<T> overflowQueue;
    private final AtomicBoolean overflowMode = new AtomicBoolean(false);
    
    public OverflowHandler(BlockingQueue<T> primaryQueue, 
                          BlockingQueue<T> overflowQueue) {
        this.primaryQueue = primaryQueue;
        this.overflowQueue = overflowQueue;
        
        // 启动恢复检查线程
        startRecoveryCheck();
    }
    
    public boolean offer(T element) {
        if (!overflowMode.get()) {
            boolean offered = primaryQueue.offer(element);
            if (!offered) {
                // 主队列满,切换到溢出模式
                overflowMode.set(true);
                System.out.println("Switching to overflow mode");
                return overflowQueue.offer(element);
            }
            return true;
        } else {
            return overflowQueue.offer(element);
        }
    }
    
    public T take() throws InterruptedException {
        if (overflowMode.get()) {
            // 优先处理溢出队列
            T element = overflowQueue.poll();
            if (element != null) {
                return element;
            }
        }
        return primaryQueue.take();
    }
    
    private void startRecoveryCheck() {
        Thread recoveryThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    if (overflowMode.get() && primaryQueue.remainingCapacity() > 0) {
                        // 尝试将溢出队列的元素移回主队列
                        drainOverflowQueue();
                        
                        if (overflowQueue.isEmpty()) {
                            overflowMode.set(false);
                            System.out.println("Recovered from overflow mode");
                        }
                    }
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        
        recoveryThread.setDaemon(true);
        recoveryThread.start();
    }
    
    private void drainOverflowQueue() {
        while (!overflowQueue.isEmpty() && primaryQueue.remainingCapacity() > 0) {
            T element = overflowQueue.poll();
            if (element != null && !primaryQueue.offer(element)) {
                // 如果主队列又满了,放回溢出队列
                overflowQueue.offer(element);
                break;
            }
        }
    }
}

2. 持久化队列

// 支持持久化的队列包装器
public class PersistentQueue<T> {
    private final BlockingQueue<T> memoryQueue;
    private final String persistenceFile;
    private final ObjectMapper objectMapper;
    private final Class<T> elementType;
    
    public PersistentQueue(BlockingQueue<T> memoryQueue, 
                          String persistenceFile, 
                          Class<T> elementType) {
        this.memoryQueue = memoryQueue;
        this.persistenceFile = persistenceFile;
        this.elementType = elementType;
        this.objectMapper = new ObjectMapper();
        
        // 启动时恢复数据
        recoverFromDisk();
        
        // 注册关闭钩子
        Runtime.getRuntime().addShutdownHook(new Thread(this::persistToDisk));
    }
    
    public boolean offer(T element) {
        boolean offered = memoryQueue.offer(element);
        if (offered) {
            // 异步持久化
            CompletableFuture.runAsync(this::persistToDisk);
        }
        return offered;
    }
    
    public T take() throws InterruptedException {
        T element = memoryQueue.take();
        // 异步更新持久化文件
        CompletableFuture.runAsync(this::persistToDisk);
        return element;
    }
    
    private void persistToDisk() {
        try {
            List<T> elements = new ArrayList<>();
            memoryQueue.drainTo(elements);
            
            // 写入文件
            objectMapper.writeValue(new File(persistenceFile), elements);
            
            // 重新放回内存队列
            elements.forEach(memoryQueue::offer);
            
        } catch (IOException e) {
            System.err.println("Failed to persist queue: " + e.getMessage());
        }
    }
    
    private void recoverFromDisk() {
        try {
            File file = new File(persistenceFile);
            if (file.exists()) {
                TypeReference<List<T>> typeRef = new TypeReference<List<T>>() {};
                List<T> elements = objectMapper.readValue(file, typeRef);
                elements.forEach(memoryQueue::offer);
                System.out.println("Recovered " + elements.size() + " elements from disk");
            }
        } catch (IOException e) {
            System.err.println("Failed to recover queue: " + e.getMessage());
        }
    }
    
    public int size() {
        return memoryQueue.size();
    }
}

总结

JUC队列是构建高并发系统的基础设施,每种队列都有其特定的适用场景和性能特征。作为架构师,需要:

1. 核心设计原则

  • 选择合适的队列类型:根据业务需求选择阻塞/非阻塞、有界/无界
  • 合理设计容量:基于生产消费速率和延迟要求计算容量
  • 实施监控告警:监控队列利用率、吞吐量和延迟指标
  • 考虑故障恢复:设计溢出处理和持久化机制

2. 性能优化策略

  • 减少锁竞争:使用分片队列或无锁队列
  • 批量操作:使用drainTo等批量API提高效率
  • 背压控制:防止队列溢出影响系统稳定性
  • 内存管理:合理配置队列容量,避免内存泄漏

3. 架构最佳实践

  • 分层设计:使用多级队列处理不同优先级任务
  • 水平扩展:通过队列分片提高并发能力
  • 故障隔离:使用独立队列隔离不同业务模块
  • 可观测性:建立完善的监控和告警体系

队列不仅仅是数据结构,更是系统架构的重要组成部分。正确理解和使用JUC队列,是构建高性能、高可用分布式系统的关键技能。

文章标签

JavaJUC并发队列BlockingQueue
JUC 中的锁
上一篇

JUC 中的锁

2025-11-19

IO 多路复用
下一篇

IO 多路复用

2025-11-19

冬眠

冬眠

博主

专注于技术、阅读与思考。在这里记录学习、思考与生活。

116
文章
2
分类
关注我
系列:Java 并发包

第 1 篇,共 2 篇

已是第一篇

下一篇

Java 原子类详解

文章目录

目录

  • 概述
  • 队列分类体系
  • 核心队列实现详解
  • 性能对比与选择指南
  • 高级应用模式
  • 故障处理与恢复
  • 总结

相关文章

查看更多
Java 原子类详解

Java 原子类详解

2025-11-19 · 13 min read

JUC 中的锁

JUC 中的锁

2025-11-19 · 11 min read

独占锁和共享锁

独占锁和共享锁

2025-11-19 · 32 min read