冬眠的笔记
首页文章分类书单项目关于
冬眠
X

© 2026 冬眠的笔记 · 用文字记录思考,用思考改变生活

首页>文章>Java
Java集合ConcurrentHashMap并发

ConcurrentHashMap 源码分析及面试题

ConcurrentHashMap 的 CAS+synchronized 并发控制、JDK 1.7 与 1.8 的实现差异以及高频面试题解析

冬眠
冬眠
专注于技术、阅读与思考
2025-11-19
发布日期
30 min read
阅读时长
浏览量
ConcurrentHashMap 源码分析及面试题

ConcurrentHashMap概述

定义和特点

ConcurrentHashMap是Java并发包中提供的线程安全的哈希表实现,它在保证线程安全的同时,尽可能地提高并发性能。

主要特点:

  • 线程安全,支持高并发访问
  • 不允许null键和null值
  • 弱一致性迭代器(fail-safe)
  • 分段锁机制(JDK1.7)或CAS+synchronized(JDK1.8)
  • 高效的并发读写操作

继承关系

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable

与其他Map实现的对比

特性 HashMap HashTable ConcurrentHashMap
线程安全 否 是 是
锁机制 无 全表锁 分段锁/CAS+synchronized
null值 允许 不允许 不允许
性能 高(单线程) 低(同步开销大) 高(并发优化)
迭代器 fail-fast fail-fast fail-safe
适用场景 单线程 低并发 高并发

适用场景

  1. 高并发缓存:多线程频繁读写的缓存系统
  2. 计数器:并发环境下的统计计数
  3. 配置管理:多线程共享的配置信息存储
  4. 会话管理:Web应用中的用户会话存储
  5. 任务调度:并发任务的状态管理

底层数据结构

JDK1.7:分段锁机制

ConcurrentHashMap
├── Segment[0] (继承ReentrantLock)
│   └── HashEntry[] table
│       ├── HashEntry -> HashEntry -> null
│       ├── HashEntry -> null
│       └── ...
├── Segment[1]
│   └── HashEntry[] table
└── ...

核心组件:

  • Segment数组:每个Segment管理一部分桶
  • ReentrantLock:每个Segment继承ReentrantLock提供锁机制
  • HashEntry链表:存储实际的键值对数据

特点:

  • 默认16个Segment,支持16个线程并发写入
  • 读操作无需加锁(volatile保证可见性)
  • 写操作只锁定对应的Segment

JDK1.8:CAS + synchronized

ConcurrentHashMap
└── Node[] table
    ├── Node -> Node -> Node -> null (链表)
    ├── TreeBin -> TreeNode (红黑树)
    ├── ForwardingNode (扩容标记)
    └── ReservationNode (占位节点)

核心组件:

  • Node数组:直接使用Node数组存储数据
  • CAS操作:无锁的原子操作
  • synchronized:细粒度锁,锁定数组的具体位置
  • 红黑树:链表长度>=8时转换为红黑树

优化点:

  • 取消Segment,减少内存开销
  • 更细粒度的锁控制
  • 支持更高的并发度
  • 引入红黑树优化查询性能

核心源码分析

重要字段(JDK1.8)

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> {
    // 最大容量
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    
    // 默认初始容量
    private static final int DEFAULT_CAPACITY = 16;
    
    // 默认负载因子
    private static final float LOAD_FACTOR = 0.75f;
    
    // 链表转红黑树的阈值
    static final int TREEIFY_THRESHOLD = 8;
    
    // 红黑树转链表的阈值
    static final int UNTREEIFY_THRESHOLD = 6;
    
    // 转红黑树时数组的最小长度
    static final int MIN_TREEIFY_CAPACITY = 64;
    
    // 存储数据的数组
    transient volatile Node<K,V>[] table;
    
    // 扩容时的新数组
    private transient volatile Node<K,V>[] nextTable;
    
    // 控制标识符,用于控制table的初始化和扩容
    private transient volatile int sizeCtl;
    
    // 元素个数,使用LongAdder实现
    private transient volatile long baseCount;
}

