并发容器与原子类
ConcurrentHashMap
Section titled “ConcurrentHashMap”JDK 7:分段锁(Segment)
Section titled “JDK 7:分段锁(Segment)”JDK 7 结构:
ConcurrentHashMap└── Segment[16](默认16个段,每段是一个 ReentrantLock) └── Segment 0: [Entry链表0, Entry链表1, ...] └── Segment 1: [Entry链表0, Entry链表1, ...] ...
put(key, value): 1. 计算 key 的 hash,确定属于哪个 Segment 2. 对该 Segment 加锁(ReentrantLock) 3. 在 Segment 内部的 HashTable 中操作 4. 释放锁
并发度 = Segment 数量(默认16) → 最多16个线程可以同时写入不同的 SegmentJDK 8:CAS + synchronized(当前实现)
Section titled “JDK 8:CAS + synchronized(当前实现)”JDK 8 放弃了 Segment,直接对每个桶(链表头节点/树根节点)加锁,粒度更细。
JDK 8 结构:
Node[] table(数组 + 链表 + 红黑树,同 HashMap 8)
关键操作:
put(key, value): 1. 计算 hash 2. table[i] 为空?→ CAS 直接写入(无锁) 3. table[i] 不为空?→ synchronized(table[i]) 后操作 • 链表长度 < 8:链表追加 • 链表长度 ≥ 8 且数组长度 ≥ 64:转红黑树 4. 更新 size(LongAdder 思想的 counterCells,减少竞争)
get(key):完全无锁 • Node.val 和 Node.next 均为 volatile,保证可见性 • 数组引用也是 volatile(tabAt 使用 Unsafe 的 volatile 读)size() 的实现
Section titled “size() 的实现”JDK 8 的 size() 不加全局锁,而是使用类似 LongAdder 的分散计数:
baseCount: long(主计数器)counterCells: CounterCell[](分散计数器)
put/remove 时: 1. CAS 更新 baseCount 2. 失败(竞争激烈)→ 选一个 counterCells[i] 的 CAS 更新
size() = baseCount + sum(counterCells) 只是一个近似值,不保证实时精确(因为 size() 期间可能有其他写操作)JDK 7 vs JDK 8 对比
Section titled “JDK 7 vs JDK 8 对比”| 对比项 | JDK 7(Segment) | JDK 8(CAS + sync) |
|---|---|---|
| 锁粒度 | Segment(默认16个桶共享一把锁) | 单个桶(每个链表头独立锁) |
| 最大并发度 | Segment 数量(默认16) | 数组长度(理论上 n 个并发写) |
| 锁类型 | ReentrantLock | synchronized(JVM 优化更好) |
| 结构 | 数组+链表 | 数组+链表+红黑树 |
| 查询复杂度 | O(n) 最坏 | O(log n) 最坏(红黑树) |
| 内存占用 | 较高(Segment 对象开销) | 较低 |
CopyOnWriteArrayList
Section titled “CopyOnWriteArrayList”设计思想:写时复制(Copy-On-Write)。每次修改时,先拷贝一份完整的底层数组,在副本上修改,再用 volatile 写将引用替换为新数组。
// add 的核心逻辑public boolean add(E e) { synchronized (lock) { // 写操作加锁(防止并发写出现多份副本) Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); // 拷贝 newElements[len] = e; // 在副本上写入 setArray(newElements); // volatile 写,替换引用 return true; }}
// get 完全无锁(读旧数组也没关系,允许弱一致性)public E get(int index) { return elementAt(getArray(), index); // volatile 读数组引用}适用场景与限制
Section titled “适用场景与限制”适用:读多写少,且能接受弱一致性(读操作可能读到稍旧的快照)。
不适用:
- 写操作频繁(每次写都要完整拷贝数组,内存 GC 压力大)
- 数组很大(拷贝开销随数组大小线性增长)
- 需要强一致性的读(
iterator()返回的是创建时刻的快照,不反映之后的修改)
BlockingQueue 家族
Section titled “BlockingQueue 家族”BlockingQueue 是线程池、生产者-消费者模型的核心接口。
接口方法: 操作方式 抛异常 返回特殊值 阻塞 超时 ───────────────────────────────────────────────────── 插入 add(e) offer(e) put(e) offer(e, t, unit) 移除 remove() poll() take() poll(t, unit) 检查 element() peek() 不适用 不适用主要实现类对比
Section titled “主要实现类对比”| 实现类 | 有界/无界 | 底层结构 | 特点 |
|---|---|---|---|
ArrayBlockingQueue | 有界 | 数组 | 生产和消费用同一把锁(ReentrantLock),公平/非公平可选 |
LinkedBlockingQueue | 可选(默认无界) | 链表 | 生产和消费用两把锁(takeLock/putLock),吞吐量更高 |
SynchronousQueue | 容量为 0 | 无 | 每次 put 必须等待 take,直接交付,无缓冲 |
PriorityBlockingQueue | 无界 | 堆(优先级) | 按 Comparator 排序,不保证 FIFO |
DelayQueue | 无界 | 优先级队列 | 元素需实现 Delayed,到期才能取出 |
LinkedTransferQueue | 无界 | 链表+CAS | 支持 transfer(可直接交付或入队),吞吐量高 |
ArrayBlockingQueue vs LinkedBlockingQueue
Section titled “ArrayBlockingQueue vs LinkedBlockingQueue”ArrayBlockingQueue: • 单把锁(putLock == takeLock),生产消费互斥 • 有界,避免 OOM • 数组结构,内存连续,GC 友好
LinkedBlockingQueue: • 两把锁(putLock 和 takeLock 独立),生产消费可并发 • 默认无界(Integer.MAX_VALUE),需显式指定容量 • 链表结构,每次入队/出队创建 Node 对象
结论:高吞吐优先用 LinkedBlockingQueue(双锁); 内存严格控制用 ArrayBlockingQueue(有界)。Atomic 原子类与 CAS
Section titled “Atomic 原子类与 CAS”CAS(Compare-And-Swap)
Section titled “CAS(Compare-And-Swap)”CAS 是一种硬件级别的原子操作,对应 x86 的 CMPXCHG 指令。
CAS(内存地址V, 预期值A, 新值B): if (V == A) { V = B; return true; // 更新成功 } else { return false; // 更新失败,A 已经不是最新值 }
整个操作由 CPU 保证原子性,不需要加锁。Java 中的 CAS:通过 sun.misc.Unsafe(JDK 9+ 为 jdk.internal.misc.Unsafe)调用 native 方法实现。
AtomicInteger 原理
Section titled “AtomicInteger 原理”// 核心字段private volatile int value;
// getAndIncrement 的实现(i++的原子版本)public final int getAndIncrement() { return U.getAndAddInt(this, VALUE, 1);}
// Unsafe.getAndAddInt 的本质(自旋 CAS)public final int getAndAddInt(Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); // volatile 读取当前值 } while (!weakCompareAndSetInt(o, offset, v, v + delta)); // CAS 更新 return v;}CAS 的三大问题
Section titled “CAS 的三大问题”1. ABA 问题
线程A读取值为 A线程B将值改为 B,再改回 A线程A的 CAS 成功(值还是 A),但中间已经发生了变化
解决:AtomicStampedReference(携带版本号) compareAndSet(expectedRef, newRef, expectedStamp, newStamp)2. 自旋开销
CAS 失败后不断重试,在高竞争场景下 CPU 空转严重。
解决:
- 限制自旋次数,超过则升级为阻塞(synchronized 的做法)
LongAdder:分散热点,减少单点竞争
3. 只能保证单个变量的原子性
无法用一个 CAS 同时更新两个变量。解决:
AtomicReference<T>将多个字段封装为对象进行整体 CAS- 使用
synchronized或锁
LongAdder vs AtomicLong
Section titled “LongAdder vs AtomicLong”AtomicLong: 单个 volatile long 值,所有线程争同一个变量 高并发时 CAS 失败率高,自旋严重
LongAdder(Striped64 思想): cells: Cell[] ← 分散的计数单元(每个 Cell 内有一个 long) base: long ← 无竞争时用 base
increment 时: 1. CAS base 成功 → 直接更新 base 2. 失败 → 根据线程哈希选择 cells[i],CAS 更新 cell 3. cells[i] 也失败 → 扩展 cells 数组
sum() = base + Σ cells[i].value
结论: 低并发:AtomicLong 略快(无额外对象开销) 高并发:LongAdder 明显更快(分散竞争) 统计计数、监控指标等场景推荐 LongAdderAtomic 原子类体系
Section titled “Atomic 原子类体系”基本类型: AtomicInteger / AtomicLong / AtomicBoolean
引用类型: AtomicReference<V> - 对象引用 CAS AtomicStampedReference<V> - 带版本号,解决 ABA AtomicMarkableReference<V> - 带布尔标记
数组类型: AtomicIntegerArray / AtomicLongArray / AtomicReferenceArray
字段更新器(反射 + CAS,减少对象创建): AtomicIntegerFieldUpdater<T> AtomicLongFieldUpdater<T> AtomicReferenceFieldUpdater<T, V>
高性能累加器(Java 8+): LongAdder / LongAccumulator DoubleAdder / DoubleAccumulatorQ:ConcurrentHashMap JDK 7 和 JDK 8 有什么区别?
JDK 7 使用分段锁(Segment + ReentrantLock),并发度为 Segment 数量(默认16)。JDK 8 改为 CAS + synchronized,锁粒度细化到单个桶,最大并发度等于数组长度,同时引入红黑树优化链表过长时的查询性能。
Q:ConcurrentHashMap 的 get 操作需要加锁吗?
不需要。JDK 8 中 Node 的 val 和 next 字段均为 volatile,数组引用也通过 Unsafe 进行 volatile 读,保证了可见性。因此 get 操作完全无锁。
Q:CopyOnWriteArrayList 的迭代器是强一致性的吗?
不是,是弱一致性(快照一致性)。迭代器创建时持有当时数组的引用,后续写操作替换数组引用不影响迭代器,因此迭代时不会 ConcurrentModificationException,但也不会看到之后的修改。
Q:CAS 的 ABA 问题是什么?如何解决?
线程 A 读取值为 A,线程 B 将其修改为 B 再改回 A,线程 A 的 CAS 因值仍为 A 而成功,但实际上值已经被改变过了。解决方案是使用 AtomicStampedReference,在对象引用之外附加一个版本号(stamp),每次修改都递增版本号,CAS 时同时验证引用和版本号。
Q:LongAdder 为什么比 AtomicLong 在高并发下更快?
AtomicLong 所有线程争同一个 volatile long,高竞争时 CAS 失败率高,自旋浪费 CPU。LongAdder 通过 cells 数组将热点分散,不同线程更新不同的 Cell,减少了 CAS 竞争;汇总时将所有 Cell 的值相加。以空间(多个 Cell 对象)换时间(减少竞争)。