ConcurrentHashMap概述
定义和特点
ConcurrentHashMap是Java并发包中提供的线程安全的哈希表实现,它在保证线程安全的同时,尽可能地提高并发性能。
主要特点:
- 线程安全,支持高并发访问
- 不允许null键和null值
- 弱一致性迭代器(fail-safe)
- 分段锁机制(JDK1.7)或CAS+synchronized(JDK1.8)
- 高效的并发读写操作
继承关系
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable
与其他Map实现的对比
| 特性 | HashMap | HashTable | ConcurrentHashMap |
|---|---|---|---|
| 线程安全 | 否 | 是 | 是 |
| 锁机制 | 无 | 全表锁 | 分段锁/CAS+synchronized |
| null值 | 允许 | 不允许 | 不允许 |
| 性能 | 高(单线程) | 低(同步开销大) | 高(并发优化) |
| 迭代器 | fail-fast | fail-fast | fail-safe |
| 适用场景 | 单线程 | 低并发 | 高并发 |
适用场景
- 高并发缓存:多线程频繁读写的缓存系统
- 计数器:并发环境下的统计计数
- 配置管理:多线程共享的配置信息存储
- 会话管理:Web应用中的用户会话存储
- 任务调度:并发任务的状态管理
底层数据结构
JDK1.7:分段锁机制
ConcurrentHashMap
├── Segment[0] (继承ReentrantLock)
│ └── HashEntry[] table
│ ├── HashEntry -> HashEntry -> null
│ ├── HashEntry -> null
│ └── ...
├── Segment[1]
│ └── HashEntry[] table
└── ...
核心组件:
- Segment数组:每个Segment管理一部分桶
- ReentrantLock:每个Segment继承ReentrantLock提供锁机制
- HashEntry链表:存储实际的键值对数据
特点:
- 默认16个Segment,支持16个线程并发写入
- 读操作无需加锁(volatile保证可见性)
- 写操作只锁定对应的Segment
JDK1.8:CAS + synchronized
ConcurrentHashMap
└── Node[] table
├── Node -> Node -> Node -> null (链表)
├── TreeBin -> TreeNode (红黑树)
├── ForwardingNode (扩容标记)
└── ReservationNode (占位节点)
核心组件:
- Node数组:直接使用Node数组存储数据
- CAS操作:无锁的原子操作
- synchronized:细粒度锁,锁定数组的具体位置
- 红黑树:链表长度>=8时转换为红黑树
优化点:
- 取消Segment,减少内存开销
- 更细粒度的锁控制
- 支持更高的并发度
- 引入红黑树优化查询性能
核心源码分析
重要字段(JDK1.8)
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> {
// 最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认初始容量
private static final int DEFAULT_CAPACITY = 16;
// 默认负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
// 红黑树转链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 转红黑树时数组的最小长度
static final int MIN_TREEIFY_CAPACITY = 64;
// 存储数据的数组
transient volatile Node<K,V>[] table;
// 扩容时的新数组
private transient volatile Node<K,V>[] nextTable;
// 控制标识符,用于控制table的初始化和扩容
private transient volatile int sizeCtl;
// 元素个数,使用LongAdder实现
private transient volatile long baseCount;
}
Node节点结构
// 普通节点
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val; // volatile保证可见性
volatile Node<K,V> next; // volatile保证可见性
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
}
// 红黑树节点
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent;
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev;
boolean red;
}
// 扩容时的转发节点
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
put方法详解
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 1. 计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
// 2. 自旋插入
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 3. 如果table为空,进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 4. 如果目标位置为空,使用CAS插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // 插入成功,跳出循环
}
// 5. 如果正在扩容,协助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 6. 发生hash冲突,需要加锁处理
else {
V oldVal = null;
synchronized (f) { // 锁定头节点
if (tabAt(tab, i) == f) {
// 链表处理
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到相同key,更新value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 到达链表尾部,插入新节点
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
// 红黑树处理
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 7. 检查是否需要转换为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 8. 增加计数,检查是否需要扩容
addCount(1L, binCount);
return null;
}
put方法执行流程:
- 检查key和value是否为null(不允许null)
- 计算hash值
- 自旋尝试插入:
- 如果table未初始化,先初始化
- 如果目标位置为空,使用CAS插入
- 如果正在扩容,协助扩容
- 如果发生冲突,使用synchronized锁定头节点进行插入
- 检查链表长度,必要时转换为红黑树
- 更新元素计数,检查是否需要扩容
get方法详解
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 1. 计算hash值
int h = spread(key.hashCode());
// 2. 检查table和目标位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 3. 检查第一个节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 4. 如果是特殊节点(红黑树或ForwardingNode)
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 5. 遍历链表查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get方法特点:
- 无锁读取:利用volatile保证可见性
- 高效查找:支持链表和红黑树两种结构
- 扩容兼容:通过ForwardingNode处理扩容期间的查找
扩容机制(transfer方法)
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 1. 计算每个线程处理的桶数量
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// 2. 初始化新数组
if (nextTab == null) {
try {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false;
// 3. 多线程协作扩容
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 获取下一个需要处理的桶
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 处理具体的桶
if (i < 0 || i >= n || i + n >= nextn) {
// 扩容完成处理
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // 已经被处理
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
// 链表分割
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
// 红黑树分割
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
扩容机制特点:
- 多线程协作:多个线程同时参与扩容过程
- 分段处理:每个线程处理一部分桶,提高效率
- ForwardingNode:标记已处理的桶,支持扩容期间的查找
- 链表分割:类似HashMap,根据hash值重新分布元素
线程安全机制
1. volatile关键字
// 保证数组引用的可见性
transient volatile Node<K,V>[] table;
// 保证控制字段的可见性
private transient volatile int sizeCtl;
// 保证节点值的可见性
static class Node<K,V> {
volatile V val;
volatile Node<K,V> next;
}
作用:
- 保证内存可见性
- 禁止指令重排序
- 实现无锁读取
2. CAS操作
// 原子地设置数组元素
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// 原子地获取数组元素
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int index) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)index << ASHIFT) + ABASE);
}
// 原子地设置数组元素
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
优势:
- 无锁操作,性能高
- 避免线程阻塞
- 减少上下文切换
3. synchronized锁
// 锁定链表或红黑树的头节点
synchronized (f) {
if (tabAt(tab, i) == f) {
// 执行插入或更新操作
}
}
特点:
- 细粒度锁定
- 只锁定冲突的桶
- JVM优化的轻量级锁
4. 分段锁思想(JDK1.7)
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
transient int count;
transient int modCount;
transient int threshold;
final float loadFactor;
}
原理:
- 将数据分成多个段(Segment)
- 每个段独立加锁
- 不同段之间可以并发访问
- 默认16个段,支持16个线程并发写入
JDK版本差异
JDK1.7 vs JDK1.8 对比
| 特性 | JDK1.7 | JDK1.8 |
|---|---|---|
| 锁机制 | Segment分段锁 | CAS + synchronized |
| 数据结构 | Segment + HashEntry[] | Node[] + 红黑树 |
| 并发度 | 固定(默认16) | 动态(数组长度) |
| 内存占用 | 较高(Segment开销) | 较低 |
| 扩容方式 | 单线程扩容 | 多线程协作扩容 |
| 查询性能 | O(n) | O(log n) |
详细分析
1. 锁机制优化
JDK1.7分段锁问题:
- Segment数量固定,限制了并发度
- 即使数据分布不均,也无法动态调整
- Segment继承ReentrantLock,内存开销大
JDK1.8优化方案:
- 取消Segment,直接在Node数组上加锁
- 并发度等于数组长度,可动态扩展
- 使用内置锁synchronized,JVM优化更好
2. 数据结构改进
JDK1.7结构:
ConcurrentHashMap {
Segment[] segments;
}
Segment {
HashEntry[] table;
ReentrantLock lock;
}
JDK1.8结构:
ConcurrentHashMap {
Node[] table;
// 直接管理Node数组
}
3. 扩容机制升级
JDK1.7扩容:
- 每个Segment独立扩容
- 单线程执行,效率较低
- 扩容期间该Segment不可用
JDK1.8扩容:
- 多线程协作扩容
- 分段处理,提高效率
- 扩容期间仍可提供服务
性能分析与优化
性能特点
1. 读操作性能
// 读操作无需加锁,性能接近HashMap
public V get(Object key) {
// 利用volatile保证可见性
// 无锁读取,高并发性能优秀
}
优势:
- 无锁读取,并发性能好
- volatile保证数据一致性
- 支持弱一致性迭代
2. 写操作性能
// 写操作使用CAS + synchronized
public V put(K key, V value) {
// 1. CAS尝试无锁插入
// 2. 冲突时使用synchronized细粒度锁
// 3. 多线程协作扩容
}
特点:
- CAS操作避免不必要的加锁
- synchronized细粒度锁定
- 锁竞争小,性能好
性能测试对比
// 性能测试示例
public class ConcurrentHashMapPerformanceTest {
private static final int THREAD_COUNT = 16;
private static final int OPERATIONS = 1000000;
public static void testConcurrentHashMap() {
ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
// 多线程写入测试
long startTime = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadId = i;
executor.submit(() -> {
try {
for (int j = 0; j < OPERATIONS / THREAD_COUNT; j++) {
map.put(threadId * OPERATIONS + j, "value" + j);
}
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
long endTime = System.currentTimeMillis();
System.out.println("ConcurrentHashMap写入耗时: " + (endTime - startTime) + "ms");
executor.shutdown();
}
}
优化建议
1. 合理设置初始容量
// 根据预期数据量设置初始容量
int expectedSize = 10000;
int initialCapacity = (int) (expectedSize / 0.75f) + 1;
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(initialCapacity);
2. 选择合适的并发级别(JDK1.7)
// JDK1.7中可以设置并发级别
int concurrencyLevel = Runtime.getRuntime().availableProcessors();
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel);
3. 避免热点数据
// 避免大量线程访问相同的key
// 可以使用分片技术分散热点
public class ShardedCounter {
private final ConcurrentHashMap<String, AtomicLong>[] shards;
private final int shardCount;
public ShardedCounter(int shardCount) {
this.shardCount = shardCount;
this.shards = new ConcurrentHashMap[shardCount];
for (int i = 0; i < shardCount; i++) {
shards[i] = new ConcurrentHashMap<>();
}
}
public void increment(String key) {
int shard = Math.abs(key.hashCode()) % shardCount;
shards[shard].computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet();
}
}
4. 使用批量操作
// 使用putAll进行批量插入
Map<String, String> batch = new HashMap<>();
for (int i = 0; i < 1000; i++) {
batch.put("key" + i, "value" + i);
}
concurrentMap.putAll(batch);
// 使用compute系列方法进行原子更新
concurrentMap.compute("counter", (k, v) -> v == null ? 1 : Integer.parseInt(v) + 1);
常见面试题
基础概念类
Q1: ConcurrentHashMap的实现原理是什么?
答案: ConcurrentHashMap通过分段锁机制实现线程安全的哈希表:
JDK1.7实现:
- 使用Segment数组,每个Segment继承ReentrantLock
- 不同Segment之间可以并发访问
- 读操作无需加锁,写操作只锁定对应Segment
- 默认16个Segment,支持16个线程并发写入
JDK1.8优化:
- 取消Segment,直接使用Node数组
- 采用CAS + synchronized实现线程安全
- 读操作无锁,写操作细粒度锁定
- 支持多线程协作扩容
Q2: ConcurrentHashMap为什么不允许null键和null值?
答案: 主要原因是二义性问题:
-
get()方法的二义性:
V value = map.get(key); if (value == null) { // 无法区分: // 1. key不存在 // 2. key存在但value为null } -
并发环境下的问题:
- 在单线程HashMap中可以用containsKey()区分
- 在并发环境下,containsKey()和get()之间状态可能改变
- 无法保证原子性判断
-
设计一致性:
- ConcurrentHashMap设计为高并发场景
- 避免歧义,提供明确的语义
Q3: ConcurrentHashMap的size()方法是如何实现的?
答案:
JDK1.7实现:
public int size() {
final Segment<K,V>[] segments = this.segments;
long sum = 0;
long check = 0;
int[] mc = new int[segments.length];
// 尝试无锁计算
for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
check = 0;
sum = 0;
for (int i = 0; i < segments.length; ++i) {
sum += segments[i].count;
mc[i] = segments[i].modCount;
}
for (int i = 0; i < segments.length; ++i) {
check += segments[i].modCount;
}
if (check == sum) // 无修改,返回结果
break;
}
// 加锁计算
if (check != sum) {
sum = 0;
for (Segment<K,V> segment : segments)
segment.lock();
try {
for (Segment<K,V> segment : segments)
sum += segment.count;
} finally {
for (Segment<K,V> segment : segments)
segment.unlock();
}
}
return (int) sum;
}
JDK1.8实现:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
特点:
- JDK1.8使用类似LongAdder的计数方式
- 分散计数,减少竞争
- 结果是弱一致性的近似值
Q4: ConcurrentHashMap与Collections.synchronizedMap()的区别?
答案:
| 特性 | ConcurrentHashMap | Collections.synchronizedMap() |
|---|---|---|
| 锁粒度 | 细粒度(桶级别) | 粗粒度(整个Map) |
| 并发性能 | 高 | 低 |
| 读操作 | 无锁 | 需要同步 |
| 迭代器 | fail-safe | fail-fast |
| null值 | 不允许 | 允许 |
| 扩容 | 多线程协作 | 单线程 |
// Collections.synchronizedMap实现
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
return new SynchronizedMap<>(m);
}
private static class SynchronizedMap<K,V> implements Map<K,V> {
private final Map<K,V> m;
final Object mutex;
public V get(Object key) {
synchronized (mutex) { return m.get(key); } // 读也需要同步
}
public V put(K key, V value) {
synchronized (mutex) { return m.put(key, value); }
}
}
源码实现类
Q5: ConcurrentHashMap的put操作是如何保证线程安全的?
答案: ConcurrentHashMap的put操作通过多层机制保证线程安全:
-
CAS无锁插入:
// 如果目标位置为空,使用CAS插入 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; -
synchronized细粒度锁:
// 发生冲突时,锁定头节点 synchronized (f) { if (tabAt(tab, i) == f) { // 执行链表或红黑树操作 } } -
volatile保证可见性:
volatile V val; volatile Node<K,V> next; -
自旋重试:
for (Node<K,V>[] tab = table;;) { // 自旋直到插入成功 }
Q6: ConcurrentHashMap如何实现无锁读取?
答案: 通过volatile关键字和内存屏障实现:
-
volatile数组引用:
transient volatile Node<K,V>[] table; -
volatile节点字段:
static class Node<K,V> { volatile V val; volatile Node<K,V> next; } -
Unsafe原子操作:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int index) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)index << ASHIFT) + ABASE); }
原理:
- volatile保证写操作对读操作立即可见
- happens-before关系确保内存一致性
- 读操作不需要加锁,性能接近HashMap
Q7: ConcurrentHashMap的扩容过程是怎样的?
答案: ConcurrentHashMap支持多线程协作扩容:
-
触发扩容:
if (++size > threshold) addCount(1L, binCount); // 可能触发扩容 -
多线程协作:
// 每个线程处理一部分桶 int stride = (NCPU > 1) ? (n >>> 3) / NCPU : n; -
ForwardingNode标记:
// 已处理的桶放置ForwardingNode setTabAt(tab, i, new ForwardingNode<K,V>(nextTab)); -
分段处理:
- 将旧数组分成多个段
- 每个线程负责一个段的迁移
- 使用CAS获取待处理的段
-
扩容期间的读写:
- 读操作:通过ForwardingNode转发到新数组
- 写操作:协助扩容或等待扩容完成
Q8: 红黑树在ConcurrentHashMap中是如何使用的?
答案:
-
转换条件:
// 链表长度>=8且数组长度>=64时转换 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); -
TreeBin包装器:
static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; } -
读写锁机制:
// 读操作使用乐观锁 // 写操作使用悲观锁 static final int WRITER = 1; // 写锁 static final int WAITER = 2; // 等待锁 static final int READER = 4; // 读锁 -
线程安全保证:
- 读操作:乐观读,无锁访问
- 写操作:获取写锁,独占访问
- 冲突处理:读写冲突时降级为链表遍历
性能优化类
Q9: 如何选择ConcurrentHashMap的初始容量?
答案:
-
基本原则:
// 初始容量 = 预期元素数量 / 负载因子 int expectedSize = 10000; int initialCapacity = (int) (expectedSize / 0.75f) + 1; ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(initialCapacity); -
考虑因素:
- 预期数据量:避免频繁扩容
- 并发度:JDK1.8中容量影响并发度
- 内存限制:过大的初始容量浪费内存
-
最佳实践:
// 根据业务场景调整 public class ConcurrentHashMapFactory { public static <K, V> ConcurrentHashMap<K, V> create(int expectedSize) { // 计算合适的初始容量 int capacity = 1; while (capacity < expectedSize / 0.75f) { capacity <<= 1; } return new ConcurrentHashMap<>(capacity); } }
Q10: ConcurrentHashMap在高并发场景下的性能瓶颈是什么?
答案:
-
热点数据竞争:
// 大量线程访问相同key会导致锁竞争 // 解决方案:数据分片 public class ShardedMap<K, V> { private final ConcurrentHashMap<K, V>[] shards; private final int shardCount; public V get(K key) { return getShardMap(key).get(key); } private ConcurrentHashMap<K, V> getShardMap(K key) { return shards[Math.abs(key.hashCode()) % shardCount]; } } -
扩容开销:
- 多线程协作扩容虽然高效,但仍有开销
- 解决方案:合理设置初始容量
-
内存可见性开销:
- volatile读写有一定性能开销
- 解决方案:批量操作,减少访问频率
-
GC压力:
- 频繁创建Node对象
- 解决方案:对象池,减少对象创建
Q11: 如何监控ConcurrentHashMap的性能?
答案:
-
基本指标监控:
public class ConcurrentHashMapMonitor<K, V> { private final ConcurrentHashMap<K, V> map; private final AtomicLong getCount = new AtomicLong(); private final AtomicLong putCount = new AtomicLong(); private final AtomicLong getTime = new AtomicLong(); private final AtomicLong putTime = new AtomicLong(); public V get(K key) { long start = System.nanoTime(); try { return map.get(key); } finally { getCount.incrementAndGet(); getTime.addAndGet(System.nanoTime() - start); } } public void printStats() { long gets = getCount.get(); long puts = putCount.get(); System.out.println("平均get耗时: " + (getTime.get() / gets) + "ns"); System.out.println("平均put耗时: " + (putTime.get() / puts) + "ns"); System.out.println("当前大小: " + map.size()); } } -
JVM监控:
# 使用JProfiler、VisualVM等工具 # 监控GC频率、内存使用、线程竞争 jstat -gc -t pid 1s -
应用监控:
// 使用Micrometer等监控框架 @Component public class MapMetrics { private final MeterRegistry meterRegistry; private final ConcurrentHashMap<String, String> cache; public MapMetrics(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; Gauge.builder("cache.size") .register(meterRegistry, cache, Map::size); } }
实际应用类
Q12: ConcurrentHashMap适用于哪些场景?
答案:
-
高并发缓存:
@Component public class UserCache { private final ConcurrentHashMap<Long, User> cache = new ConcurrentHashMap<>(); public User getUser(Long userId) { return cache.computeIfAbsent(userId, this::loadUserFromDB); } private User loadUserFromDB(Long userId) { // 从数据库加载用户信息 return userRepository.findById(userId); } } -
计数器实现:
public class ConcurrentCounter { private final ConcurrentHashMap<String, AtomicLong> counters = new ConcurrentHashMap<>(); public long increment(String key) { return counters.computeIfAbsent(key, k -> new AtomicLong(0)) .incrementAndGet(); } public long getCount(String key) { AtomicLong counter = counters.get(key); return counter != null ? counter.get() : 0; } } -
配置管理:
@Component public class ConfigManager { private final ConcurrentHashMap<String, String> configs = new ConcurrentHashMap<>(); @PostConstruct public void loadConfigs() { // 从配置文件或数据库加载配置 } public String getConfig(String key) { return configs.get(key); } public void updateConfig(String key, String value) { configs.put(key, value); // 通知配置变更 } } -
会话管理:
@Component public class SessionManager { private final ConcurrentHashMap<String, HttpSession> sessions = new ConcurrentHashMap<>(); public void addSession(String sessionId, HttpSession session) { sessions.put(sessionId, session); } public HttpSession getSession(String sessionId) { return sessions.get(sessionId); } public void removeExpiredSessions() { long now = System.currentTimeMillis(); sessions.entrySet().removeIf(entry -> { HttpSession session = entry.getValue(); return now - session.getLastAccessedTime() > session.getMaxInactiveInterval() * 1000; }); } }
Q13: ConcurrentHashMap的迭代器有什么特点?
答案:
ConcurrentHashMap使用fail-safe迭代器:
-
弱一致性:
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); map.put("key1", "value1"); map.put("key2", "value2"); // 迭代过程中的修改不会抛出异常 for (Map.Entry<String, String> entry : map.entrySet()) { if ("key1".equals(entry.getKey())) { map.put("key3", "value3"); // 不会抛出ConcurrentModificationException } } -
快照语义:
// 迭代器创建时的快照,可能不反映最新状态 Iterator<String> iterator = map.keySet().iterator(); map.put("newKey", "newValue"); // 迭代器可能看不到这个新元素 -
与HashMap对比:
// HashMap - fail-fast Map<String, String> hashMap = new HashMap<>(); for (String key : hashMap.keySet()) { hashMap.put("newKey", "newValue"); // 抛出ConcurrentModificationException } // ConcurrentHashMap - fail-safe ConcurrentHashMap<String, String> concurrentMap = new ConcurrentHashMap<>(); for (String key : concurrentMap.keySet()) { concurrentMap.put("newKey", "newValue"); // 正常执行 }
Q14: 如何正确使用ConcurrentHashMap的原子操作方法?
答案:
ConcurrentHashMap提供了多个原子操作方法:
-
putIfAbsent():
// 线程安全的"如果不存在则插入" String oldValue = map.putIfAbsent("key", "value"); if (oldValue == null) { // 插入成功 } else { // key已存在,使用oldValue } -
replace():
// 原子替换 boolean success = map.replace("key", "oldValue", "newValue"); if (success) { // 替换成功 } -
compute()系列:
// 原子计算并更新 map.compute("counter", (key, value) -> { return value == null ? 1 : value + 1; }); // 仅当key存在时计算 map.computeIfPresent("key", (key, value) -> { return value.toUpperCase(); }); // 仅当key不存在时计算 map.computeIfAbsent("key", key -> { return loadValueFromDB(key); }); -
merge():
// 合并操作 map.merge("key", 1, (oldValue, newValue) -> oldValue + newValue); // 实现计数器 public void incrementCounter(String key) { map.merge(key, 1L, Long::sum); }
Q15: ConcurrentHashMap在分布式系统中的应用注意事项?
答案:
-
本地缓存场景:
@Component public class LocalCache { private final ConcurrentHashMap<String, CacheEntry> cache = new ConcurrentHashMap<>(); public <T> T get(String key, Supplier<T> loader) { CacheEntry entry = cache.computeIfAbsent(key, k -> { T value = loader.get(); return new CacheEntry(value, System.currentTimeMillis()); }); // 检查过期 if (System.currentTimeMillis() - entry.timestamp > EXPIRE_TIME) { cache.remove(key); return get(key, loader); // 重新加载 } return (T) entry.value; } } -
数据一致性问题:
// 注意:ConcurrentHashMap只保证单机一致性 // 分布式环境需要额外的一致性保证 @Service public class DistributedCounter { private final ConcurrentHashMap<String, Long> localCounters = new ConcurrentHashMap<>(); private final RedisTemplate<String, Long> redisTemplate; public void increment(String key) { // 本地计数 localCounters.merge(key, 1L, Long::sum); // 定期同步到Redis if (shouldSync()) { syncToRedis(); } } } -
内存管理:
// 防止内存泄漏 @Component public class ManagedCache { private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>(); private final ScheduledExecutorService cleaner = Executors.newScheduledThreadPool(1); @PostConstruct public void startCleaner() { cleaner.scheduleAtFixedRate(this::cleanup, 1, 1, TimeUnit.HOURS); } private void cleanup() { // 清理过期数据 cache.entrySet().removeIf(this::isExpired); } @PreDestroy public void shutdown() { cleaner.shutdown(); } }
总结
ConcurrentHashMap是Java并发编程中的重要工具,深入理解其实现原理对于开发高性能并发应用至关重要:
核心要点
-
线程安全机制:
- JDK1.7:分段锁(Segment + ReentrantLock)
- JDK1.8:CAS + synchronized + volatile
-
性能优化:
- 无锁读取,高并发性能
- 细粒度锁定,减少竞争
- 多线程协作扩容
-
数据结构演进:
- 引入红黑树优化查询性能
- 取消Segment减少内存开销
- 动态并发度调整
-
弱一致性:
- fail-safe迭代器
- size()方法返回近似值
- 适合高并发场景
使用建议
- 合理设置初始容量,避免频繁扩容
- 使用原子操作方法,保证操作的原子性
- 避免热点数据,使用分片技术分散访问
- 注意内存管理,定期清理过期数据
- 监控性能指标,及时发现瓶颈
面试重点
- 实现原理:分段锁机制、CAS操作、volatile关键字
- 版本差异:JDK1.7与JDK1.8的优化点
- 线程安全:如何保证并发安全性
- 性能特点:读写性能、扩容机制
- 应用场景:适用的业务场景和最佳实践
- 与其他Map的对比:HashMap、HashTable、Collections.synchronizedMap
学习路径
- 理解基础概念:哈希表、线程安全、并发控制
- 掌握实现原理:源码分析、数据结构、算法实现
- 学习性能优化:瓶颈分析、调优技巧、监控方法
- 实践应用场景:缓存、计数器、配置管理等
- 对比学习:与其他并发集合类的异同
掌握ConcurrentHashMap不仅能在面试中展现扎实的并发编程基础,更能在实际项目中设计出高性能的并发系统。
文章标签
冬眠
博主专注于技术、阅读与思考。在这里记录学习、思考与生活。
第 2 篇,共 2 篇