Node节点结构

// 普通节点
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;        // volatile保证可见性
    volatile Node<K,V> next; // volatile保证可见性
    
    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
}

// 红黑树节点
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;
    boolean red;
}

// 扩容时的转发节点
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}

put方法详解

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    
    // 1. 计算hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    
    // 2. 自旋插入
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        
        // 3. 如果table为空,进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        
        // 4. 如果目标位置为空,使用CAS插入
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break; // 插入成功,跳出循环
        }
        
        // 5. 如果正在扩容,协助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        
        // 6. 发生hash冲突,需要加锁处理
        else {
            V oldVal = null;
            synchronized (f) { // 锁定头节点
                if (tabAt(tab, i) == f) {
                    // 链表处理
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 找到相同key,更新value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 到达链表尾部,插入新节点
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    // 红黑树处理
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            
            // 7. 检查是否需要转换为红黑树
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    
    // 8. 增加计数,检查是否需要扩容
    addCount(1L, binCount);
    return null;
}

put方法执行流程:

  1. 检查key和value是否为null(不允许null)
  2. 计算hash值
  3. 自旋尝试插入:
    • 如果table未初始化,先初始化
    • 如果目标位置为空,使用CAS插入
    • 如果正在扩容,协助扩容
    • 如果发生冲突,使用synchronized锁定头节点进行插入
  4. 检查链表长度,必要时转换为红黑树
  5. 更新元素计数,检查是否需要扩容

get方法详解

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    
    // 1. 计算hash值
    int h = spread(key.hashCode());
    
    // 2. 检查table和目标位置
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        
        // 3. 检查第一个节点
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 4. 如果是特殊节点(红黑树或ForwardingNode)
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        
        // 5. 遍历链表查找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get方法特点:

  • 无锁读取:利用volatile保证可见性
  • 高效查找:支持链表和红黑树两种结构
  • 扩容兼容:通过ForwardingNode处理扩容期间的查找

扩容机制(transfer方法)

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    
    // 1. 计算每个线程处理的桶数量
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;
    
    // 2. 初始化新数组
    if (nextTab == null) {
        try {
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false;
    
    // 3. 多线程协作扩容
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        
        // 获取下一个需要处理的桶
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
                                       nextBound = (nextIndex > stride ?
                                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        
        // 处理具体的桶
        if (i < 0 || i >= n || i + n >= nextn) {
            // 扩容完成处理
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // 已经被处理
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        // 链表分割
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        // 红黑树分割
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

扩容机制特点:

  • 多线程协作:多个线程同时参与扩容过程
  • 分段处理:每个线程处理一部分桶,提高效率
  • ForwardingNode:标记已处理的桶,支持扩容期间的查找
  • 链表分割:类似HashMap,根据hash值重新分布元素

线程安全机制

1. volatile关键字

// 保证数组引用的可见性
transient volatile Node<K,V>[] table;

// 保证控制字段的可见性
private transient volatile int sizeCtl;

// 保证节点值的可见性
static class Node<K,V> {
    volatile V val;
    volatile Node<K,V> next;
}

作用:

  • 保证内存可见性
  • 禁止指令重排序
  • 实现无锁读取

2. CAS操作

// 原子地设置数组元素
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

// 原子地获取数组元素
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int index) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)index << ASHIFT) + ABASE);
}

// 原子地设置数组元素
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

优势:

  • 无锁操作,性能高
  • 避免线程阻塞
  • 减少上下文切换

3. synchronized锁

// 锁定链表或红黑树的头节点
synchronized (f) {
    if (tabAt(tab, i) == f) {
        // 执行插入或更新操作
    }
}

特点:

  • 细粒度锁定
  • 只锁定冲突的桶
  • JVM优化的轻量级锁

4. 分段锁思想(JDK1.7)

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile HashEntry<K,V>[] table;
    transient int count;
    transient int modCount;
    transient int threshold;
    final float loadFactor;
}

