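Overview
The queues in the Java concurrency package (java.util.concurrent, JUC) are core building blocks for high-performance concurrent systems. For an architect, a solid grasp of their design, performance characteristics, and suitable use cases is essential to building scalable systems. This article walks through the design philosophy and practical use of the JUC queues from an architect's point of view.
Queue Taxonomy
1. By blocking behavior
1.1 Blocking queues (BlockingQueue)
Blocking queues are the most widely used queue type in JUC and provide a thread-safe implementation of the producer-consumer pattern.
Core characteristics:
- put() blocks the producer thread while the queue is full
- take() blocks the consumer thread while the queue is empty
- Timed variants of both operations are available, and blocked threads respond to interruption
Interface definition: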
public interface BlockingQueue<E> extends Queue<E> {
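// Insert, blocking while the queue is full
void put(E e) throws InterruptedException;
// Remove the head, blocking while the queue is empty
E take() throws InterruptedException;
// Insert, waiting up to the given timeout for space
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// Remove the head, waiting up to the given timeout for an element
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
// Number of additional elements the queue can accept without blocking
int remainingCapacity();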
}
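The difference between the insertion styles is easiest to see on a queue that is already full. The following is a minimal sketch, not part of the original article; the class name `BlockingQueueMethodsDemo` and its helper method are made up for illustration, and only standard `java.util.concurrent` calls are used.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows how each insertion style reacts to a full queue.
class BlockingQueueMethodsDemo {

    static String describeFullQueueBehavior() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // fills the single slot

        StringBuilder report = new StringBuilder();

        // offer(e): returns false immediately instead of waiting.
        report.append("offer=").append(queue.offer("second"));

        // offer(e, timeout, unit): waits up to the timeout, then gives up.
        try {
            report.append(", timedOffer=")
                  .append(queue.offer("second", 50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            report.append(", timedOffer=interrupted");
        }

        // add(e): throws IllegalStateException when no space is available.
        try {
            queue.add("second");
            report.append(", add=ok");
        } catch (IllegalStateException e) {
            report.append(", add=IllegalStateException");
        }

        report.append(", remaining=").append(queue.remainingCapacity());
        return report.toString();
    }

    public static void main(String[] args) {
        System.out.println(describeFullQueueBehavior());
    }
}
```

Only put() waits indefinitely; it is left out of the sketch because it would block forever here without a consumer.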
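1.2 Non-blocking queues
Non-blocking queues are built on CAS operations and provide lock-free concurrent access. Their operations never wait: poll() on an empty queue simply returns null.
Representative implementations:
- ConcurrentLinkedQueue
- ConcurrentLinkedDeque
2. By capacity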
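2.1 Bounded queues
- ArrayBlockingQueue (capacity is mandatory and fixed)
- LinkedBlockingQueue (when a capacity is passed to the constructor)
2.2 Unbounded queues
- LinkedBlockingQueue (default capacity Integer.MAX_VALUE, effectively unbounded)
- PriorityBlockingQueue (the constructor argument is only an initial capacity; the heap grows as needed)
- ConcurrentLinkedQueue
- DelayQueue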
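3. By data structure
3.1 Array-based
- ArrayBlockingQueue
3.2 Linked-list-based
- LinkedBlockingQueue
- ConcurrentLinkedQueue
3.3 Heap-based
- PriorityBlockingQueue
- DelayQueue (backed by a PriorityQueue)
Core Queue Implementations in Detail
1. ArrayBlockingQueue
1.1 Design characteristics
- Bounded blocking queue backed by an array
- FIFO (first in, first out) ordering
- Optional fair or non-fair access policy
- Fixed capacity that cannot change after construction
1.2 Implementation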
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
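// Array holding the elements
final Object[] items;
// Index for the next take
int takeIndex;
// Index for the next put
int putIndex;
// Number of elements in the queue
int count;
// Main lock guarding all access
final ReentrantLock lock;
// Condition for threads waiting to take
private final Condition notEmpty;
// Condition for threads waiting to put
private final Condition notFull;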
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
notFull.signal();
return x;
}
}
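1.3 Performance characteristics
- Time complexity: O(1) enqueue and dequeue
- Space complexity: O(n), where n is the queue capacity
- Concurrency: a single lock guards both ends, so put and take exclude each other
- Memory footprint: the array is preallocated, so the footprint is fixed
1.4 Typical use
// Producer-consumer example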
public class ProducerConsumerExample {
private final ArrayBlockingQueue<Task> taskQueue =
new ArrayBlockingQueue<>(1000, true); // fair mode: waiting threads are served in FIFO order, at some cost in throughput
// Producer
public class Producer implements Runnable {
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
Task task = generateTask();
taskQueue.put(task); // blocks while the queue is full
System.out.println("Produced: " + task);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private Task generateTask() {
return new Task("Task-" + System.currentTimeMillis());
}
}
// Consumer
public class Consumer implements Runnable {
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
Task task = taskQueue.take(); // blocks while the queue is empty
processTask(task);
System.out.println("Consumed: " + task);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void processTask(Task task) {
// Task-processing logic goes here
try {
Thread.sleep(100); // simulate processing time
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Task {
private final String name;
public Task(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
}
}
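2. LinkedBlockingQueue
2.1 Design characteristics
- Optionally bounded blocking queue backed by a linked list
- FIFO ordering
- Default capacity is Integer.MAX_VALUE (effectively unbounded)
- Two-lock design: separate locks for the put side and the take side
2.2 Implementation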
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// Linked-list node
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
// Capacity bound
private final int capacity;
// Current number of elements (atomic because it is shared by both locks)
private final AtomicInteger count = new AtomicInteger();
// Head of the list (a dummy node whose item is null)
transient Node<E> head;
// Tail of the list
private transient Node<E> last;
// Lock held by take, poll, and so on
private final ReentrantLock takeLock = new ReentrantLock();
// Wait queue for waiting takes
private final Condition notEmpty = takeLock.newCondition();
// Lock held by put, offer, and so on
private final ReentrantLock putLock = new ReentrantLock();
// Wait queue for waiting puts
private final Condition notFull = putLock.newCondition();
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private void enqueue(Node<E> node) {
last = last.next = node;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
}
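2.3 Performance advantages
- Separate locks: put and take use different locks
- Higher concurrency: a producer and a consumer can proceed in parallel
- Allocation on demand: nodes are allocated per element rather than preallocated, at the cost of extra allocation and GC work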
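Because the default constructor uses Integer.MAX_VALUE as the capacity, a LinkedBlockingQueue only applies backpressure when a capacity is passed explicitly. A small sketch of the difference, with a hypothetical class name `LinkedBlockingQueueCapacityDemo`:

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: contrasts the default and the bounded constructor.
class LinkedBlockingQueueCapacityDemo {

    /** Remaining capacity of a queue created with the no-arg constructor. */
    static int defaultRemainingCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    /** True if a queue bounded at 2 rejects the third non-blocking offer. */
    static boolean boundedRejectsWhenFull() {
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(2);
        bounded.offer("a");
        bounded.offer("b");
        return !bounded.offer("c");
    }

    public static void main(String[] args) {
        System.out.println("default remaining capacity = " + defaultRemainingCapacity());
        System.out.println("bounded queue rejects when full = " + boundedRejectsWhenFull());
    }
}
```

In production code the bounded form is usually the safer choice, since an effectively unbounded buffer hides a slow consumer until memory runs out.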
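3. ConcurrentLinkedQueue
3.1 Design characteristics
- Unbounded non-blocking queue backed by a linked list
- Lock-free concurrency through CAS operations
- FIFO ordering
- High throughput under contention, suited to highly concurrent workloads
3.2 Implementation
The listing follows the JDK 8 source, which uses sun.misc.Unsafe for CAS; JDK 9 and later use VarHandle instead.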
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
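// p is the last node
if (p.casNext(null, newNode)) {
// CAS succeeded; tail is updated lazily
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost the CAS race to another thread; re-read next and retry
}
else if (p == q)
// p has been unlinked from the list (self-linked); restart from the new tail if it changed, otherwise from head
p = (t != (t = tail)) ? t : head;
else
// Advance; after two hops, jump to the current tail if it has moved
p = (p != t && t != (t = tail)) ? t : q;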
}
}
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// CAS succeeded: this thread claimed the element
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
private boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
private boolean casHead(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentLinkedQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
} catch (Exception e) {
throw new Error(e);
}
}
}
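3.3 Performance characteristics
- Lock-free design: CAS-based, so threads are never blocked
- High concurrency: many threads can offer and poll at the same time
- Memory efficiency: nodes are allocated on demand with no preallocation
- Weak consistency: size() traverses the list in O(n) time and may be inaccurate under concurrent modification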
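The lock-free behavior can be checked with a small experiment: several threads offer concurrently, and no element is lost even though no lock is taken. This is an illustrative sketch; the class and method names are invented for the example.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class: concurrent offers followed by a single-threaded drain.
class ConcurrentLinkedQueueDemo {

    /** Offers perThread elements from each of `threads` threads, then counts them by polling. */
    static int offerFromManyThreads(int threads, int perThread) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    queue.offer(i); // never blocks and always succeeds (unbounded)
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // poll() returns null as soon as the queue is empty instead of waiting.
        int polled = 0;
        while (queue.poll() != null) {
            polled++;
        }
        return polled;
    }

    public static void main(String[] args) {
        System.out.println("elements polled = " + offerFromManyThreads(4, 1000));
    }
}
```

Counting by polling rather than calling size() is deliberate: size() walks the whole list and is only a snapshot while other threads are still modifying the queue.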
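4. PriorityBlockingQueue
4.1 Design characteristics
- Unbounded blocking queue backed by a priority heap
- Elements are retrieved in priority order
- Supports natural ordering or a custom Comparator
- Thread-safe counterpart of PriorityQueue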
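The ordering guarantee applies to retrieval, not to insertion order. The sketch below (hypothetical class `PriorityOrderDemo`) inserts integers in arbitrary order and polls them back under two different comparators.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class: retrieval order follows the comparator, not insertion order.
class PriorityOrderDemo {

    static List<Integer> pollAll(Comparator<Integer> order, int... values) {
        // 11 is only the initial heap size; the queue grows as needed.
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(11, order);
        for (int value : values) {
            queue.offer(value);
        }
        List<Integer> result = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            result.add(next);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(pollAll(Comparator.<Integer>naturalOrder(), 5, 1, 4, 2, 3));
        System.out.println(pollAll(Comparator.<Integer>reverseOrder(), 5, 1, 4, 2, 3));
    }
}
```

Note that iterating over the queue (as opposed to polling it) does not visit elements in priority order, because the iterator walks the underlying heap array.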
4.2 Implementation
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// Default initial capacity
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// Maximum array size
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// Array holding the elements (a binary heap)
private transient Object[] queue;
// Number of elements in the queue
private transient int size;
// Comparator, or null for natural ordering
private transient Comparator<? super E> comparator;
// Main lock
private final ReentrantLock lock;
// Condition for blocking when empty
private final Condition notEmpty;
// Spin lock used while growing the array
private transient volatile int allocationSpinLock;
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
public void put(E e) {
offer(e); // never blocks: the queue is unbounded
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
// Sift up (restores the heap property after insertion)
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
// Sift down (restores the heap property after removal)
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>) x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
// Grow the array
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // release the main lock so other operations can proceed during allocation
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
}
4.3 Example use
// Task-scheduling example
public class TaskScheduler {
private final PriorityBlockingQueue<ScheduledTask> taskQueue =
new PriorityBlockingQueue<>();
public static class ScheduledTask implements Comparable<ScheduledTask> {
private final String name;
private final int priority;
private final long executeTime;
public ScheduledTask(String name, int priority, long executeTime) {
this.name = name;
this.priority = priority;
this.executeTime = executeTime;
}
@Override
public int compareTo(ScheduledTask other) {
// Higher priority first; ties are broken by earlier execution time
int result = Integer.compare(other.priority, this.priority);
if (result == 0) {
result = Long.compare(this.executeTime, other.executeTime);
}
return result;
}
public void execute() {
System.out.println("Executing task: " + name +
", priority: " + priority +
", time: " + executeTime);
}
// getters...
}
public void scheduleTask(String name, int priority, long delay) {
long executeTime = System.currentTimeMillis() + delay;
ScheduledTask task = new ScheduledTask(name, priority, executeTime);
taskQueue.offer(task);
}
public void startScheduler() {
Thread scheduler = new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
ScheduledTask task = taskQueue.take();
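// Check whether the task is due
long currentTime = System.currentTimeMillis();
if (task.executeTime > currentTime) {
// Not due yet: put it back and wait briefly.
// Note: the same highest-priority task is taken again on the next pass, so due
// tasks of lower priority wait behind it; DelayQueue handles time-based release directly.
taskQueue.offer(task);
Thread.sleep(100); // short pause before retrying
continue;
}
// Run the task
task.execute();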
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
scheduler.setDaemon(true);
scheduler.start();
}
}
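5. DelayQueue
5.1 Design characteristics
- Unbounded blocking queue backed by a PriorityQueue
- Elements must implement the Delayed interface
- An element can be taken only after its delay has expired
- Suited to timed tasks and cache expiry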
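The "only expired elements can be taken" rule can be observed directly. The following sketch uses a hypothetical `Reminder` element with a 300 ms delay: an immediate poll() sees nothing, although size() already counts the element, and a timed poll returns it once the delay has elapsed.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: an element stays hidden until its delay expires.
class DelayQueueDemo {

    static final class Reminder implements Delayed {
        final String message;
        final long dueAtNanos;

        Reminder(String message, long delayMillis) {
            this.message = message;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Returns true if the element is invisible before its delay and retrievable after it. */
    static boolean hiddenUntilExpired() {
        DelayQueue<Reminder> queue = new DelayQueue<>();
        queue.offer(new Reminder("wake up", 300));

        boolean hiddenEarly = queue.poll() == null; // delay has not expired yet
        boolean countedAnyway = queue.size() == 1;  // size() includes unexpired elements

        Reminder reminder;
        try {
            reminder = queue.poll(5, TimeUnit.SECONDS); // waits until the delay expires
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return hiddenEarly && countedAnyway
                && reminder != null && "wake up".equals(reminder.message);
    }

    public static void main(String[] args) {
        System.out.println("hidden until expired = " + hiddenUntilExpired());
    }
}
```

The sketch assumes the first poll() runs well within 300 ms of construction, which holds on any normally loaded machine.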
5.2 Implementation
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
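// Thread designated to wait for the element at the head of the queue (the leader)
private Thread leader = null;
// Signalled when a new element becomes the head or a new leader is needed
private final Condition available = lock.newCondition();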
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
}
5.3 Example
// Cache-expiry management
public class CacheManager<K, V> {
private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
private final DelayQueue<DelayedCacheEntry<K>> delayQueue = new DelayQueue<>();
public static class DelayedCacheEntry<K> implements Delayed {
private final K key;
private final long expireTime;
public DelayedCacheEntry(K key, long ttl, TimeUnit unit) {
this.key = key;
this.expireTime = System.nanoTime() + unit.toNanos(ttl);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed other) {
if (other == this) return 0;
if (other instanceof DelayedCacheEntry) {
DelayedCacheEntry<?> otherEntry = (DelayedCacheEntry<?>) other;
return Long.compare(this.expireTime, otherEntry.expireTime);
}
return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
}
public K getKey() {
return key;
}
}
public CacheManager() {
// Start the cleanup thread
Thread cleanupThread = new Thread(this::cleanup);
cleanupThread.setDaemon(true);
cleanupThread.start();
}
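public void put(K key, V value, long ttl, TimeUnit unit) {
// Note: re-putting an existing key leaves its earlier DelayedCacheEntry in the
// queue, so the new value can be evicted at the old deadline.
cache.put(key, value);
delayQueue.offer(new DelayedCacheEntry<>(key, ttl, unit));
}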
public V get(K key) {
return cache.get(key);
}
private void cleanup() {
try {
while (!Thread.currentThread().isInterrupted()) {
DelayedCacheEntry<K> expiredEntry = delayQueue.take();
cache.remove(expiredEntry.getKey());
System.out.println("Removed expired cache entry: " + expiredEntry.getKey());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public int size() {
return cache.size();
}
}
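Performance Comparison and Selection Guide
1. Performance figures
The figures below are rough, order-of-magnitude illustrations. The hardware, thread counts, and benchmark method are not stated, so treat them as relative guidance and measure your own workload before deciding.
| Queue type | Throughput (ops/sec) | Latency (μs) | Memory footprint | Typical use |
|---|---|---|---|---|
| ArrayBlockingQueue | 5 million | 10-50 | Fixed | Bounded producer-consumer |
| LinkedBlockingQueue | 3 million | 20-80 | Dynamic | Large buffers |
| ConcurrentLinkedQueue | 10 million | 5-20 | Dynamic | High-concurrency, non-blocking |
| PriorityBlockingQueue | 2 million | 50-200 | Dynamic | Priority scheduling |
| DelayQueue | 1 million | 100-500 | Dynamic | Delayed processing |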
2. Selection decision tree
Need blocking semantics?
├─ Yes
│ ├─ Need priority ordering?
│ │ ├─ Yes → PriorityBlockingQueue
│ │ └─ No
│ │ ├─ Need delayed release?
│ │ │ ├─ Yes → DelayQueue
│ │ │ └─ No
│ │ │ ├─ Fixed capacity?
│ │ │ │ ├─ Yes → ArrayBlockingQueue
│ │ │ │ └─ No → LinkedBlockingQueue
└─ No → ConcurrentLinkedQueue
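The same tree can be written down as a small function, which makes the order of the questions explicit. This is only a sketch of the decision logic above; `QueueChooser`, `Choice`, and the parameter names are invented for illustration.

```java
// Hypothetical helper: encodes the decision tree as code, asking the questions in the same order.
class QueueChooser {

    enum Choice {
        PRIORITY_BLOCKING_QUEUE,
        DELAY_QUEUE,
        ARRAY_BLOCKING_QUEUE,
        LINKED_BLOCKING_QUEUE,
        CONCURRENT_LINKED_QUEUE
    }

    static Choice choose(boolean needsBlocking,
                         boolean needsPriority,
                         boolean needsDelay,
                         boolean fixedCapacity) {
        if (!needsBlocking) {
            return Choice.CONCURRENT_LINKED_QUEUE;
        }
        if (needsPriority) {
            return Choice.PRIORITY_BLOCKING_QUEUE;
        }
        if (needsDelay) {
            return Choice.DELAY_QUEUE;
        }
        return fixedCapacity ? Choice.ARRAY_BLOCKING_QUEUE : Choice.LINKED_BLOCKING_QUEUE;
    }

    public static void main(String[] args) {
        // A bounded producer-consumer pipeline without priorities or delays.
        System.out.println(choose(true, false, false, true));
    }
}
```

As in the tree, the priority question is asked before the delay question, so a workload that needs both is routed to PriorityBlockingQueue first.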
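3. Architectural design principles
3.1 Capacity planning
// Capacity formula
public class QueueCapacityCalculator {
/**
* Computes a suggested queue capacity.
* @param producerRate production rate (items/second)
* @param consumerRate consumption rate (items/second)
* @param maxLatency maximum acceptable latency (seconds)
* @param safetyFactor safety factor (1.2-2.0)
* @return suggested capacity
*/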
public static int calculateCapacity(double producerRate,
double consumerRate,
double maxLatency,
double safetyFactor) {
if (consumerRate >= producerRate) {
// Consumers keep up: size the queue from the latency budget
return (int) Math.ceil(producerRate * maxLatency * safetyFactor);
} else {
// Consumers fall behind: add room for the backlog. A sustained deficit will
// still fill any bounded queue eventually, so this only absorbs bursts.
double backlogRate = producerRate - consumerRate;
double baseCapacity = producerRate * maxLatency;
double backlogCapacity = backlogRate * maxLatency * 2; // extra buffer
return (int) Math.ceil((baseCapacity + backlogCapacity) * safetyFactor);
}
}
// Usage example
public static void main(String[] args) {
// Production rate: 1000 items/sec
// Consumption rate: 800 items/sec
// Maximum latency: 5 seconds
// Safety factor: 1.5
int capacity = calculateCapacity(1000, 800, 5, 1.5);
System.out.println("Recommended capacity: " + capacity);
}
}
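Working through the example call `calculateCapacity(1000, 800, 5, 1.5)` by hand shows which branch is taken and where the result comes from. The consumer is slower than the producer, so the backlog branch applies:

```latex
\begin{aligned}
\text{backlogRate}     &= 1000 - 800 = 200 \\
\text{baseCapacity}    &= 1000 \times 5 = 5000 \\
\text{backlogCapacity} &= 200 \times 5 \times 2 = 2000 \\
\text{capacity}        &= \lceil (5000 + 2000) \times 1.5 \rceil = 10500
\end{aligned}
```

With a consumer that keeps up (for example 1200 items/sec), the first branch would give $\lceil 1000 \times 5 \times 1.5 \rceil = 7500$ instead.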
3.2 Monitoring metrics
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
public class QueueMonitor {
private final BlockingQueue<?> queue;
private final String queueName;
private final ScheduledExecutorService scheduler;
// Counters. LongAdder is used because ++ on a volatile long is not atomic
// and would lose updates when several threads record events at once.
private final LongAdder totalOffered = new LongAdder();
private final LongAdder totalPolled = new LongAdder();
private final LongAdder totalRejected = new LongAdder();
// Written only by the single scheduler thread
private volatile long maxSize = 0;
public QueueMonitor(BlockingQueue<?> queue, String queueName) {
this.queue = queue;
this.queueName = queueName;
this.scheduler = Executors.newScheduledThreadPool(1);
startMonitoring();
}
private void startMonitoring() {
scheduler.scheduleAtFixedRate(this::collectMetrics, 0, 10, TimeUnit.SECONDS);
}
private void collectMetrics() {
int currentSize = queue.size();
maxSize = Math.max(maxSize, currentSize);
// Compute utilization (only meaningful for a fixed-capacity queue)
double utilization = 0.0;
if (queue instanceof ArrayBlockingQueue) {
int capacity = queue.remainingCapacity() + currentSize;
utilization = (double) currentSize / capacity;
}
// Print the metrics
System.out.printf("Queue[%s] - Size: %d, Max: %d, Utilization: %.2f%%, " +
"Offered: %d, Polled: %d, Rejected: %d%n",
queueName, currentSize, maxSize, utilization * 100,
totalOffered.sum(), totalPolled.sum(), totalRejected.sum());
// Alert check
if (utilization > 0.8) {
System.err.println("WARNING: Queue " + queueName + " utilization is high: " +
String.format("%.2f%%", utilization * 100));
}
}
public void recordOffer() {
totalOffered.increment();
}
public void recordPoll() {
totalPolled.increment();
}
public void recordReject() {
totalRejected.increment();
}
public void shutdown() {
scheduler.shutdown();
}
}
Advanced Usage Patterns
1. Multi-level queue architecture
// Multi-level priority queue system
public class MultiLevelQueueSystem {
private final Map<Priority, BlockingQueue<Task>> queues;
private final ExecutorService executorService;
private volatile boolean running = true;
public enum Priority {
HIGH(1), MEDIUM(2), LOW(3);
private final int level;
Priority(int level) {
this.level = level;
}
public int getLevel() {
return level;
}
}
public MultiLevelQueueSystem(int workerThreads) {
this.queues = new EnumMap<>(Priority.class);
this.queues.put(Priority.HIGH, new ArrayBlockingQueue<>(1000));
this.queues.put(Priority.MEDIUM, new ArrayBlockingQueue<>(2000));
this.queues.put(Priority.LOW, new LinkedBlockingQueue<>());
this.executorService = Executors.newFixedThreadPool(workerThreads);
startWorkers(workerThreads);
}
// Start exactly one worker loop per pool thread; submitting more loops than
// threads would leave the extra loops queued and never run
private void startWorkers(int workerThreads) {
for (int i = 0; i < workerThreads; i++) {
executorService.submit(this::workerLoop);
}
}
private void workerLoop() {
while (running) {
try {
Task task = getNextTask();
if (task != null) {
processTask(task);
} else {
Thread.sleep(10); // brief pause when no task was found
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
private Task getNextTask() throws InterruptedException {
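// Check the queues in priority order without blocking, so that a wait on a
// lower-priority queue never delays a newly arrived higher-priority task
for (Priority priority : Priority.values()) {
BlockingQueue<Task> queue = queues.get(priority);
Task task = queue.poll();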
if (task != null) {
return task;
}
}
return null;
}
public boolean submitTask(Task task) {
BlockingQueue<Task> queue = queues.get(task.getPriority());
return queue.offer(task);
}
private void processTask(Task task) {
try {
task.execute();
} catch (Exception e) {
System.err.println("Task execution failed: " + e.getMessage());
}
}
public void shutdown() {
running = false;
executorService.shutdown();
}
public static class Task {
private final String name;
private final Priority priority;
private final Runnable action;
public Task(String name, Priority priority, Runnable action) {
this.name = name;
this.priority = priority;
this.action = action;
}
public void execute() {
System.out.println("Executing task: " + name + " (" + priority + ")");
action.run();
}
public Priority getPriority() {
return priority;
}
}
}
2. Backpressure control
// Queue wrapper with backpressure control
public class BackpressureQueue<T> {
private final BlockingQueue<T> queue;
private final AtomicLong pendingCount = new AtomicLong(0);
private final long maxPending;
private final BackpressureStrategy strategy;
public enum BackpressureStrategy {
DROP_OLDEST, // drop the oldest element
DROP_NEWEST, // drop the incoming (newest) element
BLOCK, // block the producer
REJECT // reject the new element with an exception
}
public BackpressureQueue(BlockingQueue<T> queue,
long maxPending,
BackpressureStrategy strategy) {
this.queue = queue;
this.maxPending = maxPending;
this.strategy = strategy;
}
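public boolean offer(T element) throws InterruptedException {
// Note: the check and the insert below are not atomic, so concurrent producers
// can briefly exceed maxPending; treat the limit as a soft one
long current = pendingCount.get();
if (current >= maxPending) {
switch (strategy) {
case DROP_OLDEST:
// Remove the oldest element; adjust the counter only if one was actually removed
if (queue.poll() != null) {
pendingCount.decrementAndGet();
}
break;
case DROP_NEWEST:
return false; // silently drop the incoming element
case BLOCK:
// Wait until a consumer frees space
while (pendingCount.get() >= maxPending) {
Thread.sleep(1);
}
break;
case REJECT:
throw new IllegalStateException("Queue is full, rejecting element");
}
}
boolean offered = queue.offer(element);
if (offered) {
pendingCount.incrementAndGet();
}
return offered;
}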
public T take() throws InterruptedException {
T element = queue.take();
pendingCount.decrementAndGet();
return element;
}
public T poll(long timeout, TimeUnit unit) throws InterruptedException {
T element = queue.poll(timeout, unit);
if (element != null) {
pendingCount.decrementAndGet();
}
return element;
}
public long getPendingCount() {
return pendingCount.get();
}
public double getUtilization() {
return (double) pendingCount.get() / maxPending;
}
}
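One alternative way to enforce a hard limit, instead of the counter-based check above, is to let a Semaphore own the capacity: a producer must obtain a permit before inserting, and a consumer returns the permit after removing. The sketch below is a swapped-in technique, not the article's implementation; `SemaphoreBoundedQueue` and its methods are hypothetical names.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a Semaphore enforces the pending limit atomically.
class SemaphoreBoundedQueue<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final Semaphore permits;

    SemaphoreBoundedQueue(int maxPending) {
        this.permits = new Semaphore(maxPending);
    }

    /** Non-blocking insert: returns false when maxPending elements are already queued. */
    boolean tryOffer(T element) {
        if (!permits.tryAcquire()) {
            return false;
        }
        queue.offer(element);
        return true;
    }

    /** Blocking insert with a timeout: waits for a consumer to free a permit. */
    boolean offer(T element, long timeout, TimeUnit unit) throws InterruptedException {
        if (!permits.tryAcquire(timeout, unit)) {
            return false;
        }
        queue.offer(element);
        return true;
    }

    /** Returns the oldest element or null; a successful removal frees one permit. */
    T poll() {
        T element = queue.poll();
        if (element != null) {
            permits.release();
        }
        return element;
    }

    public static void main(String[] args) {
        SemaphoreBoundedQueue<String> bounded = new SemaphoreBoundedQueue<>(2);
        System.out.println(bounded.tryOffer("a")); // accepted
        System.out.println(bounded.tryOffer("b")); // accepted
        System.out.println(bounded.tryOffer("c")); // rejected: limit reached
        bounded.poll();                            // frees one permit
        System.out.println(bounded.tryOffer("c")); // accepted again
    }
}
```

Because acquiring a permit is atomic, concurrent producers cannot overshoot the limit, and the blocking variant parks the thread instead of polling in a sleep loop.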
3. Queue sharding
// Sharded queue implementation
public class ShardedQueue<T> {
private final List<BlockingQueue<T>> shards;
private final int shardCount;
private final AtomicInteger roundRobinIndex = new AtomicInteger(0);
public ShardedQueue(int shardCount, int shardCapacity) {
this.shardCount = shardCount;
this.shards = new ArrayList<>(shardCount);
for (int i = 0; i < shardCount; i++) {
shards.add(new ArrayBlockingQueue<>(shardCapacity));
}
}
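// Hash-based sharding (floorMod keeps the index non-negative even when hashCode() is negative or Integer.MIN_VALUE)
public boolean offer(T element) {
int shardIndex = Math.floorMod(element.hashCode(), shardCount);
return shards.get(shardIndex).offer(element);
}
// Round-robin sharding (floorMod keeps the index valid after the counter overflows)
public boolean offerRoundRobin(T element) {
int shardIndex = Math.floorMod(roundRobinIndex.getAndIncrement(), shardCount);
return shards.get(shardIndex).offer(element);
}
// Load-balanced sharding
public boolean offerBalanced(T element) {
// Pick the shard with the fewest elements (size() is only a snapshot under concurrency)
BlockingQueue<T> leastLoadedShard = shards.stream()
.min(Comparator.comparingInt(Queue::size))
.orElse(shards.get(0));
return leastLoadedShard.offer(element);
}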
// Poll the shards in order and return the first element found
public T poll() {
for (BlockingQueue<T> shard : shards) {
T element = shard.poll();
if (element != null) {
return element;
}
}
return null;
}
// Blocking take (polls each shard in turn with a short timeout)
public T take() throws InterruptedException {
while (true) {
for (BlockingQueue<T> shard : shards) {
T element = shard.poll(10, TimeUnit.MILLISECONDS);
if (element != null) {
return element;
}
}
}
}
public int size() {
return shards.stream().mapToInt(Queue::size).sum();
}
public boolean isEmpty() {
return shards.stream().allMatch(Queue::isEmpty);
}
}
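Computing a shard index from a hash code has one classic pitfall: `Math.abs(Integer.MIN_VALUE)` is still negative, so `Math.abs(hash) % n` can produce a negative index, and the same happens when an int counter overflows. The sketch below compares the two formulas on that edge case; the class and method names are illustrative.

```java
// Hypothetical demo class: compares two ways of mapping a hash to a shard index.
class ShardIndexDemo {

    /** Math.abs-based index: breaks for Integer.MIN_VALUE, whose absolute value overflows. */
    static int absIndex(int hash, int shardCount) {
        return Math.abs(hash) % shardCount;
    }

    /** floorMod-based index: always in the range [0, shardCount). */
    static int floorModIndex(int hash, int shardCount) {
        return Math.floorMod(hash, shardCount);
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;
        System.out.println("abs index      = " + absIndex(hash, 3));      // negative
        System.out.println("floorMod index = " + floorModIndex(hash, 3)); // valid
    }
}
```

For a shard count of 3, the first formula yields -2 and would throw an IndexOutOfBoundsException when used with `List.get`, while the second yields 1.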
Failure Handling and Recovery
1. Handling queue overflow
public class OverflowHandler<T> {
private final BlockingQueue<T> primaryQueue;
private final BlockingQueue<T> overflowQueue;
private final AtomicBoolean overflowMode = new AtomicBoolean(false);
public OverflowHandler(BlockingQueue<T> primaryQueue,
BlockingQueue<T> overflowQueue) {
this.primaryQueue = primaryQueue;
this.overflowQueue = overflowQueue;
// Start the recovery-check thread
startRecoveryCheck();
}
public boolean offer(T element) {
if (!overflowMode.get()) {
boolean offered = primaryQueue.offer(element);
if (!offered) {
// Primary queue is full: switch to overflow mode
overflowMode.set(true);
System.out.println("Switching to overflow mode");
return overflowQueue.offer(element);
}
return true;
} else {
return overflowQueue.offer(element);
}
}
public T take() throws InterruptedException {
if (overflowMode.get()) {
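// Serve the overflow queue first (these elements are newer than those still in the primary queue, so global FIFO order is not preserved)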
T element = overflowQueue.poll();
if (element != null) {
return element;
}
}
return primaryQueue.take();
}
private void startRecoveryCheck() {
Thread recoveryThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
if (overflowMode.get() && primaryQueue.remainingCapacity() > 0) {
// Try to move elements from the overflow queue back to the primary queue
drainOverflowQueue();
if (overflowQueue.isEmpty()) {
overflowMode.set(false);
System.out.println("Recovered from overflow mode");
}
}
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
recoveryThread.setDaemon(true);
recoveryThread.start();
}
private void drainOverflowQueue() {
while (!overflowQueue.isEmpty() && primaryQueue.remainingCapacity() > 0) {
T element = overflowQueue.poll();
if (element != null && !primaryQueue.offer(element)) {
// The primary queue filled up again: put the element back (it rejoins at the tail of the overflow queue)
overflowQueue.offer(element);
break;
}
}
}
}
2. Persistent queue
// Queue wrapper with persistence support
public class PersistentQueue<T> {
private final BlockingQueue<T> memoryQueue;
private final String persistenceFile;
private final ObjectMapper objectMapper;
private final Class<T> elementType;
public PersistentQueue(BlockingQueue<T> memoryQueue,
String persistenceFile,
Class<T> elementType) {
this.memoryQueue = memoryQueue;
this.persistenceFile = persistenceFile;
this.elementType = elementType;
this.objectMapper = new ObjectMapper();
// Recover data at startup
recoverFromDisk();
// Register a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(this::persistToDisk));
}
public boolean offer(T element) {
boolean offered = memoryQueue.offer(element);
if (offered) {
// Persist asynchronously (rewrites the whole snapshot on every call; simple, but costly for large queues)
CompletableFuture.runAsync(this::persistToDisk);
}
return offered;
}
public T take() throws InterruptedException {
T element = memoryQueue.take();
// Update the persisted file asynchronously
CompletableFuture.runAsync(this::persistToDisk);
return element;
}
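// Synchronized so that concurrent asynchronous calls do not interleave writes to the same file
private synchronized void persistToDisk() {
try {
// Copy a snapshot instead of draining: drainTo would leave the queue empty for
// consumers during the write and could lose elements if re-insertion failed
List<T> elements = new ArrayList<>(memoryQueue);
// Write the snapshot to the file
objectMapper.writeValue(new File(persistenceFile), elements);
} catch (IOException e) {
System.err.println("Failed to persist queue: " + e.getMessage());
}
}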
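private void recoverFromDisk() {
try {
File file = new File(persistenceFile);
if (file.exists()) {
// Build List<elementType> explicitly: a TypeReference<List<T>> loses T to type
// erasure, so Jackson would return generic maps instead of T instances
com.fasterxml.jackson.databind.JavaType listType =
objectMapper.getTypeFactory().constructCollectionType(List.class, elementType);
List<T> elements = objectMapper.readValue(file, listType);
elements.forEach(memoryQueue::offer);
System.out.println("Recovered " + elements.size() + " elements from disk");
}
} catch (IOException e) {
System.err.println("Failed to recover queue: " + e.getMessage());
}
}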
public int size() {
return memoryQueue.size();
}
}
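Summary
JUC queues are basic infrastructure for highly concurrent systems, and each queue has its own suitable use cases and performance characteristics. An architect needs to keep the following in mind:
1. Core design principles
- Choose the right queue type: blocking or non-blocking, bounded or unbounded, according to business requirements
- Size the queue deliberately: derive capacity from production and consumption rates and the latency budget
- Monitor and alert: track queue utilization, throughput, and latency
- Plan for failure recovery: design overflow handling and persistence
2. Performance optimization strategies
- Reduce lock contention: use sharded queues or lock-free queues
- Batch operations: use bulk APIs such as drainTo to improve efficiency
- Backpressure: keep queue overflow from destabilizing the system
- Memory management: configure capacities sensibly so that unbounded growth does not exhaust the heap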
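As a concrete illustration of the batching point, `drainTo(collection, maxElements)` moves up to `maxElements` items in one call instead of one poll per item. The sketch below uses a hypothetical class `DrainToDemo` and records the size of each batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: consumes a queue in batches with drainTo.
class DrainToDemo {

    /** Fills a queue with totalElements items and returns the size of each drained batch. */
    static List<Integer> batchSizes(int totalElements, int maxBatch) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < totalElements; i++) {
            queue.offer(i);
        }
        List<Integer> sizes = new ArrayList<>();
        List<Integer> batch = new ArrayList<>(maxBatch);
        // drainTo returns the number of elements transferred; 0 means the queue was empty.
        while (queue.drainTo(batch, maxBatch) > 0) {
            sizes.add(batch.size()); // a real consumer would process the batch here
            batch.clear();
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(10, 4)); // batches of 4, 4 and 2
    }
}
```

drainTo never blocks, so a consumer that should wait for work typically calls take() for the first element and then drainTo for whatever else is already queued.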
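3. Architectural best practices
- Layered design: use multi-level queues for tasks of different priorities
- Horizontal scaling: shard queues to raise concurrency
- Fault isolation: give different business modules their own queues
- Observability: build thorough monitoring and alerting
A queue is more than a data structure; it is an important part of the system architecture. Understanding and using the JUC queues correctly is a key skill for building high-performance, highly available distributed systems.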
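冬眠
The author writes about technology, reading, and thinking, and uses this blog to record study notes, reflections, and everyday life.
Series: Java Concurrency Package
Part 1 of 2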