原理:

  • 将数据分成多个段(Segment)
  • 每个段独立加锁
  • 不同段之间可以并发访问
  • 默认16个段,支持16个线程并发写入

JDK版本差异

JDK1.7 vs JDK1.8 对比

特性 JDK1.7 JDK1.8
锁机制 Segment分段锁 CAS + synchronized
数据结构 Segment + HashEntry[] Node[] + 红黑树
并发度 固定(默认16) 动态(数组长度)
内存占用 较高(Segment开销) 较低
扩容方式 单线程扩容 多线程协作扩容
查询性能 O(n) O(log n)

详细分析

1. 锁机制优化

JDK1.7分段锁问题:

  • Segment数量固定,限制了并发度
  • 即使数据分布不均,也无法动态调整
  • Segment继承ReentrantLock,内存开销大

JDK1.8优化方案:

  • 取消Segment,直接在Node数组上加锁
  • 并发度等于数组长度,可动态扩展
  • 使用内置锁synchronized,JVM优化更好

2. 数据结构改进

JDK1.7结构:

ConcurrentHashMap {
    Segment[] segments;
}

Segment {
    HashEntry[] table;
    ReentrantLock lock;
}

JDK1.8结构:

ConcurrentHashMap {
    Node[] table;
    // 直接管理Node数组
}

3. 扩容机制升级

JDK1.7扩容:

  • 每个Segment独立扩容
  • 单线程执行,效率较低
  • 扩容期间该Segment不可用

JDK1.8扩容:

  • 多线程协作扩容
  • 分段处理,提高效率
  • 扩容期间仍可提供服务

性能分析与优化

性能特点

1. 读操作性能

// 读操作无需加锁,性能接近HashMap
public V get(Object key) {
    // 利用volatile保证可见性
    // 无锁读取,高并发性能优秀
}

优势:

  • 无锁读取,并发性能好
  • volatile保证数据一致性
  • 支持弱一致性迭代

2. 写操作性能

// 写操作使用CAS + synchronized
public V put(K key, V value) {
    // 1. CAS尝试无锁插入
    // 2. 冲突时使用synchronized细粒度锁
    // 3. 多线程协作扩容
}

特点:

  • CAS操作避免不必要的加锁
  • synchronized细粒度锁定
  • 锁竞争小,性能好

性能测试对比

// 性能测试示例
public class ConcurrentHashMapPerformanceTest {
    private static final int THREAD_COUNT = 16;
    private static final int OPERATIONS = 1000000;
    
    public static void testConcurrentHashMap() {
        ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
        
        // 多线程写入测试
        long startTime = System.currentTimeMillis();
        
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
        
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadId = i;
            executor.submit(() -> {
                try {
                    for (int j = 0; j < OPERATIONS / THREAD_COUNT; j++) {
                        map.put(threadId * OPERATIONS + j, "value" + j);
                    }
                } finally {
                    latch.countDown();
                }
            });
        }
        
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        long endTime = System.currentTimeMillis();
        System.out.println("ConcurrentHashMap写入耗时: " + (endTime - startTime) + "ms");
        
        executor.shutdown();
    }
}

优化建议

1. 合理设置初始容量

// 根据预期数据量设置初始容量
int expectedSize = 10000;
int initialCapacity = (int) (expectedSize / 0.75f) + 1;
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(initialCapacity);

2. 选择合适的并发级别(JDK1.7)

// JDK1.7中可以设置并发级别
int concurrencyLevel = Runtime.getRuntime().availableProcessors();
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel);

3. 避免热点数据

// 避免大量线程访问相同的key
// 可以使用分片技术分散热点
public class ShardedCounter {
    private final ConcurrentHashMap<String, AtomicLong>[] shards;
    private final int shardCount;
    
    public ShardedCounter(int shardCount) {
        this.shardCount = shardCount;
        this.shards = new ConcurrentHashMap[shardCount];
        for (int i = 0; i < shardCount; i++) {
            shards[i] = new ConcurrentHashMap<>();
        }
    }
    
    public void increment(String key) {
        int shard = Math.abs(key.hashCode()) % shardCount;
        shards[shard].computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet();
    }
}

4. 使用批量操作

// 使用putAll进行批量插入
Map<String, String> batch = new HashMap<>();
for (int i = 0; i < 1000; i++) {
    batch.put("key" + i, "value" + i);
}
concurrentMap.putAll(batch);

// 使用compute系列方法进行原子更新
concurrentMap.compute("counter", (k, v) -> v == null ? 1 : Integer.parseInt(v) + 1);

常见面试题

基础概念类

Q1: ConcurrentHashMap的实现原理是什么?

答案: ConcurrentHashMap通过分段锁机制实现线程安全的哈希表:

JDK1.7实现:

  • 使用Segment数组,每个Segment继承ReentrantLock
  • 不同Segment之间可以并发访问
  • 读操作无需加锁,写操作只锁定对应Segment
  • 默认16个Segment,支持16个线程并发写入

JDK1.8优化:

  • 取消Segment,直接使用Node数组
  • 采用CAS + synchronized实现线程安全
  • 读操作无锁,写操作细粒度锁定
  • 支持多线程协作扩容

Q2: ConcurrentHashMap为什么不允许null键和null值?

答案: 主要原因是二义性问题:

  1. get()方法的二义性:

    V value = map.get(key);
    if (value == null) {
        // 无法区分:
        // 1. key不存在
        // 2. key存在但value为null
    }
    
  2. 并发环境下的问题:

    • 在单线程HashMap中可以用containsKey()区分
    • 在并发环境下,containsKey()和get()之间状态可能改变
    • 无法保证原子性判断
  3. 设计一致性:

    • ConcurrentHashMap设计为高并发场景
    • 避免歧义,提供明确的语义

Q3: ConcurrentHashMap的size()方法是如何实现的?

答案:

JDK1.7实现:

public int size() {
    final Segment<K,V>[] segments = this.segments;
    long sum = 0;
    long check = 0;
    int[] mc = new int[segments.length];
    
    // 尝试无锁计算
    for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
        check = 0;
        sum = 0;
        for (int i = 0; i < segments.length; ++i) {
            sum += segments[i].count;
            mc[i] = segments[i].modCount;
        }
        for (int i = 0; i < segments.length; ++i) {
            check += segments[i].modCount;
        }
        if (check == sum) // 无修改,返回结果
            break;
    }
    
    // 加锁计算
    if (check != sum) {
        sum = 0;
        for (Segment<K,V> segment : segments)
            segment.lock();
        try {
            for (Segment<K,V> segment : segments)
                sum += segment.count;
        } finally {
            for (Segment<K,V> segment : segments)
                segment.unlock();
        }
    }
    return (int) sum;
}

JDK1.8实现:

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

特点:

  • JDK1.8使用类似LongAdder的计数方式
  • 分散计数,减少竞争
  • 结果是弱一致性的近似值

Q4: ConcurrentHashMap与Collections.synchronizedMap()的区别?

答案:

特性 ConcurrentHashMap Collections.synchronizedMap()
锁粒度 细粒度(桶级别) 粗粒度(整个Map)
并发性能 高 低
读操作 无锁 需要同步
迭代器 fail-safe fail-fast
null值 不允许 允许
扩容 多线程协作 单线程
// Collections.synchronizedMap实现
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
    return new SynchronizedMap<>(m);
}

private static class SynchronizedMap<K,V> implements Map<K,V> {
    private final Map<K,V> m;
    final Object mutex;
    
    public V get(Object key) {
        synchronized (mutex) { return m.get(key); } // 读也需要同步
    }
    
    public V put(K key, V value) {
        synchronized (mutex) { return m.put(key, value); }
    }
}

源码实现类

Q5: ConcurrentHashMap的put操作是如何保证线程安全的?

答案: ConcurrentHashMap的put操作通过多层机制保证线程安全:

  1. CAS无锁插入:

    // 如果目标位置为空,使用CAS插入
    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
        break;
    
  2. synchronized细粒度锁:

    // 发生冲突时,锁定头节点
    synchronized (f) {
        if (tabAt(tab, i) == f) {
            // 执行链表或红黑树操作
        }
    }
    
  3. volatile保证可见性:

    volatile V val;
    volatile Node<K,V> next;
    
  4. 自旋重试:

    for (Node<K,V>[] tab = table;;) {
        // 自旋直到插入成功
    }
    

Q6: ConcurrentHashMap如何实现无锁读取?

答案: 通过volatile关键字和内存屏障实现:

  1. volatile数组引用:

    transient volatile Node<K,V>[] table;
    
  2. volatile节点字段:

    static class Node<K,V> {
        volatile V val;
        volatile Node<K,V> next;
    }
    
  3. Unsafe原子操作:

    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int index) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)index << ASHIFT) + ABASE);
    }
    

原理:

  • volatile保证写操作对读操作立即可见
  • happens-before关系确保内存一致性
  • 读操作不需要加锁,性能接近HashMap

Q7: ConcurrentHashMap的扩容过程是怎样的?

答案: ConcurrentHashMap支持多线程协作扩容:

  1. 触发扩容:

    if (++size > threshold)
        addCount(1L, binCount); // 可能触发扩容
    
  2. 多线程协作:

    // 每个线程处理一部分桶
    int stride = (NCPU > 1) ? (n >>> 3) / NCPU : n;
    
  3. ForwardingNode标记:

    // 已处理的桶放置ForwardingNode
    setTabAt(tab, i, new ForwardingNode<K,V>(nextTab));
    
  4. 分段处理:

    • 将旧数组分成多个段
    • 每个线程负责一个段的迁移
    • 使用CAS获取待处理的段
  5. 扩容期间的读写:

    • 读操作:通过ForwardingNode转发到新数组
    • 写操作:协助扩容或等待扩容完成

Q8: 红黑树在ConcurrentHashMap中是如何使用的?

答案:

  1. 转换条件:

    // 链表长度>=8且数组长度>=64时转换
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    
  2. TreeBin包装器:

    static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
    }
    
  3. 读写锁机制:

    // 读操作使用乐观锁
    // 写操作使用悲观锁
    static final int WRITER = 1; // 写锁
    static final int WAITER = 2; // 等待锁
    static final int READER = 4; // 读锁
    
  4. 线程安全保证:

    • 读操作:乐观读,无锁访问
    • 写操作:获取写锁,独占访问
    • 冲突处理:读写冲突时降级为链表遍历

性能优化类

Q9: 如何选择ConcurrentHashMap的初始容量?

答案:

  1. 基本原则:

    // 初始容量 = 预期元素数量 / 负载因子
    int expectedSize = 10000;
    int initialCapacity = (int) (expectedSize / 0.75f) + 1;
    ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(initialCapacity);
    
  2. 考虑因素:

    • 预期数据量:避免频繁扩容
    • 并发度:JDK1.8中容量影响并发度
    • 内存限制:过大的初始容量浪费内存
  3. 最佳实践:

    // 根据业务场景调整
    public class ConcurrentHashMapFactory {
        public static <K, V> ConcurrentHashMap<K, V> create(int expectedSize) {
            // 计算合适的初始容量
            int capacity = 1;
            while (capacity < expectedSize / 0.75f) {
                capacity <<= 1;
            }
            return new ConcurrentHashMap<>(capacity);
        }
    }
    

Q10: ConcurrentHashMap在高并发场景下的性能瓶颈是什么?

答案:

  1. 热点数据竞争:

    // 大量线程访问相同key会导致锁竞争
    // 解决方案:数据分片
    public class ShardedMap<K, V> {
        private final ConcurrentHashMap<K, V>[] shards;
        private final int shardCount;
        
        public V get(K key) {
            return getShardMap(key).get(key);
        }
        
        private ConcurrentHashMap<K, V> getShardMap(K key) {
            return shards[Math.abs(key.hashCode()) % shardCount];
        }
    }
    
  2. 扩容开销:

    • 多线程协作扩容虽然高效,但仍有开销
    • 解决方案:合理设置初始容量
  3. 内存可见性开销:

    • volatile读写有一定性能开销
    • 解决方案:批量操作,减少访问频率
  4. GC压力:

    • 频繁创建Node对象
    • 解决方案:对象池,减少对象创建

Q11: 如何监控ConcurrentHashMap的性能?

答案:

  1. 基本指标监控:

    public class ConcurrentHashMapMonitor<K, V> {
        private final ConcurrentHashMap<K, V> map;
        private final AtomicLong getCount = new AtomicLong();
        private final AtomicLong putCount = new AtomicLong();
        private final AtomicLong getTime = new AtomicLong();
        private final AtomicLong putTime = new AtomicLong();
        
        public V get(K key) {
            long start = System.nanoTime();
            try {
                return map.get(key);
            } finally {
                getCount.incrementAndGet();
                getTime.addAndGet(System.nanoTime() - start);
            }
        }
        
        public void printStats() {
            long gets = getCount.get();
            long puts = putCount.get();
            System.out.println("平均get耗时: " + (getTime.get() / gets) + "ns");
            System.out.println("平均put耗时: " + (putTime.get() / puts) + "ns");
            System.out.println("当前大小: " + map.size());
        }
    }
    
  2. JVM监控:

    # 使用JProfiler、VisualVM等工具
    # 监控GC频率、内存使用、线程竞争
    jstat -gc -t pid 1s
    
  3. 应用监控:

    // 使用Micrometer等监控框架
    @Component
    public class MapMetrics {
        private final MeterRegistry meterRegistry;
        private final ConcurrentHashMap<String, String> cache;
        
        public MapMetrics(MeterRegistry meterRegistry) {
            this.meterRegistry = meterRegistry;
            Gauge.builder("cache.size")
                 .register(meterRegistry, cache, Map::size);
        }
    }
    

实际应用类

Q12: ConcurrentHashMap适用于哪些场景?

答案:

  1. 高并发缓存:

    @Component
    public class UserCache {
        private final ConcurrentHashMap<Long, User> cache = new ConcurrentHashMap<>();
        
        public User getUser(Long userId) {
            return cache.computeIfAbsent(userId, this::loadUserFromDB);
        }
        
        private User loadUserFromDB(Long userId) {
            // 从数据库加载用户信息
            return userRepository.findById(userId);
        }
    }
    
  2. 计数器实现:

    public class ConcurrentCounter {
        private final ConcurrentHashMap<String, AtomicLong> counters = new ConcurrentHashMap<>();
        
        public long increment(String key) {
            return counters.computeIfAbsent(key, k -> new AtomicLong(0))
                          .incrementAndGet();
        }
        
        public long getCount(String key) {
            AtomicLong counter = counters.get(key);
            return counter != null ? counter.get() : 0;
        }
    }
    
  3. 配置管理:

    @Component
    public class ConfigManager {
        private final ConcurrentHashMap<String, String> configs = new ConcurrentHashMap<>();
        
        @PostConstruct
        public void loadConfigs() {
            // 从配置文件或数据库加载配置
        }
        
        public String getConfig(String key) {
            return configs.get(key);
        }
        
        public void updateConfig(String key, String value) {
            configs.put(key, value);
            // 通知配置变更
        }
    }
    
  4. 会话管理:

    @Component
    public class SessionManager {
        private final ConcurrentHashMap<String, HttpSession> sessions = new ConcurrentHashMap<>();
        
        public void addSession(String sessionId, HttpSession session) {
            sessions.put(sessionId, session);
        }
        
        public HttpSession getSession(String sessionId) {
            return sessions.get(sessionId);
        }
        
        public void removeExpiredSessions() {
            long now = System.currentTimeMillis();
            sessions.entrySet().removeIf(entry -> {
                HttpSession session = entry.getValue();
                return now - session.getLastAccessedTime() > session.getMaxInactiveInterval() * 1000;
            });
        }
    }
    

Q13: ConcurrentHashMap的迭代器有什么特点?

答案:

ConcurrentHashMap使用fail-safe迭代器:

  1. 弱一致性:

    ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
    map.put("key1", "value1");
    map.put("key2", "value2");
    
    // 迭代过程中的修改不会抛出异常
    for (Map.Entry<String, String> entry : map.entrySet()) {
        if ("key1".equals(entry.getKey())) {
            map.put("key3", "value3"); // 不会抛出ConcurrentModificationException
        }
    }
    
  2. 快照语义:

    // 迭代器创建时的快照,可能不反映最新状态
    Iterator<String> iterator = map.keySet().iterator();
    map.put("newKey", "newValue"); // 迭代器可能看不到这个新元素
    
  3. 与HashMap对比:

    // HashMap - fail-fast
    Map<String, String> hashMap = new HashMap<>();
    for (String key : hashMap.keySet()) {
        hashMap.put("newKey", "newValue"); // 抛出ConcurrentModificationException
    }
    
    // ConcurrentHashMap - fail-safe
    ConcurrentHashMap<String, String> concurrentMap = new ConcurrentHashMap<>();
    for (String key : concurrentMap.keySet()) {
        concurrentMap.put("newKey", "newValue"); // 正常执行
    }
    

Q14: 如何正确使用ConcurrentHashMap的原子操作方法?

答案:

ConcurrentHashMap提供了多个原子操作方法:

  1. putIfAbsent():

    // 线程安全的"如果不存在则插入"
    String oldValue = map.putIfAbsent("key", "value");
    if (oldValue == null) {
        // 插入成功
    } else {
        // key已存在,使用oldValue
    }
    
  2. replace():

    // 原子替换
    boolean success = map.replace("key", "oldValue", "newValue");
    if (success) {
        // 替换成功
    }
    
  3. compute()系列:

    // 原子计算并更新
    map.compute("counter", (key, value) -> {
        return value == null ? 1 : value + 1;
    });
    
    // 仅当key存在时计算
    map.computeIfPresent("key", (key, value) -> {
        return value.toUpperCase();
    });
    
    // 仅当key不存在时计算
    map.computeIfAbsent("key", key -> {
        return loadValueFromDB(key);
    });
    
  4. merge():

    // 合并操作
    map.merge("key", 1, (oldValue, newValue) -> oldValue + newValue);
    
    // 实现计数器
    public void incrementCounter(String key) {
        map.merge(key, 1L, Long::sum);
    }
    

Q15: ConcurrentHashMap在分布式系统中的应用注意事项?

答案:

  1. 本地缓存场景:

    @Component
    public class LocalCache {
        private final ConcurrentHashMap<String, CacheEntry> cache = new ConcurrentHashMap<>();
        
        public <T> T get(String key, Supplier<T> loader) {
            CacheEntry entry = cache.computeIfAbsent(key, k -> {
                T value = loader.get();
                return new CacheEntry(value, System.currentTimeMillis());
            });
            
            // 检查过期
            if (System.currentTimeMillis() - entry.timestamp > EXPIRE_TIME) {
                cache.remove(key);
                return get(key, loader); // 重新加载
            }
            
            return (T) entry.value;
        }
    }
    
  2. 数据一致性问题:

    // 注意:ConcurrentHashMap只保证单机一致性
    // 分布式环境需要额外的一致性保证
    
    @Service
    public class DistributedCounter {
        private final ConcurrentHashMap<String, Long> localCounters = new ConcurrentHashMap<>();
        private final RedisTemplate<String, Long> redisTemplate;
        
        public void increment(String key) {
            // 本地计数
            localCounters.merge(key, 1L, Long::sum);
            
            // 定期同步到Redis
            if (shouldSync()) {
                syncToRedis();
            }
        }
    }
    
  3. 内存管理:

    // 防止内存泄漏
    @Component
    public class ManagedCache {
        private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
        private final ScheduledExecutorService cleaner = Executors.newScheduledThreadPool(1);
        
        @PostConstruct
        public void startCleaner() {
            cleaner.scheduleAtFixedRate(this::cleanup, 1, 1, TimeUnit.HOURS);
        }
        
        private void cleanup() {
            // 清理过期数据
            cache.entrySet().removeIf(this::isExpired);
        }
        
        @PreDestroy
        public void shutdown() {
            cleaner.shutdown();
        }
    }
    

总结

ConcurrentHashMap是Java并发编程中的重要工具,深入理解其实现原理对于开发高性能并发应用至关重要:

核心要点

  1. 线程安全机制:

    • JDK1.7:分段锁(Segment + ReentrantLock)
    • JDK1.8:CAS + synchronized + volatile
  2. 性能优化:

    • 无锁读取,高并发性能
    • 细粒度锁定,减少竞争
    • 多线程协作扩容
  3. 数据结构演进:

    • 引入红黑树优化查询性能
    • 取消Segment减少内存开销
    • 动态并发度调整
  4. 弱一致性:

    • fail-safe迭代器
    • size()方法返回近似值
    • 适合高并发场景

使用建议

  1. 合理设置初始容量,避免频繁扩容
  2. 使用原子操作方法,保证操作的原子性
  3. 避免热点数据,使用分片技术分散访问
  4. 注意内存管理,定期清理过期数据
  5. 监控性能指标,及时发现瓶颈

面试重点

  • 实现原理:分段锁机制、CAS操作、volatile关键字
  • 版本差异:JDK1.7与JDK1.8的优化点
  • 线程安全:如何保证并发安全性
  • 性能特点:读写性能、扩容机制
  • 应用场景:适用的业务场景和最佳实践
  • 与其他Map的对比:HashMap、HashTable、Collections.synchronizedMap

学习路径

  1. 理解基础概念:哈希表、线程安全、并发控制
  2. 掌握实现原理:源码分析、数据结构、算法实现
  3. 学习性能优化:瓶颈分析、调优技巧、监控方法
  4. 实践应用场景:缓存、计数器、配置管理等
  5. 对比学习:与其他并发集合类的异同

掌握ConcurrentHashMap不仅能在面试中展现扎实的并发编程基础,更能在实际项目中设计出高性能的并发系统。

文章标签

Java集合ConcurrentHashMap并发源码分析
Java IO 分类详解
上一篇

Java IO 分类详解

2025-11-19

HashMap 源码分析及面试题
下一篇

HashMap 源码分析及面试题

2025-11-19

冬眠

冬眠

博主

专注于技术、阅读与思考。在这里记录学习、思考与生活。

116
文章
2
分类
关注我
系列:Java 集合

第 2 篇,共 2 篇

上一篇

HashMap 源码分析及面试题

已是最后一篇

文章目录

目录

  • ConcurrentHashMap概述
  • 底层数据结构
  • 核心源码分析
  • 线程安全机制
  • JDK版本差异
  • 性能分析与优化
  • 常见面试题
  • 总结

相关文章

查看更多
HashMap 源码分析及面试题

HashMap 源码分析及面试题

2025-11-19 · 21 min read

ThreadLocal 详解

ThreadLocal 详解

2025-11-19 · 25 min read

AbstractQueuedSynchronizer (AQS) 详解

AbstractQueuedSynchronizer (AQS) 详解

2025-11-19 · 23 min read